聊聊 RXJS

发布时间 2023-10-17 08:54:52作者: 长安城下翩翩少年

一 什么是rxjs?
RxJS(Reactive Extensions for JavaScript)是一个用于响应式编程的 JavaScript 库。它通过使用可观察对象(Observables)和操作符(Operators)来处理异步和事件驱动的代码。
什么是响应式编程? 程序的输入可以被当成一个数据流,例如 excel表格的累加。
响应式编程世界里知名度最高的框架 - Reactive Extension , 简称 RX,指实践响应式编程的一套工具。在Rx官网有这样一段文字
An API for asynchronous programming with observable streams.

[图片]
Rx的概念最初由微软公司实现并开源,也就是Rx.NET,因为Rx带来的编程方式大大改进了异步编程模型,在.NET之后,众多开发者在其他平台和语言上也实现了Rx的类库。可见,Rx其实是一个大家族,在这个大家族中,还有用Java实现的RxJava,用C++实现的RxCpp,用Ruby实现的Rx.rb,用Python实现的RxPy,当然,还有这个大家族中最年长的Rx.NET。在本书中,我们介绍的是RxJS,也就是Rx的JavaScript语言实现。
关于设计模式:
任何一种模式,指的都是解决某一个特定类型问题的套路和方法。现实世界的问题复杂多变,往往不是靠单独一种模式能够解决的,更需要的是多种模式的组合,RxJS的Observable就是观察者模式和迭代器模式的组合。
观察者模式 要解决的问题,就是在一个持续产生事件的系统中,如何分割功能,让不同模块只需要处理一部分逻辑,这种分而治之的思想是基本的系统设计概念,当然,“分”很容易,关键是如何“治”。观察者模式对“治”这个问题提的解决方法是这样,将逻辑分为发布者(Publisher)和观察者(Observer),其中发布者只管负责产生事件,它会通知所有注册挂上号的观察者,而不关心这些观察者如何处理这些事件,相对的,观察者可以被注册上某个发布者,只管接收到事件之后就处理,而不关心这些数据是如何产生的。
迭代器模式(Iterator,也称为“迭代器”)指的是能够遍历一个数据集合的对象,因为数据集合的实现方式很多,可以是一个数组,也可以是一个树形结构,也可以是一个单向链表……迭代器的作用就是提供一个通用的接口,让使用者完全不用关心这个数据集合的具体实现方式。迭代器另一个容易理解的名字叫游标(cursor),就像是一个移动的指针一样,从集合中一个元素移到另一个元素,完成对整个集合的遍历。

  1. Rxjs 特点
  2. 使用数据流对现实问题进行抽象
  3. 擅长处理异步操作
  4. 把复杂问题抽象成简单问题的组合
  5. Rxjs 的基本概念
  6. 数据源 - 可观察对象 Observable
    import { Observable } from 'rxjs';
    //代表“流”的变量标示符,都是用$符号结尾,这是RxJS编程中普遍使用的风格,被称为“芬兰式命名法”(Finnish Notation)
    const source$ = new Observable((observer)=>{
    observer.next(1);
    observer.next(2);
    observer.complete();
    // 异常处理
    observer.error('error');
    });
  7. 操作符
    通过 pipe - 管道,提供的各种操作符来实现各种数据流的转化和操作。
    包括:创建型/转化型/过滤型/合并型/错误处理型
    import { ajax } from 'rxjs/ajax';
    import { retry } from 'rxjs/operators';

ajax('https://example.com/api/data').pipe(
// 最多重新订阅 3 次,如果仍然失败,则输出错误信息
retry(3)
).subscribe(
response => console.log(response),
error => console.log('Request failed after multiple attempts:', error)
);

  1. 观察者 - 订阅可观察对象
    import { Subject } from 'rxjs';

