Disruptor是一个高性能的异步处理框架,一个“生产者-消费者”模型。
RingBuffer是一种环形数据结构,包含一个指向下一个槽点的序号,可以在线程间传递数据。
在Disruptor框架中,生产者生产的数据叫做Event。
1.MyEvent:自定义对象,充当“生产者-消费者”模型中的数据。
2.MyEventFactory:实现EventFactory的接口,用于生产数据。
3.MyEventProducerWithTranslator:将数据存储到自定义对象中并发布。
4.MyEventHandler:自定义消费者。
初次接触Disruptor,认识停留在表面,零散,模糊,在此记一个简单的示例,以便日后深入研究。
package com.disruptor.basic;public class LongEvent {private long value;public long getValue() {return value;
}public void setValue(long value) {this.value = value;
}
}package com.disruptor.basic;import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent> {public LongEvent newInstance() {// TODO Auto-generated method stubreturn new LongEvent();
}
}package com.disruptor.basic;import java.nio.ByteBuffer;import com.lmax.disruptor.EventTranslatorOneArg;import com.lmax.disruptor.RingBuffer;public class LongEventProducerWithTranslator {private final RingBuffer<LongEvent> ringBuffer;public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer;
}private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {/** * event:包含有消费数据的对象; sequence:分配给目标对象的RingBuffer空间序号;
* bb:包含有将要被存储到目标对象中的数据的容器 */public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {// TODO Auto-generated method stubevent.setValue(bb.getLong(0));// 将数据存储到目标对象中 }
};public void onData(ByteBuffer bb) {
ringBuffer.publishEvent(TRANSLATOR, bb);// 发布,将数据推送给消费者 }
}package com.disruptor.basic;import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent> {public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO Auto-generated method stubSystem.out.println("当前消费的数据="+event.getValue());
}
}package com.disruptor.basic;import java.nio.ByteBuffer;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.junit.Test;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.YieldingWaitStrategy;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;public class LongEventTest {
@SuppressWarnings({ "unchecked", "deprecation" })
@Testpublic void test01() throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
EventFactory<LongEvent> factory = new LongEventFactory();int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,new YieldingWaitStrategy());
disruptor.handleEventsWith(new LongEventHandler());
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();// LongEventProducer producer = new// LongEventProducer(ringBuffer);LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);// long startTime = System.currentTimeMillis();for (long a = 0; a < 100; a++) {
bb.putLong(0, a);
producer.onData(bb);/*if (a == 99) {
long endTime = System.currentTimeMillis();
System.out.println("useTime=" + (endTime - startTime));
}*/Thread.sleep(100);
}/*long endTime = System.currentTimeMillis();
System.out.println("useTime=" + (endTime - startTime));*/disruptor.shutdown();
executor.shutdown();
}/*@Test
public void test02() {
long startTime = System.currentTimeMillis();
for (long a = 0; a < 100; a++) {
System.out.println(a);
}
long endTime = System.currentTimeMillis();
System.out.println("useTime=" + (endTime - startTime));
}*/}
以上就是Disruptor的概述与使用的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号