2 // https://github.com/zenparsing/es-observable
3 var $export = require('./_export')
4 , global = require('./_global')
5 , core = require('./_core')
6 , microtask = require('./_microtask')()
7 , OBSERVABLE = require('./_wks')('observable')
8 , aFunction = require('./_a-function')
9 , anObject = require('./_an-object')
10 , anInstance = require('./_an-instance')
11 , redefineAll = require('./_redefine-all')
12 , hide = require('./_hide')
13 , forOf = require('./_for-of')
14 , RETURN = forOf.RETURN;
// Normalizes an optional callback taken from an observer-like object.
// Yields `undefined` when `fn` is null or undefined (loose `== null` check);
// otherwise yields whatever `aFunction(fn)` returns.
// NOTE(review): `aFunction` is required from './_a-function'; presumably it
// throws for non-callables and returns `fn` unchanged — confirm.
// NOTE(review): the closing of this function expression is not visible in this view.
16 var getMethod = function(fn){
17 return fn == null ? undefined : aFunction(fn);
// Reads the cleanup callback stored on the subscription (`_c`) into a local
// and resets the `_c` slot to `undefined`.
// NOTE(review): lines appear to be missing from this view — no guard on
// `cleanup`, no invocation of it and no closing brace are visible. Presumably
// the slot is cleared first so the callback can run at most once; confirm
// against the complete file.
20 var cleanupSubscription = function(subscription){
21 var cleanup = subscription._c;
23 subscription._c = undefined;
// Reports whether a subscription is closed: true exactly when its `_o` slot
// is strictly `undefined`.
// NOTE(review): the closing of this function expression is not visible in this view.
28 var subscriptionClosed = function(subscription){
29 return subscription._o === undefined;
// Closes a subscription that is still open: clears its `_o` slot (which makes
// `subscriptionClosed` report true) and then calls `cleanupSubscription`.
// Does nothing when the subscription is already closed.
// NOTE(review): the closing braces of the `if` and of the function are not
// visible in this view.
32 var closeSubscription = function(subscription){
33 if(!subscriptionClosed(subscription)){
34 subscription._o = undefined;
35 cleanupSubscription(subscription);
// Subscription constructor: takes an observer and a subscriber function.
// NOTE(review): several lines appear to be missing from this view (the
// initialisation of `this._o` / `this._c`, and the framing that the lone `}`
// further down closes — presumably a try/catch). Confirm against the full file.
39 var Subscription = function(observer, subscriber){
// Replace the caller's observer with a SubscriptionObserver wrapping this subscription.
43 observer = new SubscriptionObserver(this);
// Call the subscriber with the wrapped observer; keep its return value under
// two names so the wrapper below can still reach the original object after
// `cleanup` is reassigned.
45 var cleanup = subscriber(observer)
46 , subscription = cleanup;
// An object with an `unsubscribe` method is adapted into a plain function;
// any other value is passed through `aFunction`.
48 if(typeof cleanup.unsubscribe === 'function')cleanup = function(){ subscription.unsubscribe(); };
49 else aFunction(cleanup);
// After the preceding block: if the subscription is already closed at this
// point, run its cleanup immediately.
55 } if(subscriptionClosed(this))cleanupSubscription(this);
// Prototype for Subscription instances, built by passing a fresh object and
// a method map to `redefineAll`.
// NOTE(review): `redefineAll` is required from './_redefine-all'; presumably
// it copies the methods onto the first argument and returns it — confirm.
58 Subscription.prototype = redefineAll({}, {
// unsubscribe(): closes this subscription via `closeSubscription`.
59 unsubscribe: function unsubscribe(){ closeSubscription(this); }
// SubscriptionObserver constructor: stores the owning subscription in `_s`.
// NOTE(review): the closing of this function expression is not visible in this view.
62 var SubscriptionObserver = function(subscription){
63 this._s = subscription;
// Prototype for SubscriptionObserver instances: `next`, `error` and
// `complete`, each forwarding to the matching method of the observer held in
// the subscription's `_o` slot.
// NOTE(review): lines appear to be missing from this view, so the control
// flow around some calls (notably the cleanup/close calls) is only partly
// visible — presumably they sit in try/catch paths; confirm against the full file.
66 SubscriptionObserver.prototype = redefineAll({}, {
// next(value): while the subscription is open, look up the observer's `next`
// method through `getMethod` and, if present, call it with `value` and
// return its result.
67 next: function next(value){
68 var subscription = this._s;
69 if(!subscriptionClosed(subscription)){
70 var observer = subscription._o;
72 var m = getMethod(observer.next);
73 if(m)return m.call(observer, value);
// NOTE(review): the path that reaches this close is not visible here.
76 closeSubscription(subscription);
// error(value): throws `value` if the subscription is already closed.
// Otherwise takes the observer, clears `_o` (marking the subscription
// closed) and calls the observer's `error` method with `value`.
83 error: function error(value){
84 var subscription = this._s;
85 if(subscriptionClosed(subscription))throw value;
86 var observer = subscription._o;
87 subscription._o = undefined;
89 var m = getMethod(observer.error);
// NOTE(review): any handling for a missing `error` method is not visible here.
91 value = m.call(observer, value);
// Two cleanup calls are visible; presumably they belong to different
// (failure vs. normal) paths — confirm.
94 cleanupSubscription(subscription);
98 } cleanupSubscription(subscription);
// complete(value): while the subscription is open, takes the observer,
// clears `_o` (marking the subscription closed) and calls the observer's
// `complete` method with `value` if it exists; otherwise `value` becomes
// `undefined`.
101 complete: function complete(value){
102 var subscription = this._s;
103 if(!subscriptionClosed(subscription)){
104 var observer = subscription._o;
105 subscription._o = undefined;
107 var m = getMethod(observer.complete);
108 value = m ? m.call(observer, value) : undefined;
// As in `error`, two cleanup calls are visible; presumably on different paths — confirm.
111 cleanupSubscription(subscription);
115 } cleanupSubscription(subscription);
// Observable constructor. Passes `this` through `anInstance` (with the
// constructor, the name 'Observable' and the key '_f') and stores the
// subscriber, after `aFunction` has processed it, in `_f` on the result.
// NOTE(review): `anInstance` is required from './_an-instance'; presumably it
// throws unless `this` is a proper new instance and returns it — confirm.
// NOTE(review): the closing of this function expression is not visible in this view.
121 var $Observable = function Observable(subscriber){
122 anInstance(this, $Observable, 'Observable', '_f')._f = aFunction(subscriber);
// Instance methods installed on `$Observable.prototype` through `redefineAll`.
125 redefineAll($Observable.prototype, {
// subscribe(observer): creates a new Subscription from the observer and the
// subscriber function stored in `_f`, and returns it.
126 subscribe: function subscribe(observer){
127 return new Subscription(observer, this._f);
// forEach(fn): returns a promise built with `core.Promise`, falling back to
// `global.Promise`. Inside the executor it subscribes with an observer
// object that has a `next` handler.
// NOTE(review): most of the body is not visible in this view — the
// definition of `that`, the use of `fn`, the conditions under which
// `unsubscribe()` is called, and any error/complete handlers. Confirm
// against the full file before relying on this description.
129 forEach: function forEach(fn){
131 return new (core.Promise || global.Promise)(function(resolve, reject){
133 var subscription = that.subscribe({
134 next : function(value){
139 subscription.unsubscribe();
// Static methods installed on the `$Observable` constructor through `redefineAll`.
// NOTE(review): lines appear to be missing from this view, including the
// header of the second method (presumably `of`), the declarations of `done`,
// and several braces/guards. Confirm against the full file.
149 redefineAll($Observable, {
// from(x): uses `this` as the constructor when it is a function, otherwise
// `$Observable`.
150 from: function from(x){
151 var C = typeof this === 'function' ? this : $Observable;
// Look up the method stored under the OBSERVABLE well-known key on `x`.
152 var method = getMethod(anObject(x)[OBSERVABLE]);
// Call that method on `x`; the result is returned as-is when its constructor
// is already `C`, otherwise it is wrapped in a new `C` that delegates to the
// result's `subscribe`.
// NOTE(review): the guard selecting this branch (presumably `if(method)`) is not visible.
154 var observable = anObject(method.call(x));
155 return observable.constructor === C ? observable : new C(function(observer){
156 return observable.subscribe(observer);
// Other path: build a new `C` whose subscriber schedules a microtask that
// walks `x` with `forOf`.
159 return new C(function(observer){
161 microtask(function(){
// The `forOf` callback returns RETURN once `done` is set; when `forOf`
// itself yields RETURN the microtask exits without calling `complete`.
164 if(forOf(x, false, function(it){
166 if(done)return RETURN;
167 }) === RETURN)return;
172 } observer.complete();
// The subscriber returns a cleanup function that sets the `done` flag.
175 return function(){ done = true; };
// Copy every argument into a fresh `items` array.
179 for(var i = 0, l = arguments.length, items = Array(l); i < l;)items[i] = arguments[i++];
// Build a new instance (of `this` when it is a function, else `$Observable`)
// whose subscriber schedules a microtask that feeds each item to
// `observer.next` and then calls `observer.complete`.
180 return new (typeof this === 'function' ? this : $Observable)(function(observer){
182 microtask(function(){
184 for(var i = 0; i < items.length; ++i){
185 observer.next(items[i]);
187 } observer.complete();
// The subscriber returns a cleanup function that sets the `done` flag.
190 return function(){ done = true; };
// Attach a method under the OBSERVABLE key on the prototype that returns the
// receiver itself.
// NOTE(review): `hide` is required from './_hide'; presumably it defines the
// property as non-enumerable — confirm.
195 hide($Observable.prototype, OBSERVABLE, function(){ return this; });
// Register the constructor under the name `Observable` using the `$export.G` flag.
// NOTE(review): presumably `G` means "global" — confirm in './_export'.
197 $export($export.G, {Observable: $Observable});
// Invoke the './_set-species' helper for 'Observable'.
// NOTE(review): presumably this installs the species accessor on the constructor — confirm.
199 require('./_set-species')('Observable');