const source$ = new Subject();
// 订阅
const sub = source$.subscribe({
next:(data)=>{}
complete:()=>{}
error:(err)=>{}
});
// 取消订阅
sub.unsubscribe();
3. Rxjs 中的状态

  1. 未开始:Observable 尚未开始发出任何值。这是 Observable 的初始状态。
  2. 进行中:Observable 正在发出值,可以是零个或多个。在这种状态下,观察者可以订阅并处理这些发出的值。
  3. 完成:Observable 成功地发出了所有值,并已经终止。在这种状态下,观察者可以做相应的清理工作。
  4. 错误:Observable 在发出值的过程中遇到错误,终止并发送错误通知。观察者可以处理这个错误通知。
    注意:rxjs 只有一种终结状态,要么完成,要么出错。当 Observable 处于已经完成或错误状态时,是不再允许通过 next 方法发送新的数据的。一旦 Observable 进入完成或错误状态,它就不会再发出任何新的值。如果需要在完成或错误状态后再次发送新的数据,可以使用其他操作符或技术来处理,例如创建一个新的 Observable,或者使用错误处理操作符捕获并转换错误。
  5. RxJS 与传统 JavaScript 异步编程之间的对比:
  6. 响应式编程:RxJS 提供了一种响应式编程的范式,其中数据流被建模为可观察对象,能够在时间上发出值。你可以订阅这些可观察对象,以便在内部值发生变化时执行相应的操作。 传统 JavaScript 异步编程: 传统的 JavaScript 异步编程使用回调函数,事件处理程序或 Promise 来处理异步操作。
  7. 异步控制:RxJS 提供了一套强大的操作符,用于控制异步代码的执行顺序和并发性。你可以使用操作符如 mergeMap、switchMap 和 concatMap 来处理各种异步操作,并管理它们的执行顺序和结果。 传统 JavaScript 异步编程: 在传统的 JavaScript 异步编程中,你可能会使用嵌套的回调函数或 Promise 链来控制异步操作的顺序。
  8. 组合和转换操作:RxJS 提供了许多操作符,用于组合和转换可观察对象。例如,map 操作符可以将一个可观察对象的每个值映射到一个新值,filter 操作符可以过滤出符合特定条件的值。这些操作符使得处理数据流变得更加方便和灵活。 传统 JavaScript 异步编程: 在传统的 JavaScript 异步编程中,操作数据流通常需要手动编写循环和条件语句。
  9. 错误处理:RxJS 提供了丰富的错误处理机制,例如通过 catchError 操作符捕获错误并返回备用值,或者使用 retry 操作符重新订阅可观察对象以重试操作。 传统 JavaScript 异步编程: 在传统的 JavaScript 异步编程中,错误处理通常使用 try-catch 块或者 Promise 的 catch 方法进行处理。
    总的来说,RxJS 提供了一个强大而灵活的工具集,用于处理异步和事件驱动的代码。它可以帮助开发者将复杂的异步操作转化为简洁、可读性强的代码,并能有效地处理错误、控制执行顺序和进行数据转换。
  10. Rxjs 与 Promise 对比 : 加强版的Promise
    Promise 的写法
    // Promise 的三种状态,pending, fulfilled, rejected
    // Promise 定义出来 状态变为pending
    const p = new Promise((resolve,reject)=>{
    resolve(); // 状态变为fulfilled
    reject(); // 状态变为 rejected
    });
    p.then(()=>{}).catch().finally();

