在日常开发中,我们有时会遇到遇到多线程处理任务的情况,JDK里提供了便利的ThreadPoolExecutor以及其包装的工具类Executors。但是我们知道ExecutorService.excute(Runnable r)是异步的,超过线程池处理能力的线程会被加入到执行队列里。有时候为了保证任务提交的顺序性,我们不希望有这个执行队列,在线程池满的时候,则把主线程阻塞。那么,怎么实现呢?
最直接的想法是继承ThreadPoolExecutor,重载excute()方法,加入线程池是否已满的检查,若线程池已满,则等待直到上一个任务执行完毕。这里ThreadPoolExecutor提供了一个afterExecute(Runnable r, Throwable t)方法,每个任务执行结束时会调用这个方法。 同时,我们会用到concurrent包的ReentrantLock以及Condition.wait/notify方法。以下是实现代码:
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
@Override
public void execute(Runnable command) {
pauseLock.lock();
try {
while (getPoolSize()==getMaximumPoolSize() && getQueue().remainingCapacity()==0)
unpaused.await();
super.execute(command);//放到lock外面的话,在压力测试下会有漏网的!
} catch (InterruptedException e) {
log.warn(this, e);
} finally {
pauseLock.unlock();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r,t);
try{
pauseLock.lock();
unpaused.signal();
}finally{
pauseLock.unlock();
}
当然,有些熟悉JDK源码的人会说,自己实现这个太费劲了,不喜欢!有没有比较简单的方法呢?
这里介绍一下vela同学的方法:http://vela.diandian.com/post/2012-07-24/40031283329
研究ThreadPoolExecutor.excute()源码会发现,它调用了BlockingQueue.offer()来实现多余任务的入队。BlockingQueue有两个方法:BlockingQueue.offer()和BlockingQueue.put(),前者在队列满时不阻塞,直接失败,后者在队列满时阻塞。那么,问题就很简单了,继承某个BlockingQueue,然后将offer()重写,改成调用put()就搞定了!最短的代码量,也能起到很好的效果哦!
package com.diandian.framework.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorsEx {
/**
* 创建一个堵塞队列
*
* @param threadSize
* @return
*/
public static ExecutorService newFixedThreadPool(int threadSize) {
return new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
private static final long serialVersionUID = -9028058603126367678L;
@Override
public boolean offer(Runnable e) {
try {
put(e);
return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
});
}
当然这个方法有一点让人不快的地方,因为它与我们熟知的OO基本原则之一–里氏替换原则冲突了,即子类的方法与父类的方法有不同的行为。毕竟都是实现了BlockingQueue接口,offer()方法的行为被改变了。虽然只是一个匿名类,但是对于某些OOP的拥趸来说总有些不爽的地方吧!
没关系,我们还有JDK默认的解决方法:使用RejectedExecutionHandler。当ThreadPoolExecutor.excute执行失败时,会调用的RejectedExecutionHandler,这就是ThreadPoolExecutor的可定制的失败策略机制。JDK默认提供了4种失败策略: AbortPolicy(中止)、CallersRunPolicy(调用者运行)、DiscardPolicy(丢弃)、DiscardOldestPolicy(丢弃最旧的)。 其中值得说的是CallersRunPolicy,它会在excute失败后,尝试使用主线程(就是调用excute方法的线程)去执行它,这样就起到了阻塞的效果!于是一个完完全全基于JDK的方法诞生了:
public static ExecutorService newBlockingExecutorsUseCallerRun(int size) {
return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue
new ThreadPoolExecutor.CallerRunsPolicy());
}
当然这个方法有一个问题:这样加上主线程,总是会比参数的size线程多上一个。要么在函数开始就把size-1,要么,我们可以尝试自己实现一个RejectedExecutionHandler:
public static ExecutorService newBlockingExecutorsUseCallerRun(int size) {
return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
怎么样,这下是不是感觉挺好了呢?
[来源: http://my.oschina.net/flashsword/blog/114527]
分类目录
- arch/management (17)
- computer (38)
- java/j2ee (304)
- lnmpa (237)
- mac/iphone/ipad/android (11)
- mysql/oracle/postgresql (126)
- os/software (74)
- other (518)
- python (6)
- redis/memcached/mongo (31)
- sitebuild (143)
随便看看
标签云
程序员 创业 人生箴言 eclipse 快捷键 术语 索引 unix命令 vim wordpress java学习笔记 环境变量 oracle内置函数 index 人生 数据类型 nohup tuxedo mysql学习笔记 MS-DOS命令 servlet spring 职场进阶 职业进阶 服务器选购 服务器选型 apache JPA MongoDB 注解 tomcat 子女教育 jquery maven JVM aix命令 网络营销 java异常 seo 人生规划 关键字 css 网络推广 struts 系统优化 成长 frame iframe bluehost jdbc select 我的信仰 oracle函数 cookie HashMap 站长工具 乱码 ArrayList secureCRT jsp session tail find halt 事务 oracle单记录函数 算法 URL window table javascript操作表单元素 String 字符串处理 健康 http 域名 情感 more google A记录 域名解析 netstat 弹出对话框 弹出窗口 框架集 框架 excel 字符串 javascript函数 showModalDialog nginx number 数组 sql frameset 开源程序 java数组 软件 oracle服务友情链接
收藏链接