angular - Rxjs

发布时间 2023-09-02 00:03:15作者: 【唐】三三

Rxjs

//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
  setTimeout(() => {
    subscriber.next({ name: '张三' });
    subscriber.complete();
  }, 2000);
});

// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
  next: function (x: any) {
    console.log(x);
  }
}

// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('just after subscribe');

可观察对象(Observable)/观察者(Observer)/订阅(Subscribe)

(1)可多次调用next发送数据

//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
  setTimeout(() => {
    subscriber.next({ name: '张三' });
    subscriber.complete();
  }, 2000);
});

// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
  next: function (x: any) {
    console.log(x);
  }
}

// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('subscribe 之后');

(2)当完成使用 complete 结束调用

//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
  let index = 0;
  let timer = setInterval(() => {
    subscriber.next(index++);
    if (index == 5) {
      subscriber.complete();
      clearInterval(timer);
    }
  }, 1000);
});

// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
  next: function (x: any) {
    console.log(x);
  },
  complete: () => {
    console.log('完成');
  }
}

// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('subscribe 之后');

(3)error:内部逻辑错误,将失败信息发给订阅者。Observerable 终止。

//Observable(可观察者):表示未来(future)值或事件的可调用集合的概念。
const observable = new Observable((subscriber) => {
  let index = 0;
  let timer = setInterval(() => {
    subscriber.next(index++);
    if (index == 5) {
      // subscriber.complete();
      subscriber.error("失败了~");
      clearInterval(timer);
    }
  }, 1000);
});

// Observer(观察者):是一个回调集合,它知道如何监听 Observable 传来的值。
const observer = {
  next: function (x: any) {
    console.log(x);
  },
  error: (x: string) =>{
    console.log(x);
  },
  complete: () => {
    console.log('完成');
  }
}

// Subscription(订阅):表示 Observable 的一次执行,主要用于取消执行。
observable.subscribe(observer);
console.log('subscribe 之后');

Subject 构造

订阅立即执行 传参 广播历史
Subject
BehaviorSubject
ReplaySubject

用于创建可观察对象,但订阅后不会立刻执行,next 可以在可观察对象外部调用

const demoSubject = new Subject()

demoSubject.subscribe({
  next: (x: any) => {
    console.log(x);
  }
})

demoSubject.subscribe({
  next: (x: any) => {
    console.log(x);
  }
})

console.log('没调用');

setTimeout(() => {
  demoSubject.next("222") //2秒后调用
}, 2000);

BehaviorSubject:可传参的 Subject,而且立即调用

const behaviorSubject  = new BehaviorSubject('默认值~');

behaviorSubject.subscribe({
  next: (x: any) => {
    console.log(x);
  }
})

behaviorSubject.next('改变值');

ReplaySubject:它通过在新订阅者首次订阅时发送旧值来“重播”旧值。

ReplaySubject会广播历史结果,而Subject不会广播历史结果

import { ReplaySubject } from 'rxjs';

const replaySubject  = new ReplaySubject();

replaySubject.subscribe({
  next: (x: any) => {
    console.log(x);
  }
})

replaySubject.next('hello 1')
replaySubject.next('hello 2')

setTimeout(() => {
  //过2秒再订阅
  replaySubject.subscribe({
    next: (x: any) => {
      console.log(x);
    }
  })
}, 3000);

操作符

操作符是 Observable 类型上的方法,比如 .map(...).filter(...).merge(...),等等。当操作符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。

操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。

创建操作符

range 指定范围内的数字序列

range(start: number, count: number, scheduler: Scheduler): Observable

创建一个 Observable ,它发出指定范围内的数字序列。

import { range } from 'rxjs';

range(0,10).subscribe((x)=>console.log(x))

from (ObservableInput) 创建一个 Observable

从一个 ObservableInput (数组、类数组对象、Promise、迭代器对象或者类 Observable 对象) 创建一个 Observable.

from([10, 20, 30]).subscribe(x => console.log(x));
//10
//20
//30

of 根据提供的参数创建 Observable

发出你提供的参数,然后完成。

    of([1, 2, 3]).subscribe(x => console.log(x)); 
    //打印 [1,2,3]

    of('a',1,[1, 2, 3]).subscribe(x => console.log(x));
    //'a'
    //1
    //[1, 2, 3]

fromEvent 发出来自给定事件对象的指定类型事件

创建一个 Observable,该 Observable 发出来自给定事件对象的指定类型事件。

通过给“事件目标”添加事件监听器的方式创建 Observable,可能会是拥有addEventListener和 removeEventListener方法的对象,一个 Node.js 的 EventEmitter,一个 jQuery 式的 EventEmitter, 一个 DOM 的节点集合, 或者 DOM 的 HTMLCollection。 当输出 Observable 被订阅的时候事件处理函数会被添加, 当取消订阅的时候会将事件处理函数移除。