// 对比Observable
const s$ = new Observable((observer)=>{
// 对比promise ,observable 可以发送多个值,而Promise 只能发送一个值就会进行完结状态
// 这儿有点类似 ajax 和 websocket 的味道了
observer.next();
observer.next();
observer.compolete();
observer.error();
});
s$.pipe().subscribe({
next:(v)=>{// 接受数据},
complete:()=>{ // 完成状态后的回调},
error:(err)=>{//获取 observer.error() 发送过来的值}
});
// 使用完成后需要取消订阅
s$.unsubscsribe();
RxJS 和 Promise 都是处理异步编程的强大工具,但它们在一些方面有所不同。下面是 RxJS 相对于 Promise 的一些优点:

  1. 组合性和灵活性:RxJS 提供了丰富的操作符,允许你以各种方式组合和转换可观察对象,使其更加灵活。你可以使用 map、filter、mergeMap 等操作符来处理数据流,而不仅仅是处理单个值。这样可以使你的代码更加简洁、可读性更强,并且更容易进行复杂的异步操作。
  2. 错误处理:RxJS 提供了一套完善的错误处理机制,你可以使用 catchError 操作符捕获错误并返回备用值,或使用 retry 操作符进行错误重试。这使得在处理异步操作时更加容易处理和管理错误情况。
  3. 取消和中断:RxJS 提供了 unsubscribe 方法,可以取消订阅可观察对象并中断正在进行的异步操作。这对于在某些条件满足时取消异步操作非常有用,而 Promise 并没有直接提供这样的机制。
  4. 动态数据流:RxJS 的可观察对象是动态的,意味着你可以在需要时动态地添加、移除或改变流中的值。这使得操作流程非常灵活,可以根据实际需要进行动态调整。
    需要注意的是,Promise 也有其优势。Promise 的语法相对简单,容易上手,并且被广泛支持和采用。它在处理单个值的异步操作时非常方便。如果你的需求只是处理一次性的异步操作,那么 Promise 可能已经足够满足你的需求。
    综上所述,RxJS 相对于 Promise 在处理复杂的、多个值的异步操作以及灵活性方面具有一定的优势。它提供了更多的工具和操作符,使得处理和组合异步操作更加方便和可读。然而,选择使用 RxJS 还是 Promise 取决于你的具体需求和个人偏好。
    二 为什么要使用rxjs ?
  5. 优秀的异步数据流处理 (异步具有时效性)
  6. 丰富的operator 来操控数据流
    做组件之间的通信,观察者模式(Subject直接使用 )。画布变化事件,主要是将拖拽缩放的快交互,从React的更新体系中剥离出来,因为前者的任务调度影响了整体更新的流畅度
    而不能直接定义emit的key - key是动态获取的,这种情况,采取发布订阅模式 - 事件总线的形式
  7. Canvas 模拟事件
  8. 大量的异步数据更新,并且这些数据之间还存在相互依赖的情况
  9. 通过 RxJS 整合了异步编程、时序控制以及各种遵循函数式编程思想的 Operators 的特性,优雅的实现了这种复杂的状态机

三 rxjs语法
生产者 Observable
观察者 Observer ( .subscribe)
operators
ajax请求 ,websocket,event事件
同步数据源
Of
range(1,100) 产生 1-100 的数据源
From
异步数据源
Interval
Timer
fromEvent
ajax
webSoket

组合数据源
Merge
combineLatest

Observable
被观察者
const source$ = new Observable((subscriber)=>{
// 为啥不能解构 - 因为this绑定
subscriber.next();
subscriber.next();
subscriber.complete();
subscriber.error();
});
观察者
source$ .subscribe({
next:()=>{},
complete:()=>{},
error:()=>{}
});

Subject
多播- Subject 是一种特殊类型的可观察对象(Observable),同时也是观察者(Observer)。它允许多个观察者订阅它,并且可以通过调用 next 方法来向观察者发送新的值。
数据流既可以 next, complete, error
也可以 subscibe
注意:next发送的值,只能被已经订阅的观察者接收到。
注意:
多播也可以用作 takeUntil 的参数,用户手动来
将流变成完成时
BehaviorSubject
对比Subject
BehaviorSubject 在被订阅时会立即发送其最近的值给订阅者 (可以拿到订阅前最新一次的值)。而Subject不能传递初始值,且订阅后才能接受next发送的值,订阅之前的值,都拿不到。

of
在 RxJS 中,"of" 是一个创建 Observable 的操作符。它可以用来创建一个包含指定值的 Observable,并且这些值会按顺序依次发出。
of(a,b,c) //会依次发送 a,b,c 三个值
of([a,b,c]) // 会发送一个 数组值
of是同步发送值,并且发送值以后状态变成 complete

Interval (timer)
Interval (1000)每隔1000ms 连续发送数据,从0开始累加
timer(1000)隔1000ms发送数据0,发送一个数据后,进入完成状态

from

将其他类型的数据源转或者数据结构化为Observable

  1. 数组(数组同 of 数组不同,from 数组会依次发送,of 中数组会把数组当成一个整体进行发送)
  2. 转化Promise。内部实现会手动调用promise中的 resolve或者reject,然后将返回值进行发。
  3. 转化Iterotor 迭代器对象。 上面的数组,也就是迭代器对象,会依次调用next,将迭代器对象中每个值进行发送

fromEvent
将event 事件转化为 Observable

merge
并行聚合 , 按照发送时间的先后顺序,将多个源值并行为一个值。
of (1,2)
of(3,4,5)
Merge 后得到值, 1,2,3,4,5 。按照时间先后顺序进行发送,而不会进行组合
—{count} +

一般用于不同的数据源返回相同的数据类型,
。观察者不关心是哪个数据发送的数据,只关心
数据时间以及数据本身。

race
race(a1$, a2$,a3$) 哪一个流先发送数据,后续就一直订阅这个流的数据。其他流全部放弃。

combineLatest

combineLatest([s1$, s2$, s3$] ).subscribe(([s1,s2,s3])=>{
// 操作
})
当 s1$, s2$, s3$ 都返回至少一个值时,会发送出一个值[s1,s2,s3],然后每次 三个流中有一个流发送变化都会发送出最新的[s1,s2,s3]
const serch$ = xxx;
const currentPage$ = xxx;
combineLatest([serch$,currentPage$ ]) .subscribe(([serch,currentPage])=>{
// 转化成其他源文件

});
forkJoin

forkJoin( [a1$,a2$,a3$] ) , 需要等到 a1$, a2$, a3$ 流都进入完成时,最后返回一次 完成时最后一次发送的数据 数组[a1,a2,a3]
其中有一个流未进入完成时,都不会最终返回结果。
可以使用 take(1)让流进入完成时
有点类似于 Promise.all

animationFrameScheduler
任务调度:类似于 window.requestAnimationFrame 每一帧去去筛选数据

Operator (写在pipe中)
startWith
以什么开头,给一个默认值

map
转化数据 ,输入的是一个函数

mapTo (已废除)
转化为常量。废除了,直接用map(()=> 常量)来表示,过多的 operator 会增加知识复杂度

scan

类似于reduce积累函数 (可以累积发送的值,也可以累积是第几次发送的值,并和当次的值进行组合返回)

withLatestFrom
withLatestFrom操作符是一种在源Observable发出值时将其与其他Observable的最新值进行组合的方式。当源Observable完成时,withLatestFrom不会触发完成回调,并且不会再生成任何结果。source1$.pipe(withLagestFrom(second$)).subscribe(([s1,s2])=>{
// 保证每个source1发送的数据,都对应 second$数据为最新的值
})
注意,在外层源发出数据时,需要确保内层源已经发送出数据了(至少一个),不然取不到值。
也就是,每次外层源发送一个值,就去内层源中取 上次最新发送的值 进行组合。

switchMap

switchMap 接受一个函数作为参数,该函数将源 Observable 的值映射为一个新的 Observable。每当源 Observable 发出新的值时,switchMap 会取消订阅前一个内部 Observable,并订阅最新的内部 Observable。这样可以确保只有最新的内部 Observable 的值被发出,而忽略前一个内部 Observable 的值。

当外层数据产生新值时,内层的数据源订阅会被取消掉
从而确保内部源使用到的外部源的数据总是最新的

可用于解决翻页渲染问题。例如,我们翻页,从第一页,翻页到第二页
又快速翻页到第三页。如果出现网络问题。
switchMap 的使用,当翻页数据到达第三页时,如果第二页还没有返回数据,订阅
订阅会被取消,就算第二页数据返回了,也不会监听到。

mergeMap

串行聚合(有先后顺序的聚合)

mousedow - mousemove(takeUntil - mouseup)

concatMap

用于将一个外部 Observable 序列中的每个值映射为内部Observable,并按照顺序依次订阅内部Observable。当一个内部Observable完成后,才会开始订阅下一个内部Observable。(用于同步数据转 其他数据源,当内部的其他数据源完成时状态时,才会去订阅 外部源的另外一个发送过来的数据 - 但 外部同步数据源肯定是没有延迟的,同时发送了所有数据,但是只是被缓存了,没有进行订阅而已)

tap
拿传过的来值进行一次不影响后续 Stream 的 “纯操作”,
通过 tap 操作符进行 RxJS 的 Debug

take

取具体几个数据,然后满足条件后数据变成完成时。
take(3), 当流发送3个数据后,进入完成时。

takeUntil

获取值,直到某个流发送数据为止,并且让当前流进入完成时状态

参数为一个函数,函数的返回值为一个新的流,当这个新的流
发送第一个数据的时候。外层的流就会终止订阅
takeWhile
中一个常用的操作符是takeWhile,它可以根据条件判断是否继续发送数据或完成流。

delay

让流的发送间隔一段时间再继续发送
delay(300) 延迟300ms

catchError
它用于捕获 Observable 中的错误并处理这些错误。
相当于拿到报错信息,做一层处理,然后return 的值会被,subscribe 的的第二个参数 - 处理报错信息函数监听到。

retryWhen
在rxjs中,retryWhen运算符用于重新订阅一个Observable对象,只有当retryWhen返回的新Observable对象发出complete通知时,才会停止重新订阅。
类似Promise, 当进入错误状态后,流状态就会进入异常状态,不会再进行监听
retryWhen(errors$ => {
// 控制重试逻辑
return errors$
.pipe(
delay(1000) // 延迟1秒后重新订阅
)
.pipe(
// 可以添加更多的操作符
take(3) // 重试最多3次
);
})

delayWhen

s1$.pipe(delayWhen(()=> s2$))
当 s2$ 流发送数据时, s1$ 才会发送数据,不过s1数据会不断累积,在s2发送数据那一刻,全部爆发,一次性发送过来。

distinctUntilChanged
直到源文件发送的值发生改变后,才发送数据
Const currentPage$ = currentSubject$.pipe(distinctUntilChange())
debounceTime
防抖
debounceTime(300)

throttleTime
节流
throttleTime(300)

observeOn

调度器
Import { animationFrameScheduler } from 'rxjs';

  1. asyncScheduler: 调度器可以在异步上下文中执行操作符和观察者。它通过使用 setTimeout 来异步调度任务。
  2. queueScheduler: 调度器会按顺序执行任务,并且每个任务之间有一个最小延迟时间。它使用 setTimeout 进行任务调度
  3. asapScheduler:调度器用于尽快执行任务,优先于 setTimeout。它使用 setImmediate(在支持的环境中)或者 setTimeout(在不支持 setImmediate 的环境中)进行调度。
  4. animationFrameScheduler: 是用来执行与动画相关的任务,基于浏览器的动画帧调度器。
  5. virtualTimeScheduler: 是一个虚拟的调度器,主要用于测试目的。它使用虚拟的时间概念,可以提供对时间的精确控制
    使⽤了asap,是指产⽣全部数据的时间跨度是215毫秒,⽽
    不是独占唯⼀的线程215毫秒,asap把产⽣每⼀个数据的⼯作都通过Micro Task来实现,这多绕了个路,时间跨度当然⼤,但是这也避免了同步调 ⽤,在这215毫秒⾥,其他插⼊的微任务可以获得执⾏的机会,如果是在浏 览器中执⾏,⽹页中的渲染和对⽤户操作的响应也可以在事件处理的间隙
    获得执⾏的机会。
    使⽤asap这样的Scheduler,⽬的是提⾼整体的感知性能,⽽不 是为了缩短单独⼀个数据流的执⾏时间。

asap会尽量使⽤Micro Task,⽽async利⽤的是Macro Task
queue这个Scheduler,如果调⽤它的schedule函数式参数delay是0,那 它就⽤同步的⽅式执⾏,如果delay参数⼤于0,那queue的表现其实就和 async⼀模⼀样
调度器
时间调度:requestAnimationFrame - raf
验证 mergeMap 和 switchMap 的区别
of(1000, 300, 200, 100)
.pipe(
// switchMap((v) => of(v).pipe(delay(v))),
mergeMap((v) => of(v).pipe(delay(v))),
//observeOn(animationFrameScheduler),
)
.subscribe((v) => {
console.log(v);
});
四 什么情况下需要使用rxjs

  1. 出现多重异步的情况,使用promise 或者async/await 都有点吃力,写出来的代码耦合度高、杂乱不易理解。
  2. 需要使用观察者模式进行监听发布时,可以考虑使用rxjs
    使用rxjs目的,减少异步书写难度,减少代码耦合、增强代码可维护性。
    注意,一般能使用promise、async/await 解决的简单问题,不建议使用rxjs。
    五 案例介绍
  3. 需求一: 用rxjs 实现一个简单的 excel 表格
  4. 需求二:实现一个表单搜索与表格翻页于一体的功能
  5. input框输入要进行 防抖操作
  6. 翻页数字必须变化才会进行搜索
  7. 最后渲染的数据必须同搜索的数据流一致(防止因某些网络原因或者服务器原因,导致前进行请求的数据后返回的情况)
  8. 需求三: 你画我猜 游戏
  9. 鼠标按下,鼠标移动根据鼠标移动的轨迹进行线条绘制
  10. 鼠标弹起,停止绘制
  11. 需求四:键盘按键触发彩蛋- 大招
  12. 比如,需要在3秒内触发 上下下左上右下 这几个按键就能触发一个彩蛋。
  13. 需求五:使用rxjs 实现 canvas 事件系统
    参考文档:
  14. Rxjs 官网(英文)
  15. Rxjs 官网 (中文翻译版)
  16. 学习rxjs 操作符 中文版
  17. 《深入浅出Rxjs》
  18. 从业务逻辑聊聊为什么我们需要Rxjs?
  19. 异步复杂到什么程度才需要使用rxjs?
  20. 弹珠图