使用线程执行框架的一次经历

场景

一个线程从某个地方接收消息(数据),使用可以是线程其他主机或者消息队列,然后转由另外的执行一个线程池来执行具体处理消息的逻辑,并且消息的框架处理速度小于接收消息的速度。这种情景很常见,经历试想一下,使用你会怎么设计和实现?线程

直观想法

很显然采用JUC的线程框架,可以迅速写出代码。执行

消息接收者:

public class Receiver {      private static volatile boolean inited = false;     private static volatile boolean shutdown = false;     private static volatile int cnt = 0;     private MessageHandler messageHandler;     public void start(){          Executors.newSingleThreadExecutor().execute(new Runnable() {              @Override             public void run() {                  while(!shutdown){                      init();                     recv();                 }             }         });     }     /**      * 模拟消息接收      */     public void recv(){              Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s",框架 ++cnt, msg)); messageHandler.handle(msg); } public void init(){  if(!inited){  messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) {  new Receiver().start();     } } 

消息处理:

public class MessageHandler {      private static final int THREAD_POOL_SIZE = 4;     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);     public void handle(Message msg) {          try {              service.execute(new Runnable() {                  @Override                 public void run() {                      parseMsg(msg);                 }             });         } catch (Throwable e) {              System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) {  while (true) {  try {  System.out.println("解析消息:" + message); Thread.sleep(5000); System.out.println("============================"); } catch (InterruptedException e) {                  e.printStackTrace();             }         }     } } 

效果:这种方案导致的现象是接收到的消息会迅速堆积,我们从消息队列(或者其他地方)取出了大量消息,经历但是使用处理线程的速度又跟不上,所以导致的线程问题是大量的Task会堆积在线程池底层维护的一个阻塞队列中,这会极大的执行耗费存储空间,影响系统的框架性能。

分析:当execute()一个任务的经历时候,服务器租用如果有空闲的worker线程,那么投入运行,否则看设置的***线程个数,没有达到线程个数限制就创建新线程,接新任务,否则就把任务缓冲到一个阻塞队列中,问题就是这个队列,默认的大小是没有限制的,所以就会大量的堆积任务,必然耗费heap空间。

public static ExecutorService newFixedThreadPool(int nThreads) {          return new ThreadPoolExecutor(nThreads, nThreads,                                       0L, TimeUnit.MILLISECONDS,                                       new LinkedBlockingQueue<Runnable>());     } public LinkedBlockingQueue() {          this(Integer.MAX_VALUE); // capacity     } 

计数限制

面对上述问题,想到了要限制消息接收的速度,自然就想到了各种线程同步的原语,不过在这里最简单的就是使用一个Volatile的计数器。

消息接收者:

public class Receiver {      private static volatile boolean inited = false;     private static volatile boolean shutdown = false;     private static volatile int cnt = 0;     private MessageHandler messageHandler;     public void start(){          Executors.newSingleThreadExecutor().execute(new Runnable() {              @Override             public void run() {                  while(!shutdown){                      init();                     recv();                 }             }         });     }     /**      * 模拟消息接收      */     public void recv(){              Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){  if(!inited){  messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) {  new Receiver().start();     } } 

消息处理:

public class MessageHandler {      private static final int THREAD_POOL_SIZE = 1;     private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE);     public void handle(Message msg){          try {              service.execute(new Runnable() {                  @Override                 public void run() {                      parseMsg(msg);                 }             });         } catch (Throwable e) {              System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message){  try {  Thread.sleep(10000); System.out.println("解析消息:" + message); } catch (InterruptedException e) {  e.printStackTrace(); }finally {              Receiver.limit --;         }     } } 

效果:通过控制消息的个数来阻塞消息的接收过程,就不会导致任务的堆积,系统的网站模板内存消耗会比较平缓,限制消息的个数本质就和下面限制任务队列大小一样。

使用同步队列 SynchronousQueue

SynchronousQueue 虽名为队列,但是其实不会缓冲任务的对象,只是作为对象传递的控制点,如果有空闲线程或者没有达到***线程限制,就会交付给worker线程去执行,否则就会拒绝,我们需要自己实现对应的拒绝策略RejectedExecutionHandler,默认的是抛出异常RejectedExecutionException。

消息接收者同上。

消息处理:

public class MessageHandler {      private static final int THREAD_POOL_SIZE = 4;     ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,             new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {          @Override         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {              System.out.println("自定义拒绝策略"); try {  executor.getQueue().put(r); System.out.println("重新放任务回队列"); } catch (InterruptedException e) {  e.printStackTrace(); } } }); public void handle(Message msg) {  try {  System.out.println(service.getTaskCount()); System.out.println(service.getQueue().size()); System.out.println(service.getCompletedTaskCount()); service.execute(new Runnable() {  @Override public void run() {  parseMsg(msg); } }); } catch (Throwable e) {  System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) {  while (true) {  try {  System.out.println("线程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); Thread.sleep(1000); } catch (InterruptedException e) {                  e.printStackTrace();             }         }     } } 

效果:能够控制消息的接收速度,但是我们需要在rejectedExecution中实现某种阻塞的操作,但是选择在发生拒绝的时候把任务重新放回队列,带来的问题就是这个Task会发生饥饿现象。

使用大小限制的阻塞队列

使用LinkedBlockingQueue作为线程框架底层的香港云服务器任务缓冲区,并且设置大小限制,思想上和上述方案一样,都是有一个阻塞的点,但是通过***的jvm monitor看到这里的CPU消耗更少,内存使用有所降低,并且波动小(具体原因有待探索)。

消息接收者同上。

消息处理:

public class MessageHandler {      private static final int THREAD_POOL_SIZE = 4;     private static final int BLOCK_QUEUE_CAP = 500;     ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,             new LinkedBlockingQueue<Runnable>(BLOCK_QUEUE_CAP), new SimpleThreadFactory(), new RejectedExecutionHandler() {          @Override         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {              System.out.println("自定义拒绝策略"); try {  executor.getQueue().put(r); System.out.println("重新放任务回队列"); } catch (InterruptedException e) {  e.printStackTrace(); } } }); public void handle(Message msg) {  try {  service.execute(new Runnable() {  @Override public void run() {  parseMsg(msg); } }); } catch (Throwable e) {  System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) {  try {  Thread.sleep(5000); System.out.println("线程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); } catch (InterruptedException e) {  e.printStackTrace(); } } static class SimpleThreadFactory implements ThreadFactory {  @Override public Thread newThread(Runnable r) {  Thread thread = new Thread(r); thread.setName("Thread-" + System.currentTimeMillis()); return thread;         }     } } 

总结

多线程是比较容易出问题的地方,特别当对方法不熟悉的时候

域名
上一篇:数据中心如何为飓风的到来做好准备
下一篇:戴尔Precision T7820 一款彪悍的塔式工作站