学习rxjs,解决异步和事件组合的问题

2019-03-20 by 杜宏伟

解决 js 异步事件有很多办法,但处理方式都是 pull 方式,可不可以用 push 方式呢?

rxjs 简介

Rxjs 是用来组织异步调用的一个library。和promise只能resolve一个值不同的是, 可以把Rxjs看成是 专门处理 eventslodash Rxjs可以不断产生值,可以是同步的,也可以是异步的。

环境准备

可以直接把代码下载,然后在浏览器中引用即可,上面的没压缩,下面的是压缩的

umd代码在全局暴露了 rxjs,可以象下面这样引用

  const { empty, Observable, timer, combineLatest } = rxjs;
  const { map, concat } = rxjs.operators

原理

数据生产者和消费者。生产者和消费者之前的关系可以用两种,pull,push

消费者决定什么时候要数据,生产者不关心什么时候提供数据,就是pull方式 生产者决定什么时候提供数据,消费者被动接收,就是push方式

每个javascript函数都是一个pull系统,函数是数据生产者,每次调用都会产生一条数据,默认会返回undefined.

rxjs可以看成是push的方式

核心思想

具体概念和详细说明可以去官网看,我这里用实例说明用法和使用场景,加上我自己的理解。

第一个例子是官网的例子,因为这个例子太重要了,可以说是Rxjs的核心思想所在

import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
});

foo.subscribe(x => {
  console.log(x);
});
foo.subscribe(y => {
  console.log(y);
});

subscribe完整写法

observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});

subscribe简写,只有一个函数的时候就是next函数

observable.subscribe(x => {
  console.log(x);
});

complete,error,next是生产者和订阅者(消费)之间的约定,因为是举例,我们只关注next产生下一条数据。

output

"Hello"
42
"Hello"
42

生产者产生数据是惰性的。有订阅的时候,才会执行生产数据。每次定阅都是独立的,会为每次订阅从头完整执行生产数据的过程。

Observables是同步的。

console.log('before');
console.log(foo.call());
console.log('after');

output

"before"
"Hello"
42
"after"

订阅的时候也是一样的

console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

output

"before"
"Hello"
42
"after"

最重要的特色

普通的javascript函数一次调用只能返回一个值 ,但是Rxjs可以返回多个,可以返回同步值,也可以返回异步值,象一个数据生产器,实际上,本来就是数据生产者。

import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});

console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

输出

"before"
"Hello"
42
100
200
"after"
300

多播

默认情况下,Rxjs每次生产只一数据,只提供给一个订阅者。因为每次订阅都会触发一次完整的数据生产过程。但是有些时候,我们需要象adeventListner那样生产一次数据,可以把一份数据分发给多个订阅者。有四个操作可以让observerbles有多播能力 publish,multicast,share,shareReplay 其中,share,shareReplay更象是语法糖,我个人认为,掌握publish,multicast就足够了。

从原理上来说,多播是告靠subject实现的。subject设计的初衷也是为了实现多播。

publish

代码引用自 https://www.learnrxjs.io/operators/multicasting/publish.html

// RxJS v6+
import { interval } from 'rxjs';
import { publish, tap } from 'rxjs/operators';

//emit value every 1 second
const source = interval(1000);
const example = source.pipe(
  //side effects will be executed once
  tap(_ => console.log('Do Something!')),
  //do nothing until connect() is called
  publish()
);

/*
  source will not emit values until connect() is called
  output: (after 5s)
  "Do Something!"
  "Subscriber One: 0"
  "Subscriber Two: 0"
  "Do Something!"
  "Subscriber One: 1"
  "Subscriber Two: 1"
*/
const subscribe = example.subscribe(val =>
  console.log(`Subscriber One: ${val}`)
);
const subscribeTwo = example.subscribe(val =>
  console.log(`Subscriber Two: ${val}`)
);

//call connect after 5 seconds, causing source to begin emitting items
setTimeout(() => {
  example.connect();
}, 5000);

这个例子展示了publish的作用和用法。publish是让observable有多播能力的最简单的操作。

multicast

英文讲解 https://www.learnrxjs.io/operators/multicasting/multicast.html

代码 就不贴了,和publish差不多,不同的是 multicast 更强大,可以用不同的subject,比如当用ReplaySubject作中间人的时候,就有了replay data的能力

其它的部分都是辅助部分,如果说上面讲的是骨干,这些就都是树叶了,也比较好理解,可以直接看英文学习的网站和官网。

refCount

refCount是为了省掉手动执行connection,但是我觉得,最好还是手动执行。因为refCount 是在第一次订阅时自动进行connection,这样可能导致后面的订阅接不到某些数据。需要注意一下。

    let mul = new Observable(subscriber => {
      console.log('a')
      subscriber.next(1);
      subscriber.next(2);

    }).pipe(multicast(new Subject()), refCount())


    mul.subscribe(d => console.log('a',d))
    mul.subscribe(d => console.log('b',d))

output

a
a 1
a 2

可以看到只有第一个订阅拿 到了数据,因为第一个订阅之后,数据生产者就complete

我们把代码修改一下

let mul = new Observable(subscriber => {
      console.log('a')
      subscriber.next(1);
      window.setTimeout(() => {
        subscriber.next(2);
      }, 1000)


    }).pipe(multicast(new Subject()), refCount())


    mul.subscribe(d => console.log(d))
    mul.subscribe(d => console.log(d))

output

a
a 1
b 1
a 2
b 2

这回都拿到数据,因为第二个订阅是第一个订阅connect后数据生产者complete前加入的

所以总结来说,除非必要,还是手动执行connect为好,这样代码好理解。

参考

rxjs学习有中文版的,不过翻译的太差,象是机器翻译,建议看原版

我叫杜宏伟,前端开发。

一直想写博客,在2018的年的最后几天,终于上线了。

对于前端开发,一个特点就是太零散,很容易会了后面忘了前面,所以归纳总结很重要。再有就是分享,做前端好多年,以前都是看你们写的文章, 现在我也开始写一些,希望可以帮到入行的小伙伴。微信号 duhongwei5775

欢迎转载,只需注明作者,出处即可

版权说明:署名 4.0 国际(CC BY 4.0)