diff --git a/sources/tech/20170709 The Extensive Guide to Creating Streams in RxJS.md b/sources/tech/20170709 The Extensive Guide to Creating Streams in RxJS.md deleted file mode 100644 index 04a25a5cc1..0000000000 --- a/sources/tech/20170709 The Extensive Guide to Creating Streams in RxJS.md +++ /dev/null @@ -1,1031 +0,0 @@ -BriFuture is translating - -The Extensive Guide to Creating Streams in RxJS -============================================================ - -![](https://cdn-images-1.medium.com/max/900/1*hj8mGnl5tM_lAlx5_vqS-Q.jpeg) - -For most developers the first contact with RxJS is established by libraries, like Angular. Some functions return streams and to make use of them the focus naturally is on operators. - -At some point mixing reactive and some of the non-reactive code seems practical. Then people get interested in creating streams themselves. Whenever you are dealing with asynchronous code or data processing, chances are that streams are a good option. - -RxJS offers numerous ways to create streams. Whatever situation you are facing, there is one perfect way for you to create a stream. You may not need them all, but knowing them can save you time and some code. - -I have put all possible options into four categories, based on their main purpose: - -* Stream existing data - -* Generate data - -* Interoperate with existing APIs - -* Combine and select from existing streams - -Note: The examples use RxJS 6 and can be different from older versions. Something that’s different for sure is the way you import the functions. - -RxJS 6 - -``` -import {of, from} from 'rxjs'; -``` - -``` -of(...); -from(...); -``` - -RxJS < 6 - -``` -import { Observable } from 'rxjs/Observable'; -import 'rxjs/add/observable/of'; -import 'rxjs/add/observable/from'; -``` - -``` -Observable.of(...); -Observable.from(...); -``` - -``` -//or -``` - -``` -import { of } from 'rxjs/observable/of'; -import { from } from 'rxjs/observable/from'; -``` - -``` -of(...); -from(...); -``` - -A note on the stream diagrams: - -* | means the stream completes - -* X means the stream terminates with an error - -* … means the stream goes on indefinitely - -* * * - -### Stream Existing Data - -You have some data and want to feed them into a stream. There are three flavors, all of which also allow you to provide a scheduler as the last argument (If you want to know more about schedulers you can take a look at my [previous article][5]). All resulting streams will be cold. - -#### of - -Use  _of_  if you have only one element or a few separate ones. - -``` -of(1,2,3) - .subscribe(); -``` - -``` -// Produces -// 1 2 3 | -``` - -#### from - -Use  _from_  if you have an array or  _Iterable_  and want all elements in it emitted to the stream. You can also use it to convert a promise to an observable. - -``` -const foo = [1,2,3]; -``` - -``` -from(foo) - .subscribe(); -``` - -``` -// Produces -// 1 2 3 | -``` - -#### pairs - -Streams key/value pairs of an object. Particularly useful if the object represents a dictionary. - -``` -const foo = { a: 1, b: 2}; -``` - -``` -pairs(foo) - .subscribe(); -``` - -``` -// Produces -// [a,1] [b,2] | -``` - -#### What about other data structures? - -Maybe your data is stored in a custom structure that does not implement the  _Iterable_  protocol or you have a recursive, tree-like structure. In those cases one of following options might be suitable: - -* Extract data to an array first - -* Use the  _generate_  function from the next section to iterate over the data - -* Create a custom stream (see that section) - -* Create an iterator - -Options 2 and 3 are explained later, so I focus on creating an iterator here. We can create a stream from an  _iterable_  by calling  _from_ . An  _iterable_  is an object that can deliver an iterator (see [this mdn article][6] if you are interested in the details). - -One simple way to create an iterator is a [generator function][7]. When you invoke a generator function, it returns an object that conforms to both the  _iterable_ protocol and the  _iterator_  protocol. - -``` -//Our custom data structure -class List { - add(element) ... - get(index) ... - get size() ... - ... -} -``` - -``` -function* listIterator(list) { - for (let i = 0; i console.log("foo"); -//prints foo after 5 seconds -``` - -Most of the time interval will be used to process data periodically: - -``` -interval(10000).pipe( - flatMap(i => fetch("https://server/stockTicker") -).subscribe(updateChart) -``` - -This will get new data every 10 seconds and update the screen. - -#### generate - -A more complex function that allows you to emit a sequence of any type. It has some overloads and I show you the most interesting one(s): - -``` -generate( - 0, // start with this value - x => x < 10, // condition: emit as long as a value is less than 10 - x => x*2 // iteration: double the previous value -).subscribe(); -``` - -``` -// Produces -// 1 2 4 8 | -``` - -You can also use it to iterate over values, if a structure does not implement the  _Iterable_  interface. Let’s try that with our list example from before: - -``` -const myList = new List(); -myList.add(1); -myList.add(3); -``` - -``` -generate( - 0, // start with this value - i => i < list.size, // condition: emit until we have processed the whole list - i => ++i, // iteration: get next index - i => list.get(i) // selection: get value from list -).subscribe(); -``` - -``` -// Produces -// 1 3 | -``` - -As you can see I have added another argument: The selector. It works like the  _map_  operator and converts the generated value to something more useful. - -* * * - -### Empty Streams - -Sometimes you need to pass or return a stream that does not emit any data. There are three functions, one for every possible situation. You can pass a scheduler to all functions.  _empty_  and  _throwError_  accept a scheduler as argument. - -#### empty - -Creates a stream that completes without emitting a value. - -``` -empty() - .subscribe(); -``` - -``` -// Produces -// | -``` - -#### never - -Creates a stream that never completes, but also never emits anything. - -``` -never() - .subscribe(); -``` - -``` -// Produces -// ... -``` - -#### throwError - -Creates a stream that fails without emitting a value. - -``` -throwError('error') - .subscribe(); -``` - -``` -// Produces -// X -``` - -* * * - -### Hook into existing APIs - -Not all libraries and all of your legacy code use or support streams. Luckily RxJS provides functions to bridge non-reactive and reactive code. This section discusses only patterns provided by RxJS for exactly that purpose. - -You may also be interested in this [extensive article][8] from [Ben Lesh][9] covering probably every possible way to interoperate with promises. - -#### from - -We already had that and I list it here too because it can be used to wrap a promise into an observable. - -``` -from(new Promise(resolve => resolve(1))) - .subscribe(); -``` - -``` -// Produces -// 1 | -``` - -#### fromEvent - -Adds an event listener to a DOM element and I am pretty sure you know that. What you may not know is that you can also use it with other types, e.g. a jQuery object. - -``` -const element = $('#fooButton'); // creates a jQuery object for a DOM element -``` - -``` -from(element, 'click') - .subscribe(); -``` - -``` -// Produces -// clickEvent ... -``` - -#### fromEventPattern - -In order to understand why we need this one if we already have fromEvent, we need to understand how fromEvent works. Take this code: - -``` -from(document, 'click') - .subscribe(); -``` - -It tells RxJS that we want to listen to click events from the document. During subscription RxJS finds out that document is an  _EventTarget_  type, hence it can call  _addEventListener_  on it. If we pass a jQuery object instead of document, then RxJS knows that it has to call  _on_  instead. - -This example using  _fromEventPattern_  basically does the same as  _fromEvent_ : - -``` -function addClickHandler(handler) { - document.addEventListener('click', handler); -} -``` - -``` -function removeClickHandler(handler) { - document.removeEventListener('click', handler); -} -``` - -``` -fromEventPattern( - addClickHandler, - removeClickHandler, -) -.subscribe(console.log); -``` - -``` -//is equivalent to -fromEvent(document, 'click') -``` - -RxJS itself creates the actual listener ( _handler_ ) and your job is to add and remove it. The purpose of  _fromEventPattern_  is basically to tell RxJS how to register and remove event listeners. - -Now imagine you use a library where you have to call a method named  _registerListener_ . We no longer can use  _fromEvent_  because it doesn’t know how to deal with it. - -``` -const listeners = []; -``` - -``` -class Foo { - registerListener(listener) { - listeners.push(listener); - } -``` - -``` - emit(value) { - listeners.forEach(listener => listener(value)); - } -} -``` - -``` -const foo = new Foo(); -``` - -``` -fromEventPattern(listener => foo.registerListener(listener)) - .subscribe(); -``` - -``` -foo.emit(1); -``` - -``` -// Produces -// 1 ... -``` - -When we call foo.emit(1) the listener from RxJS is called and it can emit the value to the stream. - -You could also use it to listen to more than one event type or connect with any API that communicates via callbacks, e.g. the WebWorker API: - -``` -const myWorker = new Worker('worker.js'); -``` - -``` -fromEventPattern( - handler => { myWorker.onmessage = handler }, - handler => { myWorker.onmessage = undefined } -) -.subscribe(); -``` - -``` -// Produces -// workerMessage ... -``` - -#### bindCallback - -This is similar to fromEventPattern, but it’s only meant for single values. That is the stream completes after the callback has been invoked . The usage is different as well – You wrap the function with bindCallback, then it magically returns a stream when it‘s called: - -``` -function foo(value, callback) { - callback(value); -} -``` - -``` -// without streams -foo(1, console.log); //prints 1 in the console -``` - -``` -// with streams -const reactiveFoo = bindCallback(foo); -//when we call reactiveFoo it returns an observable -``` - -``` -reactiveFoo(1) - .subscribe(console.log); //prints 1 in the console -``` - -``` -// Produces -// 1 | -``` - -#### websocket - -Yes, you can actually create a websocket connection and expose it as stream: - -``` -import { webSocket } from 'rxjs/webSocket'; -``` - -``` -let socket$ = webSocket('ws://localhost:8081'); -``` - -``` -//receive messages -socket$.subscribe( - (msg) => console.log('message received: ' + msg), - (err) => console.log(err), - () => console.log('complete') * ); -``` - -``` -//send message -socket$.next(JSON.stringify({ op: 'hello' })); -``` - -It’s really that easy to add websocket support to your application.  _websocket_ creates a subject. That means you can both subscribe to it in order to receive messages and send messages through it by calling  _next_ . - -#### ajax - -Just so you know it: Similar to websocket and offers support for AJAX requests. You probably use a library or framework with built-in AJAX support anyway. And if you don’t then I recommend using fetch (and a polyfill if necessary) instead and wrap the returned promise into an observable (see also the  _defer_  function below). - -* * * - -### Custom Streams - -Sometimes the already presented functions are not flexible enough. Or you need more control over subscriptions. - -#### Subject - -A subject is a special object that allows you to emit data to the stream and control it. The subject itself is also an observable, but if you want to expose the stream to other code it’s recommended to use the  _asObservable_  method. That way you cannot accidentally call the source methods. - -``` -const subject = new Subject(); -const observable = subject.asObservable(); -``` - -``` -observable.subscribe(); -``` - -``` -subject.next(1); -subject.next(2); -subject.complete(); -``` - -``` -// Produces -// 1 2 | -``` - -Note that values emitted before subscriptions are “lost”: - -``` -const subject = new Subject(); -const observable = subject.asObservable(); -``` - -``` -subject.next(1); -``` - -``` -observable.subscribe(console.log); -``` - -``` -subject.next(2); -subject.complete(); -``` - -``` -// Prints -// 2 -``` - -In addition to the regular subject RxJS provides three specialized versions. - -The  _AsyncSubject_  emits only the last value after completion. - -``` -const subject = new AsyncSubject(); -const observable = subject.asObservable(); -``` - -``` -observable.subscribe(console.log); -``` - -``` -subject.next(1); -subject.next(2); -subject.complete(); -``` - -``` -// Prints -// 2 -``` - -The  _BehaviorSubject_  allows you to provide a (default) value that will be emitted to every subscriber if no other value has been emitted so far. Otherwise subscribers receive the last emitted value. - -``` -const subject = new BehaviorSubject(1); -const observable = subject.asObservable(); -``` - -``` -const subscription1 = observable.subscribe(console.log); -``` - -``` -subject.next(2); -subscription1.unsubscribe(); -``` - -``` -// Prints -// 1 -// 2 -``` - -``` -const subscription2 = observable.subscribe(console.log); -``` - -``` -// Prints -// 2 -``` - -The  _ReplaySubject_  stores all emitted values up to a certain number, time or infinitely. All new subscribers will then get all stored values. - -``` -const subject = new ReplaySubject(); -const observable = subject.asObservable(); -``` - -``` -subject.next(1); -``` - -``` -observable.subscribe(console.log); -``` - -``` -subject.next(2); -subject.complete(); -``` - -``` -// Prints -// 1 -// 2 -``` - -You can find more information on subjects in the [ReactiveX documentation][10](that also offers additional links). [Ben Lesh][11] offers some insights on subjects in [On The Subject Of Subjects][12], as does [Nicholas Jamieson][13] [in RxJS: Understanding Subjects][14]. - -#### Observable - -You can create an observable by simply using the the new operator. With the function you pass in you can control the stream. That function is called whenever someone subscribe and it receives an observer that you can use like a subject, i.e. call next, complete and error. - -Let’s revisit our list example: - -``` -const myList = new List(); -myList.add(1); -myList.add(3); -``` - -``` -new Observable(observer => { - for (let i = 0; i { - //stream it, baby! -``` - -``` - return () => { - //clean up - }; -}) -.subscribe(); -``` - -#### Subclass Observable - -Before the advent of lettable operators this was a way to implement custom operators. RxJS extends  _Observable_  internally. One example is  _Subject_ , another is the  _publish_  operator. It returns a  _ConnectableObservable_  that provides the additional method  _connect_ . - -#### Implement Subscribable - -Sometimes you already have an object that holds state and can emit values. You can turn it into an observable if you implement the Subscribable interface that consists of only a subscribe method. - -``` -interface Subscribable { subscribe(observerOrNext?: PartialObserver | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable} -``` - -* * * - -### Combine and Select Existing Streams - -Knowing how to create individual streams is not enough. Sometimes you are confronted with several streams but you only need one. Some of the functions are also available as operators, that’s why I won’t go too deep here. I can recommend an [article][15] from [Max NgWizard K][16] that even contains some fancy animations. - -One more recommendation: You can interactively play with combination operators on [RxMarbles][17] by dragging around elements. - -#### The ObservableInput type - -Operators and functions that expect a stream (or an array of streams) usually do not only work with observables. Instead they actually expect the argument to be of the type ObservableInput that is defined as follows: - -``` -type ObservableInput = SubscribableOrPromise | ArrayLike | Iterable; -``` - -That means you can e.g. pass promises or arrays without needing to convert them to observables first! - -#### defer - -The main purpose is to defer the creation of an observable to the time when someone wants to subscribe. This is useful if - -* the creation of the observable is computationally expensive - -* you want a new observable for each subscriber - -* you want to choose between different observables at subscription time - -* some code must not be executed before subscription - -The last point includes one not so obvious use case: Promises (defer can also return a promise). Take this example using the fetch API: - -``` -function getUser(id) { - console.log("fetching data"); - return fetch(`https://server/user/${id}`); -} -``` - -``` -const userPromise = getUser(1); -console.log("I don't want that request now"); -``` - -``` -//somewhere else -userPromise.then(response => console.log("done"); -``` - -``` -// Prints -// fetching data -// I don't want that request now -// done -``` - -Promises are executed immediately, whereas streams are executed when you subscribe. The very moment we call getUser, a request is sent even if we did not want that at that point. Sure, we can use from to convert a promise to an observable, but the promise we pass has already been created / executed. defer allows us to wait until subscription: - -``` -const user$ = defer(() => getUser(1)); -``` - -``` -console.log("I don't want that request now"); -``` - -``` -//somewhere else -user$.subscribe(response => console.log("done"); -``` - -``` -// Prints -// I don't want that request now -// fetching data -// done -``` - -#### iif - - _iif_  covers a special use case of  _defer_ : Deciding between two streams at subscription time: - -``` -iif( - () => new Date().getHours() < 12, - of("AM"), - of("PM") -) -.subscribe(); -``` - -``` -// Produces -// AM before noon, PM afterwards -``` - -To quote the documentation: - -> Actually `[iif][3]` can be easily implemented with `[defer][4]`and exists only for convenience and readability reasons. - -#### onErrorResumeNext - -Starts the first stream and if it fails continues with the next stream. The error is ignored. - -``` -const stream1$ = of(1, 2).pipe( - tap(i => { if(i>1) throw 'error'}) //fail after first element -); -``` - -``` -const stream2$ = of(3,4); -``` - -``` -onErrorResumeNext(stream1$, stream2$) - .subscribe(console.log); -``` - -``` -// Produces -// 1 3 4 | -``` - -This can be useful if you have more than one web service. In case the main one fails the backup service can be called automatically. - -#### forkJoin - -Lets streams run concurrently and emits their last values in an array when they are completed. Since only the last values of each streams are emitted it’s typically used for streams that only emit a single element, like HTTP requests. You want the requests run in parallel and do something when all have responses. - -``` -function handleResponses([user, account]) { - // do something -} -``` - -``` -forkJoin( - fetch("https://server/user/1"), - fetch("https://server/account/1") -) -.subscribe(handleResponses); -``` - -#### merge / concat - -Emits every value that is emitted by one of the source observables. - - _merge_  supports a parameter that let’s you define how many source streams are subscribed to concurrently. The default is unlimited. A value of 1 would mean listen to one source stream and when it’s completed subscribe to the next one. Since that is a very common scenario you RxJS provides an explicit function:  _concat_ . - -``` -merge( - interval(1000).pipe(mapTo("Stream 1"), take(2)), - interval(1200).pipe(mapTo("Stream 2"), take(2)), - timer(0, 1000).pipe(mapTo("Stream 3"), take(2)), - 2 //two concurrent streams -) -.subscribe(); -``` - -``` -// Subscribes to stream 1 and 2 only -``` - -``` -// prints -// Stream 1 -> after 1000ms -// Stream 2 -> after 1200ms -// Stream 1 -> after 2000ms -``` - -``` -// Stream 1 has completed, now subscribe to stream 3 -``` - -``` -// prints -// Stream 3 -> after 0 ms -// Stream 2 -> after 400 ms (2400ms from beginning) -// Stream 3 -> after 1000ms -``` - -``` - -merge( - interval(1000).pipe(mapTo("Stream 1"), take(2)), - interval(1200).pipe(mapTo("Stream 2"), take(2)) - 1 -) -// is equal to -concat( - interval(1000).pipe(mapTo("Stream 1"), take(2)), - interval(1200).pipe(mapTo("Stream 2"), take(2)) -) -``` - -``` -// prints -// Stream 1 -> after 1000ms -// Stream 1 -> after 2000ms -// Stream 2 -> after 3200ms -// Stream 2 -> after 4400ms -``` - -#### zip / combineLatest - -While  _merge_  and  _concat_  emit all values from the source streams individually, zip and combineLatest combine one value of each source stream and emit them together.  _zip_  combines the first values emitted by all(!) source streams, the second values and so on. This is useful if the contents of the streams are related. - -``` -zip( - interval(1000), - interval(1200), -) -.subscribe(); -``` - -``` -// Produces -// [0, 0] [1, 1] [2, 2] ... -``` - - _combineLatest_  is similar but combines the latest values emitted by the source streams. Nothing happens until all source streams have emitted at least one value. From then on every time a source stream emits a value, it is combined with the last values of the other streams. - -``` -combineLatest( - interval(1000), - interval(1200), -) -.subscribe(); -``` - -``` -// Produces -// [0, 0] [1, 0] [1, 1] [2, 1] ... -``` - -Both functions allow you to pass a selector function that can combine the elements to something else than an array: - -``` -zip( - interval(1000), - interval(1200), - (e1, e2) -> e1 + e2 -) -.subscribe(); -``` - -``` -// Produces -// 0 2 4 6 ... -``` - -#### race - -The first stream that emits a value is selected. So the resulting stream is essentially the fastest stream. - -``` -race( - interval(1000), - of("foo") -) -.subscribe(); -``` - -``` -// Produces -// foo | -``` - -Since  _of_  produces a value immediately it’s the faster stream and the stream that gets selected. - -* * * - -### Conclusion - -That have been a lot of ways to create observables. Knowing them is essential if you want to create reactive APIs or want to combine legacy APIs with reactive ones. - -I have presented you all options but much more could be said about all of them. If you want to dive deeper I highly recommend consulting the [documentation][20] or reading the suggested articles. - -Another interesting way to get insight is [RxViz][21]. You write RxJS code and the resulting streams are then displayed graphically and animated. - --------------------------------------------------------------------------------- - -via: https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a - -作者:[Oliver Flaggl][a] -译者:[译者ID](https://github.com/译者ID) -校对:[校对者ID](https://github.com/校对者ID) - -本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创编译,[Linux中国](https://linux.cn/) 荣誉推出 - -[a]:https://blog.angularindepth.com/@abetteroliver -[1]:https://rxjs-dev.firebaseapp.com/api/index/Subscribable -[2]:https://rxjs-dev.firebaseapp.com/api/index/Subscribable#subscribe -[3]:https://rxjs-dev.firebaseapp.com/api/index/iif -[4]:https://rxjs-dev.firebaseapp.com/api/index/defer -[5]:https://itnext.io/concurrency-and-asynchronous-behavior-with-rxjs-11b0c4b22597 -[6]:https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols -[7]:https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/function* -[8]:https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875 -[9]:https://medium.com/@benlesh -[10]:http://reactivex.io/documentation/subject.html -[11]:https://medium.com/@benlesh -[12]:https://medium.com/@benlesh/on-the-subject-of-subjects-in-rxjs-2b08b7198b93 -[13]:https://medium.com/@cartant -[14]:https://blog.angularindepth.com/rxjs-understanding-subjects-5c585188c3e1 -[15]:https://blog.angularindepth.com/learn-to-combine-rxjs-sequences-with-super-intuitive-interactive-diagrams-20fce8e6511 -[16]:https://medium.com/@maximus.koretskyi -[17]:http://rxmarbles.com/#merge -[18]:https://rxjs-dev.firebaseapp.com/api/index/ObservableInput -[19]:https://rxjs-dev.firebaseapp.com/api/index/SubscribableOrPromise -[20]:http://reactivex.io/documentation/operators.html#creating -[21]:https://rxviz.com/ diff --git a/translated/tech/20170709 The Extensive Guide to Creating Streams in RxJS.md b/translated/tech/20170709 The Extensive Guide to Creating Streams in RxJS.md new file mode 100644 index 0000000000..66b0e920e9 --- /dev/null +++ b/translated/tech/20170709 The Extensive Guide to Creating Streams in RxJS.md @@ -0,0 +1,1029 @@ +在 RxJS 中创建流的延伸教程 +============================================================ + +![](https://cdn-images-1.medium.com/max/900/1*hj8mGnl5tM_lAlx5_vqS-Q.jpeg) + +对大多数开发者来说,RxJS 是以库的形式与之接触,就像 Angular。一些函数会返回流,要使用它们就得把注意力放在操作符上。 + +有些时候,混用响应式和非响应式代码似乎很有用。然后大家就开始热衷流的创造。不论是在编写异步代码或者是数据处理时,流都是一个不错的方案。 + +RxJS 提供很多方式来创建流。不管你遇到的是什么情况,都会有一个完美的创建流的方式。你可能根本用不上它们,但了解它们可以节省你的时间,让你少码一些代码。 + +我把所有可能的方法,按它们的主要目的,分放在四个目录中: + +* 流式化现有数据 + +* 生成数据 + +* 使用现有 APIs 进行交互 + +* 选择现有的流,并结合起来 + +注意:示例用的是 RxJS 6,可能会以前的版本有所不同。已知的区别是你导入函数的方式不同了。 + +RxJS 6 + +``` +import {of, from} from 'rxjs'; +``` + +``` +of(...); +from(...); +``` + +RxJS < 6 + +``` +import { Observable } from 'rxjs/Observable'; +import 'rxjs/add/observable/of'; +import 'rxjs/add/observable/from'; +``` + +``` +Observable.of(...); +Observable.from(...); +``` + +``` +//or +``` + +``` +import { of } from 'rxjs/observable/of'; +import { from } from 'rxjs/observable/from'; +``` + +``` +of(...); +from(...); +``` + +流的图示中的标记: + +* | 表示流结束了 + +* X 表示流出现错误并被终结 + +* … 表示流的走向不定 + +* * * + +### 流式化已有数据 + +你有一些数据,想把它们放到流中。有三种方式,并且都允许你把调度器当作最后一个参数传入(你如果想深入了解调度器,可以看看我的 [上一篇文章][5])。这些生成的流都是静态的。 + +#### of + +如果只有一个或者一些不同的元素,使用 _of_ : + +``` +of(1,2,3) + .subscribe(); +``` + +``` +// 结果 +// 1 2 3 | +``` + +#### from + +如果有一个数组或者 _可迭代的_ 对象,而且你想要其中的所有元素发送到流中,使用 _from_。你也可以用它来把一个 promise 对象变成可观测的。 + +``` +const foo = [1,2,3]; +``` + +``` +from(foo) + .subscribe(); +``` + +``` +// 结果 +// 1 2 3 | +``` + +#### pairs + +流式化一个对象的键/值对。用这个对象表示字典时特别有用。 + +``` +const foo = { a: 1, b: 2}; +``` + +``` +pairs(foo) + .subscribe(); +``` + +``` +// 结果 +// [a,1] [b,2] | +``` + +#### 那么其他的数据结构呢? + +也许你的数据存储在自定义的结构中,而它又没有实现 _Iterable_ 接口,又或者说你的结构是递归的,树状的。也许下面某种选择适合这些情况: + +* 先将数据提取到数组里 + +* 使用下一节将会讲到的 _generate_ 函数,遍历所有数据 + +* 创建一个自定义流(见下一节) + +* 创建一个迭代器 + +稍后会讲到选项 2 和 3 ,因此这里的重点是创建一个迭代器。我们可以对一个 _iterable_ 对象调用 _from_ 创建一个流。 _iterable_ 是一个对象,可以产生一个迭代器(如果你对细节感兴趣,参考 [这篇 mdn 文章][6])。 + +创建一个迭代器的简单方式是 [generator function][7]。当你调用一个生成函数(generator function)时,它返回一个对象,该对象同时遵循 _iterable_ 接口和 _iterator_ 接口。 + +``` +// 自定义的数据结构 +class List { + add(element) ... + get(index) ... + get size() ... + ... +} +``` + +``` +function* listIterator(list) { + for (let i = 0; i console.log("foo"); +// 5 秒后打印 foo +``` + +大多数定时器将会用来周期性的处理数据: + +``` +interval(10000).pipe( + flatMap(i => fetch("https://server/stockTicker") +).subscribe(updateChart) +``` + +这段代码每 10 秒获取一次数据,更新屏幕。 + +#### generate + +这是个更加复杂的函数,允许你发送一系列任意类型的对象。它有一些重载,这里你看到的是最有意思的部分: + +``` +generate( + 0, // 从这个值开始 + x => x < 10, // 条件:只要值小于 10,就一直发送 + x => x*2 // 迭代:前一个值加倍 +).subscribe(); +``` + +``` +// 结果 +// 1 2 4 8 | +``` + +你也可以用它来迭代值,如果一个结构没有实现 _Iterable_ 接口。我们用前面的 list 例子来进行演示: + +``` +const myList = new List(); +myList.add(1); +myList.add(3); +``` + +``` +generate( + 0, // 从这个值开始 + i => i < list.size, // 条件:发送数据,直到遍历完整个列表 + i => ++i, // 迭代:获取下一个索引 + i => list.get(i) // 选择器:从列表中取值 +).subscribe(); +``` + +``` +// 结果 +// 1 3 | +``` + +如你所见,我添加了另一个参数:选择器(selector)。它和 _map_ 操作符作用类似,将生成的值转换为更有用的东西。 + +* * * + +### 空的流 + +有时候你要传递或返回一个不用发送任何数据的流。有三个函数分别用于不同的情况。你可以给这三个函数传递调度器。_empty_ 和 _throwError_ 接收一个调度器参数。 + +#### empty + +创建一个空的流,一个值也不发送。 + +``` +empty() + .subscribe(); +``` + +``` +// 结果 +// | +``` + +#### never + +创建一个永远不会结束的流,仍然不发送值。 + +``` +never() + .subscribe(); +``` + +``` +// 结果 +// ... +``` + +#### throwError + +创建一个流,流出现错误,不发送数据。 + +``` +throwError('error') + .subscribe(); +``` + +``` +// 结果 +// X +``` + +* * * + +### 挂钩已有的 API + +不是所有的库和所有你之前写的代码使用或者支持流。幸运的是 RxJS 提供函数用来桥接非响应式和响应式代码。这一节仅仅讨论 RxJS 为桥接代码提供的模版。 + +你可能还对这篇出自 [Ben Lesh][9] 的 [延伸阅读][8] 感兴趣,这篇文章讲了几乎所有能与 promises 交互操作的方式。 + +#### from + +我们已经用过它,把它列在这里是因为,它可以封装一个含有 observable 对象的 promise 对象。 + +``` +from(new Promise(resolve => resolve(1))) + .subscribe(); +``` + +``` +// 结果 +// 1 | +``` + +#### fromEvent + +fromEvent 为 DOM 元素添加一个事件监听器,我确定你知道这个。但你可能不知道的是,也可以通过其它类型来添加事件监听器,例如,一个 jQuery 对象。 + +``` +const element = $('#fooButton'); // 从 DOM 元素中创建一个 jQuery 对象 +``` + +``` +from(element, 'click') + .subscribe(); +``` + +``` +// 结果 +// clickEvent ... +``` + +#### fromEventPattern + +要理解为什么有 fromEvent 了还需要 fromEventPattern,我们得先理解 fromEvent 是如何工作的。看这段代码: + +``` +from(document, 'click') + .subscribe(); +``` + +这告诉 RxJS 我们想要监听 document 中的点击事件。在提交过程中,RxJS 发现 document 是一个 _EventTarget_ 类型,因此它可以调用它的 _addEventListener_ 方法。如果我们传入的是一个 jQuery 对象而非 document,那么 RxJs 知道它得调用 _on_ 方法。 + +这个例子用的是 _fromEventPattern_,和 _fromEvent_ 的工作基本上一样: + +``` +function addClickHandler(handler) { + document.addEventListener('click', handler); +} +``` + +``` +function removeClickHandler(handler) { + document.removeEventListener('click', handler); +} +``` + +``` +fromEventPattern( + addClickHandler, + removeClickHandler, +) +.subscribe(console.log); +``` + +``` +// 等效于 +fromEvent(document, 'click') +``` + +RxJS 自动创建实际的监听器( _handler_ )你的工作是添加或者移除监听器。_fromEventPattern_ 的目的基本上是告诉 RxJS 如何注册和移除事件监听器。 + +现在想象一下你使用了一个库,你可以调用一个叫做 _registerListener_ 的方法。我们不能再用 _fromEvent_,因为它并不知道该怎么处理这个对象。 + +``` +const listeners = []; +``` + +``` +class Foo { + registerListener(listener) { + listeners.push(listener); + } +``` + +``` + emit(value) { + listeners.forEach(listener => listener(value)); + } +} +``` + +``` +const foo = new Foo(); +``` + +``` +fromEventPattern(listener => foo.registerListener(listener)) + .subscribe(); +``` + +``` +foo.emit(1); +``` + +``` +// Produces +// 1 ... +``` + +当我们调用 foo.emit(1) 时,RxJS 中的监听器将被调用,然后它就能把值发送到流中。 + +你也可以用它来监听多个事件类型,或者结合所有可以通过回调进行通讯的 API,例如,WebWorker API: + +``` +const myWorker = new Worker('worker.js'); +``` + +``` +fromEventPattern( + handler => { myWorker.onmessage = handler }, + handler => { myWorker.onmessage = undefined } +) +.subscribe(); +``` + +``` +// 结果 +// workerMessage ... +``` + +#### bindCallback + +它和 fromEventPattern 相似,但它能用于单个值。就在回调函数被调用时,流就结束了。用法当然也不一样 —— 你可以用 bindCallBack 封装函数,然后它就会在调用时魔术般的返回一个流: + +``` +function foo(value, callback) { + callback(value); +} +``` + +``` +// 没有流 +foo(1, console.log); //prints 1 in the console +``` + +``` +// 有流 +const reactiveFoo = bindCallback(foo); +// 当我们调用 reactiveFoo 时,它返回一个 observable 对象 +``` + +``` +reactiveFoo(1) + .subscribe(console.log); // 在控制台打印 1 +``` + +``` +// 结果 +// 1 | +``` + +#### websocket + +是的,你完全可以创建一个 websocket 连接然后把它暴露给流: + +``` +import { webSocket } from 'rxjs/webSocket'; +``` + +``` +let socket$ = webSocket('ws://localhost:8081'); +``` + +``` +// 接收消息 +socket$.subscribe( + (msg) => console.log('message received: ' + msg), + (err) => console.log(err), + () => console.log('complete') * ); +``` + +``` +// 发送消息 +socket$.next(JSON.stringify({ op: 'hello' })); +``` + +把 websocket 功能添加到你的应用中真的很简单。_websocket_ 创建一个 subject。这意味着你可以订阅它,通过调用 _next_ 来获得消息和发送消息。 + +#### ajax + +如你所知:类似于 websocket,提供 AJAX 查询的功能。你可能用了一个带有 AJAX 功能的库或者框架。或者你没有用,那么我建议使用 fetch(或者必要的话用 polyfill),把返回的 promise 封装到一个 observable 对象中(参考稍后会讲到的 _defer_ 函数)。 + +* * * + +### Custom Streams + +有时候已有的函数用起来并不是足够灵活。或者你需要对订阅有更强的控制。 + +#### Subject + +subject 是一个特殊的对象,它使得你的能够把数据发送到流中,并且能够控制数据。subject 本身就是一个 observable 对象,但如果你想要把流暴露给其它代码,建议你使用 _asObservable_ 方法。这样你就不能意外调用原始方法。 + +``` +const subject = new Subject(); +const observable = subject.asObservable(); +``` + +``` +observable.subscribe(); +``` + +``` +subject.next(1); +subject.next(2); +subject.complete(); +``` + +``` +// 结果 +// 1 2 | +``` + +注意在订阅前发送的值将会“丢失”: + +``` +const subject = new Subject(); +const observable = subject.asObservable(); +``` + +``` +subject.next(1); +``` + +``` +observable.subscribe(console.log); +``` + +``` +subject.next(2); +subject.complete(); +``` + +``` +// 结果 +// 2 +``` + +除了常规的 subject,RxJS 还提供了三种特殊的版本。 + +_AsyncSubject_ 在结束后只发送最后的一个值。 + +``` +const subject = new AsyncSubject(); +const observable = subject.asObservable(); +``` + +``` +observable.subscribe(console.log); +``` + +``` +subject.next(1); +subject.next(2); +subject.complete(); +``` + +``` +// 输出 +// 2 +``` + +_BehaviorSubject_ 使得你能够提供一个(默认的)值,如果当前没有其它值发送的话,这个值会被发送给每个订阅者。否则订阅者收到最后一个发送的值。 + +``` +const subject = new BehaviorSubject(1); +const observable = subject.asObservable(); +``` + +``` +const subscription1 = observable.subscribe(console.log); +``` + +``` +subject.next(2); +subscription1.unsubscribe(); +``` + +``` +// 输出 +// 1 +// 2 +``` + +``` +const subscription2 = observable.subscribe(console.log); +``` + +``` +// 输出 +// 2 +``` + +The _ReplaySubject_ 存储一定数量、或一定时间或所有的发送过的值。所有新的订阅者将会获得所有存储了的值。 + +``` +const subject = new ReplaySubject(); +const observable = subject.asObservable(); +``` + +``` +subject.next(1); +``` + +``` +observable.subscribe(console.log); +``` + +``` +subject.next(2); +subject.complete(); +``` + +``` +// 输出 +// 1 +// 2 +``` + +你可以在 [ReactiveX documentation][10](它提供了一些其它的连接) 里面找到更多关于 subjects 的信息。[Ben Lesh][11] 在 [On The Subject Of Subjects][12] 上面提供了一些关于 subjects 的理解,[Nicholas Jamieson][13] 在 [in RxJS: Understanding Subjects][14] 上也提供了一些理解。 + +#### Observable + +你可以简单地用 new 操作符创建一个 observable 对象。通过你传入的函数,你可以控制流,只要有人订阅了或者它接收到一个可以当成 subject 使用的 observer,这个函数就会被调用,比如,调用 next,complet 和 error。 + +让我们回顾一下列表示例: + +``` +const myList = new List(); +myList.add(1); +myList.add(3); +``` + +``` +new Observable(observer => { + for (let i = 0; i { + // 流式化 +``` + +``` + return () => { + //clean up + }; +}) +.subscribe(); +``` + +#### 继承 Observable + +在有可用的操作符前,这是一种实现自定义操作符的方式。RxJS 在内部扩展了 _Observable_。_Subject_ 就是一个例子,另一个是 _publisher_ 操作符。它返回一个 _ConnectableObservable_ 对象,该对象提供额外的方法 _connect_。 + +#### 实现 Subscribable 接口 + +有时候你已经用一个对象来保存状态,并且能够发送值。如果你实现了 Subscribable 接口,你可以把它转换成一个 observable 对象。Subscribable 接口中只有一个 subscribe 方法。 + +``` +interface Subscribable { subscribe(observerOrNext?: PartialObserver | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable} +``` + +* * * + +### 结合和选择现有的流 + +知道怎么创建一个独立的流还不够。有时候你有好几个流但其实只需要一个。有些函数也可作为操作符,所以我不打算在这里深入展开。推荐看看 [Max NgWizard K][16] 所写的一篇 [文章][15],它还包含一些有趣的动画。 + +还有一个建议:你可以通过拖拽元素的方式交互式的使用结合操作,参考 [RxMarbles][17]。 + +#### ObservableInput 类型 + +期望接收流的操作符和函数通常不单独和 observables 一起工作。相反,他们实际上期望的参数类型是 ObservableInput,定义如下: + +``` +type ObservableInput = SubscribableOrPromise | ArrayLike | Iterable; +``` + +这意味着你可以传递一个 promises 或者数组却不需要事先把他们转换成 observables。 + +#### defer + +主要的目的是把一个 observable 对象的创建延迟(defer)到有人想要订阅的时间。在以下情况,这很有用: + +* 创建 observable 对象的开销较大 + +* 你想要给每个订阅者新的 observable 对象 + +* 你想要在订阅时候选择不同的 observable 对象 + +* 有些代码必须在订阅之后执行 + +最后一点包含了一个并不起眼的用例:Promises(defer 也可以返回一个 promise 对象)。看看这个用到了 fetch API 的例子: + +``` +function getUser(id) { + console.log("fetching data"); + return fetch(`https://server/user/${id}`); +} +``` + +``` +const userPromise = getUser(1); +console.log("I don't want that request now"); +``` + +``` +// 其它地方 +userPromise.then(response => console.log("done"); +``` + +``` +// 输出 +// fetching data +// I don't want that request now +// done +``` + +只要流在你订阅的时候执行了,promise 就会立即执行。我们调用 getUser 的瞬间,就发送了一个请求,哪怕我们这个时候不想发送请求。当然,我们可以使用 from 来把一个 promise 对象转换成 observable 对象,但我们传递的 promise 对象已经创建或执行了。defer 让我们能够等到订阅才发送这个请求: + +``` +const user$ = defer(() => getUser(1)); +``` + +``` +console.log("I don't want that request now"); +``` + +``` +// 其它地方 +user$.subscribe(response => console.log("done"); +``` + +``` +// 输出 +// I don't want that request now +// fetching data +// done +``` + +#### iif + + _iif 包含了一个关于 _defer_ 的特殊用例:在订阅时选择两个流中的一个: + +``` +iif( + () => new Date().getHours() < 12, + of("AM"), + of("PM") +) +.subscribe(); +``` + +``` +// 结果 +// AM before noon, PM afterwards +``` + +引用了文档: + +> 实际上 `[iif][3]` 能够轻松地用 `[defer][4]` 实现,它仅仅是出于方便和可读性的目的。 + +#### onErrorResumeNext + +开启第一个流并且在失败的时候继续进行下一个流。错误被忽略掉。 + +``` +const stream1$ = of(1, 2).pipe( + tap(i => { if(i>1) throw 'error'}) //fail after first element +); +``` + +``` +const stream2$ = of(3,4); +``` + +``` +onErrorResumeNext(stream1$, stream2$) + .subscribe(console.log); +``` + +``` +// 结果 +// 1 3 4 | +``` + +如果你有多个 web 服务,这就很有用了。万一主服务器开启失败,那么备份的服务就能自动调用。 + +#### forkJoin + +它让流并行运行,当流结束时发送存在数组中的最后的值。由于每个流只有最后一个值被发送,它一般用在只发送一个元素的流的情况,就像 HTTP 请求。你让请求并行运行,在所有流收到响应时执行某些任务。 + +``` +function handleResponses([user, account]) { + // 执行某些任务 +} +``` + +``` +forkJoin( + fetch("https://server/user/1"), + fetch("https://server/account/1") +) +.subscribe(handleResponses); +``` + +#### merge / concat + +发送每一个从源 observables 对象中发出的值。 + + _merge_  接收一个参数,让你定义有多少流能被同时订阅。默认是无限制的。设为 1 就意味着监听一个源流,在它结束的时候订阅下一个。由于这是一个常见的场景,RxJS 为你提供了一个显示的函数:_concat_。 + +``` +merge( + interval(1000).pipe(mapTo("Stream 1"), take(2)), + interval(1200).pipe(mapTo("Stream 2"), take(2)), + timer(0, 1000).pipe(mapTo("Stream 3"), take(2)), + 2 //two concurrent streams +) +.subscribe(); +``` + +``` +// 只订阅流 1 和流 2 +``` + +``` +// 输出 +// Stream 1 -> after 1000ms +// Stream 2 -> after 1200ms +// Stream 1 -> after 2000ms +``` + +``` +// 流 1 结束后,开始订阅流 3 +``` + +``` +// 输出 +// Stream 3 -> after 0 ms +// Stream 2 -> after 400 ms (2400ms from beginning) +// Stream 3 -> after 1000ms +``` + +``` + +merge( + interval(1000).pipe(mapTo("Stream 1"), take(2)), + interval(1200).pipe(mapTo("Stream 2"), take(2)) + 1 +) +// 等效于 +concat( + interval(1000).pipe(mapTo("Stream 1"), take(2)), + interval(1200).pipe(mapTo("Stream 2"), take(2)) +) +``` + +``` +// 输出 +// Stream 1 -> after 1000ms +// Stream 1 -> after 2000ms +// Stream 2 -> after 3200ms +// Stream 2 -> after 4400ms +``` + +#### zip / combineLatest + + _merge_ 和 _concat_ 一个接一个的发送所有从源流中读到的值,而 zip 和 combineLatest 是把每个流中的一个值结合起来一起发送。_zip_ 结合所有源流中发送的第一个值。如果流的内容相关联,那么这就很有用。 + +``` +zip( + interval(1000), + interval(1200), +) +.subscribe(); +``` + +``` +// 结果 +// [0, 0] [1, 1] [2, 2] ... +``` + +_combineLatest_ 与之类似,但结合的是源流中发送的最后一个值。直到所有源流至少发送一个值之后才会触发事件。这之后每次源流发送一个值,它都会把这个值与其他流发送的最后一个值结合起来。 + +``` +combineLatest( + interval(1000), + interval(1200), +) +.subscribe(); +``` + +``` +// 结果 +// [0, 0] [1, 0] [1, 1] [2, 1] ... +``` + +两个函数都让允许传递一个选择器函数,把元素结合成其它对象而不是数组: + +``` +zip( + interval(1000), + interval(1200), + (e1, e2) -> e1 + e2 +) +.subscribe(); +``` + +``` +// 结果 +// 0 2 4 6 ... +``` + +#### race + +选择第一个发送数据的流。产生的流基本是最快的。 + +``` +race( + interval(1000), + of("foo") +) +.subscribe(); +``` + +``` +// 结果 +// foo | +``` + +由于 _of_ 立即产生一个值,因此它是最快的流,然而这个流就被选中了。 + +* * * + +### 总结 + +已经有很多创建 observables 对象的方式了。如果你想要创造响应式的 APIs 或者想用响应式的 API 结合传统 APIs,那么了解这些方法很重要。 + +我已经向你展示了所有可用的方法,但它们其实还有很多内容可以讲。如果你想更加深入地了解,我极力推荐你查阅 [documentation][20] 或者阅读相关文章。 + +[RxViz][21] 是另一种值得了解的有意思的方式。你编写 RxJS 代码,产生的流可以用图形或动画进行显示。 + +-------------------------------------------------------------------------------- + +via: https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a + +作者:[Oliver Flaggl][a] +译者:[BriFuture](https://github.com/BriFuture) +校对:[校对者ID](https://github.com/校对者ID) + +本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创编译,[Linux中国](https://linux.cn/) 荣誉推出 + +[a]:https://blog.angularindepth.com/@abetteroliver +[1]:https://rxjs-dev.firebaseapp.com/api/index/Subscribable +[2]:https://rxjs-dev.firebaseapp.com/api/index/Subscribable#subscribe +[3]:https://rxjs-dev.firebaseapp.com/api/index/iif +[4]:https://rxjs-dev.firebaseapp.com/api/index/defer +[5]:https://itnext.io/concurrency-and-asynchronous-behavior-with-rxjs-11b0c4b22597 +[6]:https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols +[7]:https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/function* +[8]:https://medium.com/@benlesh/rxjs-observable-interop-with-promises-and-async-await-bebb05306875 +[9]:https://medium.com/@benlesh +[10]:http://reactivex.io/documentation/subject.html +[11]:https://medium.com/@benlesh +[12]:https://medium.com/@benlesh/on-the-subject-of-subjects-in-rxjs-2b08b7198b93 +[13]:https://medium.com/@cartant +[14]:https://blog.angularindepth.com/rxjs-understanding-subjects-5c585188c3e1 +[15]:https://blog.angularindepth.com/learn-to-combine-rxjs-sequences-with-super-intuitive-interactive-diagrams-20fce8e6511 +[16]:https://medium.com/@maximus.koretskyi +[17]:http://rxmarbles.com/#merge +[18]:https://rxjs-dev.firebaseapp.com/api/index/ObservableInput +[19]:https://rxjs-dev.firebaseapp.com/api/index/SubscribableOrPromise +[20]:http://reactivex.io/documentation/operators.html#creating +[21]:https://rxviz.com/