Disruptor技术相关

发布时间 2023-10-02 18:40:36作者: strongmore

简介

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

使用

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.7</version>
</dependency>
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 多生产者多消费者的场景
 */
@Slf4j
public class TestDisruptorClient2 {
    public static void main(String[] args) throws InterruptedException {
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                OrderEvent::new,
                1024 * 1024,
                Executors.defaultThreadFactory(),
                // 这里的枚举修改为多生产者
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        //一个消息会被多个消费者消费
//        disruptor.handleEventsWith(new OrderEventHandler("李四"), new OrderEventHandler("张三"));
        //一个消费者仅会被一个消费者消费
        disruptor.handleEventsWithWorkerPool(new OrderEventHandler("李四"), new OrderEventHandler("张三"));
        disruptor.start();
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
        // 创建一个线程池,模拟多个生产者
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; i++) {
            fixedThreadPool.execute(() -> eventProducer.onData(UUID.randomUUID().toString()));
        }
    }

    /**
     * 消息内容
     */
    @Data
    public static class OrderEvent {
        private String id;
    }

    /**
     * 生产者
     */
    public static class OrderEventProducer {
        private final RingBuffer<OrderEvent> ringBuffer;

        public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        public void onData(String orderId) {
            long sequence = ringBuffer.next();
            try {
                OrderEvent orderEvent = ringBuffer.get(sequence);
                orderEvent.setId(orderId);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }


    /**
     * 消费者
     */
    public static class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
        private String name;

        public OrderEventHandler(String name) {
            this.name = name;
        }

        @Override
        public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
            log.info("name: {}, event: {}, sequence: {}, endOfBatch: {}", this.name, event, sequence, endOfBatch);
        }

        @Override
        public void onEvent(OrderEvent event) {
            log.info("name: {}, event: {}", this.name, event);
        }
    }
}

这个库在soft-jraft(一个开源的raft协议实现)中有被使用到。

参考

高性能队列——Disruptor
高性能队列 Disruptor 使用教程