webflux 实现发布订阅

发布时间 2023-10-25 12:51:38作者: 白衣风云

模拟数据源,每5秒产生一个数据

 private void createData(FluxSink<String> sink){
        Flux.interval(Duration.ofSeconds(5)).map(i-> "Flux data---"+ i)
                .doOnNext(sink::next)
                .subscribe();
    }

创建两个全局变量,一个用来保存数据,一个用来获取数据

  Flux<String> source;
  Flux<String> result;

把产生的数据保存的全局变量中

    public OrderMessageHandler() {
       addData().subscribe();
    }

    public Flux<Void> addData(){
        System.out.println("添加数据");
        result = Flux.push(this::createData);
        source = Flux.push(this::generateData);
        return Flux.empty();
    }

    private void generateData(FluxSink<String> sink){
        result.doOnNext(sink::next)
                .takeWhile(str -> !sink.isCancelled())
                .subscribe();
    }

发送到前端

    public Mono<Void> handle(WebSocketSession session) {
        session.receive().subscribe(System.out::println);
            System.out.println("websocket");
        return session.send(initData(session));
    }

    public Flux<WebSocketMessage> initData(WebSocketSession session ){
          return source.map(session::textMessage);

    }


来自为知笔记(Wiz)