Disruptor是什么可以阅读《高性能线程间队列DISRUPTOR简介》一文,下面重点讲讲在实际应用中如何去使用Disruptor。
项目结构如下:
CreateReqEvent.java
package com.bijian.study; import com.lmax.disruptor.EventFactory; public class CreateReqEvent { private String reqStr; public String getReqStr() { return reqStr; } public void setReqStr(String reqStr) { this.reqStr = reqStr; } private static class Factory implements EventFactory<CreateReqEvent> { @Override public CreateReqEvent newInstance() { return new CreateReqEvent(); } } public static final CreateReqEvent.Factory FACTORY = new CreateReqEvent.Factory(); }
CreateReqEventHandler.java
package com.bijian.study; import java.lang.invoke.MethodHandles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lmax.disruptor.EventHandler; public class CreateReqEventHandler implements EventHandler<CreateReqEvent> { private static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Override public void onEvent(CreateReqEvent event, long sequence, boolean endOfBatch) throws Exception { log.info("on event:{}", event.getReqStr()); } }
CreateReqEventTranslator.java
package com.bijian.study; import com.lmax.disruptor.EventTranslator; public class CreateReqEventTranslator implements EventTranslator<CreateReqEvent> { private String reqString; public String getReqString() { return reqString; } public void setReqString(String reqString) { this.reqString = reqString; } @Override public void translateTo(CreateReqEvent event, long sequence) { event.setReqStr(reqString); } }
ReqEventUtil.java
package com.bijian.study; import java.lang.invoke.MethodHandles; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; public class ReqEventUtil { private static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static volatile Disruptor<CreateReqEvent> disruptor; private static ExecutorService executor; //启动处理线程 static { int ringBufferSize = 256 * 256; // RingBuffer 大小=65536,必须是 2的 N次方; executor = Executors.newFixedThreadPool(4*4); disruptor = new Disruptor(CreateReqEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,new BlockingWaitStrategy()); EventHandler<CreateReqEvent> eventHandler = new CreateReqEventHandler(); disruptor.handleEventsWith(eventHandler); disruptor.start(); } public static void push(String reqString){ try { log.info("push create reqString event:{}", reqString); CreateReqEventTranslator translator = new CreateReqEventTranslator(); translator.setReqString(reqString); disruptor.publishEvent(translator); }catch (Exception e){ log.error("push CreateOrderEvent error:",e); } } /** * 停止处理 */ public static void shutdown(){ log.info("shutdown now..."); if(disruptor != null) { disruptor.shutdown(); } if(executor != null) { executor.shutdown(); } } }
测试类Main.java
package com.bijian.test; import java.lang.invoke.MethodHandles; import java.util.Scanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.bijian.study.ReqEventUtil; public class Main { private static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static void main(String[] args) { Scanner sc = new Scanner(System.in); while(sc.hasNext()) { String reqStr = sc.nextLine(); log.info("输入的信息:" + reqStr); if(reqStr.equals("exit")) { ReqEventUtil.shutdown(); break; }else { ReqEventUtil.push(reqStr); } } } }
运行结果:
test 2018-12-24 23:12:26.337 [main] INFO Main.main(Main.java:20)[][][] - 输入的信息:test 2018-12-24 23:12:26.370 [main] INFO ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:test 2018-12-24 23:12:26.371 [pool-2-thread-1] INFO CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:test abc 2018-12-24 23:12:28.323 [main] INFO Main.main(Main.java:20)[][][] - 输入的信息:abc 2018-12-24 23:12:28.323 [main] INFO ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:abc 2018-12-24 23:12:28.324 [pool-2-thread-1] INFO CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:abc xyz 2018-12-24 23:12:30.628 [main] INFO Main.main(Main.java:20)[][][] - 输入的信息:xyz 2018-12-24 23:12:30.628 [main] INFO ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:xyz 2018-12-24 23:12:30.628 [pool-2-thread-1] INFO CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:xyz exit 2018-12-24 23:12:34.032 [main] INFO Main.main(Main.java:20)[][][] - 输入的信息:exit 2018-12-24 23:12:34.033 [main] INFO ReqEventUtil.shutdown(ReqEventUtil.java:49)[][][] - shutdown now...
PS:完整工程代码详见附件《DisruptorStudy.zip》,《DisruptorStudy02.zip》是《Disruptor样例实战》一文的完整工程代码。
相关推荐
Disruptor它是一个开源的并发框架能够在无锁的情况下实现网络的Queue并发操作。同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者...
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
简单讲解disruptor并附上demo
disruptor 缓冲队列 高效
Disruptor专题简单案例资料 https://phoenix.blog.csdn.net/article/details/131264151
使用Netty, Disruptor处理实时外汇报价
Java工具:高性能并发工具Disruptor简单使用
那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。 可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。 我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。其实Disruptor与其说是一个框架...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
Disruptor简单使用。完成多线程间并行、等待、先后执行等功能。
赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 ...
disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载
Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。