The Reactor Pattern

Published: 2023-10-23 21:26:29  Author: 做时间的好朋友

Model: [image]

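1. The three roles

reactor: the dispatcher. It listens for events and dispatches each one to its corresponding handler.
acceptor: the connection acceptor. It handles newly arriving client connections.
handler: the request processor. It processes events and binds itself to the events it handles.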
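
To make the division of labor concrete, here is a minimal in-memory sketch of the three roles. It is an illustration under assumptions, not the author's code: the class `RoleSketch`, the `Event` type, and the string connection ids are all hypothetical, and no real sockets are involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory sketch of the three roles; no real sockets are involved.
class RoleSketch {
    enum EventType { ACCEPT, READ }

    // An event carries its type, the connection it belongs to, and an optional payload.
    static final class Event {
        final EventType type;
        final String connectionId;
        final String payload;
        Event(EventType type, String connectionId, String payload) {
            this.type = type;
            this.connectionId = connectionId;
            this.payload = payload;
        }
    }

    // Reactor: looks up the callback bound to an event and invokes it.
    static final class Reactor {
        private final Map<String, Consumer<Event>> bindings = new HashMap<>();
        void register(String key, Consumer<Event> callback) { bindings.put(key, callback); }
        void dispatch(Event event) {
            // ACCEPT events share one key; READ events are keyed by connection id.
            String key = event.type == EventType.ACCEPT ? "ACCEPT" : event.connectionId;
            Consumer<Event> callback = bindings.get(key);
            if (callback != null) callback.accept(event);
        }
    }

    // Handler: processes the events of one connection and binds itself to them.
    static final class Handler {
        private final List<String> log;
        Handler(Reactor reactor, String connectionId, List<String> log) {
            this.log = log;
            reactor.register(connectionId, this::handle);
        }
        void handle(Event e) { log.add(e.connectionId + " read " + e.payload); }
    }

    // Acceptor: handles new connections by creating a handler for each one.
    static final class Acceptor {
        Acceptor(Reactor reactor, List<String> log) {
            reactor.register("ACCEPT", e -> {
                log.add(e.connectionId + " accepted");
                new Handler(reactor, e.connectionId, log);
            });
        }
    }

    // Simulates one connection arriving and then sending data.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Reactor reactor = new Reactor();
        new Acceptor(reactor, log);
        reactor.dispatch(new Event(EventType.ACCEPT, "conn-1", null));
        reactor.dispatch(new Event(EventType.READ, "conn-1", "hello"));
        return log;
    }
}
```

The reactor itself never contains business logic here; it only routes. The acceptor is just a callback registered for accept events, which is why a single dispatch path covers both new connections and later reads.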

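2. Model variants

  • Single-reactor, single-threaded model
  • Single-reactor, multi-threaded model
  • Master-slave reactor, single-threaded model
  • Master-slave reactor, multi-threaded model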

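3. Of these, the master-slave reactor multi-threaded model generally gives the best performance and efficiency: the main reactor only accepts connections, the sub reactors handle the events of established connections, and business logic runs on worker threads, so no single thread has to do everything.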
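
The split of responsibilities in the master-slave multi-threaded model can be sketched roughly as follows. This is a simplified illustration, not a real network server: connections are plain strings, each sub reactor's event loop is approximated by a single-thread executor, and the names `MainSubSketch`, `accept`, and `onRead` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: connections are plain strings, not real sockets.
class MainSubSketch {
    // One sub reactor = one event-loop thread serving the connections assigned to it.
    static final class SubReactor {
        final ExecutorService loop = Executors.newSingleThreadExecutor();
        final ExecutorService workers;
        SubReactor(ExecutorService workers) { this.workers = workers; }

        // The loop thread only "reads"; the business step runs on the shared worker pool.
        CompletableFuture<String> onRead(String connection, String request) {
            return CompletableFuture
                    .supplyAsync(() -> connection + ":" + request, loop) // I/O step
                    .thenApplyAsync(String::toUpperCase, workers);       // business step
        }
    }

    // The main reactor only accepts connections and assigns them round-robin.
    static final class MainReactor {
        final List<SubReactor> subs;
        final AtomicInteger next = new AtomicInteger();
        MainReactor(List<SubReactor> subs) { this.subs = subs; }
        SubReactor accept(String connection) {
            return subs.get(Math.floorMod(next.getAndIncrement(), subs.size()));
        }
    }

    // Accepts three connections and processes one request on each.
    static List<String> run() throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        List<SubReactor> subs = new ArrayList<>();
        for (int i = 0; i < 2; i++) subs.add(new SubReactor(workers));
        MainReactor main = new MainReactor(subs);
        List<String> results = new ArrayList<>();
        try {
            for (String conn : new String[] {"c1", "c2", "c3"}) {
                SubReactor owner = main.accept(conn);
                results.add(owner.onRead(conn, "ping").get());
            }
        } finally {
            subs.forEach(s -> s.loop.shutdown());
            workers.shutdown();
        }
        return results;
    }
}
```

Because the accept path, the per-connection event loops, and the business work each run on their own threads, a slow business task does not stall the acceptance of new connections in this sketch.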

4. Sample code for a project modeled on the reactor pattern

4.1 Reactor design

Reactor.java

public abstract class Reactor<T> {
    public abstract T execute(String msg) throws Exception;
}

MainReactor.java: handles only new client connections

public class MainReactor extends Reactor<Object>{

    @Override
    public Object execute(String msg) throws Exception {
        // Placeholder: connection handling is not implemented in this demo.
        return null;
    }
}

SubReactor.java: the reactor that does the actual work

@NoArgsConstructor
public class SubReactor extends Reactor<Object>{
    @Override
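    public Object execute(String msg) throws Exception {
        // The notify method is hard-coded in this demo; msg is not inspected.
        NotifyMethod method = NotifyMethod.TIMLINE;
        HandlerDispatcher dispatcher = Singleton.get(HandlerDispatcher.class);
        // Look up the handler bound to this notify method and delegate to it.
        AbstractHandler<?> handler = dispatcher.getHandler(method);
        return handler.execute();
    }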
}

ReactorDispatcher: the reactor dispatcher

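public class ReactorDispatcher {

    // Maps each message type to the reactors that may serve it.
    private final Map<MsgType, List<Reactor>> ship = new HashMap<>();

    public ReactorDispatcher() {
        // Connection messages go to the single MainReactor.
        ship.put(MsgType.ALIVE, Lists.newArrayList(Singleton.get(MainReactor.class)));
        // Business messages are spread over one SubReactor per available processor.
        ship.put(MsgType.BUSINESS, IntStream.range(0, Runtime.getRuntime().availableProcessors())
                .mapToObj(c -> new SubReactor()).collect(Collectors.toList()));
    }

    public Reactor getReactor(MsgType type) {
        List<Reactor> reactors = ship.get(type);
        // Pick a reactor at random. ThreadLocalRandom (java.util.concurrent)
        // avoids allocating a new Random on every call.
        int index = ThreadLocalRandom.current().nextInt(reactors.size());
        return reactors.get(index);
    }
}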
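
ReactorDispatcher picks a reactor at random. If an even spread across reactors matters, a round-robin pick is one possible alternative. The helper below is a hypothetical sketch, not part of the original project; `RoundRobinSelector` and `next` are names invented for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the original project.
class RoundRobinSelector<T> {
    private final List<T> candidates;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSelector(List<T> candidates) {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("no candidates");
        }
        this.candidates = candidates;
    }

    // Returns the candidates in order, wrapping around at the end.
    // floorMod keeps the index non-negative after the counter overflows.
    T next() {
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }
}
```

Random selection needs no shared state, while round-robin costs one atomic increment per call and guarantees that consecutive requests land on different reactors. Which is preferable depends on the workload.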

4.2 HandlerDispatcher design

public class HandlerDispatcher {
    private Map<NotifyMethod, AbstractHandler> ships = new HashMap<NotifyMethod,AbstractHandler>(){{
        put(NotifyMethod.TIMLINE, Singleton.get(TimlineHandler.class));
    }};

    public AbstractHandler getHandler(NotifyMethod method){
        return ships.get(method);
    }
}

4.3 Handler design

AbstractHandler

public abstract class AbstractHandler<T> {
    protected ThreadPoolExecutor workerPool;

    public AbstractHandler(ThreadPoolExecutor workerPool) {
        this.workerPool = workerPool;
    }

    public abstract T execute() throws Exception;
}

TimlineHandler.java

public class TimlineHandler extends AbstractHandler<Result>{

    public TimlineHandler() {
        super(ThreadUtils.newDaemonFixedThreadPool(20,"TIMLINE-HANDLER"));
    }

    @Override
    public Result execute() throws ExecutionException, InterruptedException, TimeoutException {
        // Run the work on this handler's own pool rather than on the caller's thread.
        CompletableFuture<Result> future =
                CompletableFuture.supplyAsync(() -> Result.of(null), workerPool);
        // Block for at most 10 seconds; a TimeoutException is thrown after that.
        return future.get(10, TimeUnit.SECONDS);
    }
}
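
TimlineHandler submits its work to a pool and waits for the result with a timeout. The sketch below isolates that pattern using only the JDK so it can be run on its own. `TimeoutSketch`, `callWithTimeout`, and the sleep that stands in for business work are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of "submit to a pool, wait with a timeout".
class TimeoutSketch {
    // Runs a task of taskMillis on the pool and waits at most timeoutMillis for it.
    static String callWithTimeout(ExecutorService pool, long taskMillis, long timeoutMillis)
            throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(taskMillis); // stand-in for real business work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        }, pool);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Marks the future as cancelled; CompletableFuture does not
            // interrupt the thread that is already running the task.
            future.cancel(true);
            return "timeout";
        }
    }

    // One call that finishes in time and one that exceeds its timeout.
    static String[] run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return new String[] {
                callWithTimeout(pool, 10, 2000),
                callWithTimeout(pool, 2000, 50)
            };
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Note that `get` blocks the calling thread until the result arrives or the timeout expires. If the caller is a reactor thread, it cannot dispatch other events during that wait, so a non-blocking continuation such as `thenApply` may be worth considering.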

4.4 Entry point class

public class App {
    public static void main(String[] args) throws Exception {
        Msg msg = new Msg("111", MsgType.BUSINESS);
        ReactorDispatcher reactorDispatcher = Singleton.get(ReactorDispatcher.class);
        Reactor reactor = reactorDispatcher.getReactor(msg.type);
        reactor.execute(msg.content);

    }

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    static class Msg { // message body
        String content;
        MsgType type;
    }
}
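
The listings in section 4 rely on types that are not shown (MsgType, NotifyMethod, Result) and on third-party helpers. As a rough, dependency-free condensation of the same flow (message, reactor lookup, handler lookup, result), the sketch below may help. Everything in it is an assumption: the enum constants are limited to those that appear in the listings, and reactors and handlers are reduced to plain functions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

// Hypothetical condensation of the flow in section 4, using only the JDK.
class FlowSketch {
    enum MsgType { ALIVE, BUSINESS } // assumed: only the constants used in the listings
    enum NotifyMethod { TIMLINE }    // assumed: only the constant used in the listings

    // Handler lookup, standing in for HandlerDispatcher.
    static final Map<NotifyMethod, Function<String, String>> HANDLERS =
            new EnumMap<>(NotifyMethod.class);
    // Reactor lookup, standing in for ReactorDispatcher.
    static final Map<MsgType, List<Function<String, String>>> REACTORS =
            new EnumMap<>(MsgType.class);

    static {
        HANDLERS.put(NotifyMethod.TIMLINE, content -> "timeline(" + content + ")");
        // The main reactor only acknowledges the connection.
        Function<String, String> mainReactor = content -> "connected";
        // A sub reactor delegates to the handler bound to the notify method.
        Function<String, String> subReactor =
                content -> HANDLERS.get(NotifyMethod.TIMLINE).apply(content);
        REACTORS.put(MsgType.ALIVE, Collections.singletonList(mainReactor));
        REACTORS.put(MsgType.BUSINESS, Arrays.asList(subReactor, subReactor));
    }

    // Picks a reactor for the message type at random and lets it process the content.
    static String dispatch(MsgType type, String content) {
        List<Function<String, String>> candidates = REACTORS.get(type);
        int index = ThreadLocalRandom.current().nextInt(candidates.size());
        return candidates.get(index).apply(content);
    }
}
```

Calling `FlowSketch.dispatch(FlowSketch.MsgType.BUSINESS, "111")` mirrors what `App.main` does: the message type selects a group of reactors, one of them is chosen, and it forwards the content to the handler.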