名称 类型 属性 描述
target EventTargetLike DOMElement, 事件目标, Node.js EventEmitter, NodeList 或者 HTMLCollection 等附加事件处理方法的对象。
eventName string 感兴趣的事件名称, 被 target 发出。
options EventListenerOptions - 可选的 可选的传递给 addEventListener 的参数。
selector SelectorMethodSignature - 可选的 可选的函数处理结果. 接收事件处理函数的参数,应该返回单个值。
fromEvent(document, 'click').subscribe(x => console.log(x));
// 结果:
// 每次点击 document 时,都会在控制台上输出 MouseEvent 。

angular

html

<button id="btn">提交</button>

app.component.ts

ngOnInit() {
    const btn = document.getElementById('btn')
    if (btn != null) {
      fromEvent(btn, 'click')
        .pipe(map(event => event.target))
        .subscribe(console.log);
    }
  }

interval 定期发出自增的数字

每隔一段时间发送数值,数值递增

interval(1000).subscribe(console.log)

timer 指定时间的 interval

就像是interval, 但是你可以指定什么时候开始发送

timer(3000,1000).subscribe(console.log) //像interval,但是这里可以设置过3秒

转换操作符

map 数据转换

基于数据流,进行数据转换。

import { map, range } from 'rxjs';

range(0,10)
.pipe(map(x=>x*10))
.subscribe((x)=>console.log(x))

swtichMap 切换可观察对象

它仍然提供一个Observable作为输出,不是通过合并,而是通过仅从最新的Observable 发出的结果。

重点突出在切换最后1次Observeble.

    const btn = document.getElementById('btn')
    if (btn != null) {
      fromEvent(btn, 'click')
        .pipe(switchMap(event => interval(1000)))
        .subscribe(console.log);
    }

对于我们的最后一个示例,如果我们使用switchMap,我们只会从最后一个Observable 中获取结果。

     const dymiAPI = (character:any): any => {
      return of(`API response for character: ${character}`).pipe(delay(1000))
    }

    from(['aa', 'bb', 'cc', 'dd']).pipe(
      switchMap(arr => dymiAPI(arr))
    ).subscribe(data => console.log(data)) //这里只会输出最新的 dd
     //结果:
     //API response for character: dd

pluck(rxjs v8要弃用)

获取属性值

ngOnInit() {
    const btn = document.getElementById('btn')
    if (btn != null) {
      fromEvent(btn, 'click')
        // .pipe(map(event => event.target))
        .pipe(pluck('target'))
        .subscribe(console.log);
    }
  }

过滤操作符

take

获取最前的N个

   interval(1000).pipe(take(2)).subscribe(console.log)

takeWhile

满足 predicate 函数的每个值,并且一旦出现不满足 predicate 的值就立即完成

    range(1, 10).pipe(takeWhile(x => x < 5)).subscribe(console.log) //1,2,3,4

    fromEvent(document, 'mousemove')
    .pipe( takeWhile((Event: any) => Event.clientX  > 200))
    .subscribe(console.log) //打印 mousemove 事件的 event,鼠标移动到clientX 小于 200 的时候,停止

takeUntil

直到 notifier Observable 发出值

    const btn = document.getElementById('btn')
    if (btn != null) {
      fromEvent(document, 'mousemove')
        .pipe(takeUntil(fromEvent(btn, 'click')))
          .subscribe(console.log)  //打印 mousemove 事件的event,点击 btn,停止
    }

throttleTime 限流

节流,可观察对象高频次发送数据流,限定时间内只能向订阅者发送一次数据流。

    fromEvent(document, 'click')
    .pipe(throttleTime(2000))
    .subscribe(console.log) //鼠标不停点击浏览器内容,2秒内只执行一次

debounceTime 防抖

防抖,高频次发送数据流,只响应最后一次。

fromEvent(document, 'click')
    .pipe(debounceTime(2000))
    .subscribe(console.log)

点浏览器内容N次,过2秒执行最后1次(最新1次)的结果

点击1次

distinctUntilChanged 检测数据流是否和上次相同

检测数据源发出的数据流是否和上次一样,相同就忽略,不相同就发出。

组合操作符

forkjoin 合并多个请求,直到发出的最后一个值

类似 Promise.all(),等待 Observables 完成,然后合并它们发出的最后一个值。

import { forkJoin, of, timer } from 'rxjs';

const observable = forkJoin({
  foo: of(1, 2, 3, 4),
  bar: Promise.resolve(8),
  baz: timer(4000)
});
observable.subscribe({
 next: value => console.log(value),
 complete: () => console.log('This is how it ends!'),
});

// Logs:
// { foo: 4, bar: 8, baz: 0 } ==> after 4 seconds
// 'This is how it ends!' ==> 紧接着