create_new_thread(thd){ // 检查连接数是否过大 mysql_mutext_lock(&lock_connection_count); if (connetion_count >= max_connections + 1 || abort_loop){ mysql_mutext_unlock(&lock_connection_count); delete thd; // will close the connection return; } ++connection_count; if (connection_count > max_used_connections){ max_used_connections = connection_count; } mysql_mutext_unlock(&lock_connection_count); mysql_mutex_lock(&lock_thread_count); // 我在想如果连接很多很多,溢出了怎么办?? thd->thread_id = thd->variables.pseudo_thread_id = thread_id++; thread_count++; // add_connection当然是个函数指针,thread_scheduler是一个放了这些函数指针的结构体 // 为了跨平台,却没有使用C++的多态,而是用了函数指针实现了多态 thread_scheduler->add_connection(thd);}
// 这个就是add_connection的实现void create_thread_to_handle_connection(thd){ // 注意这个判断,cache_thread_count大于wake_thread的时候才会进来, // 当线程结束的时候,线程cachd_thread_count++ // 当发现有空闲,先wake_thread_count++,cachd_thread_count-- // 成功启动线程后,wake_thread_count--,这时wake_thread_count和cachd_thread_count又归于平衡 // 简单来说,cache_thread_count是用来记录可用线程数量的,而wake_thread是用来记录就绪线程数量的 // 多数时候,wake_thread_count基本是0,而cache_thread_count则大于等于0 if (cache_thread_count > wake_thread){ thread_cache.push_back(thd); wake_thread++; mysql_cond_signal(&cond_thread_cache); } else { // 如果没有cached thread则直接创建新的线程 thread_created++; // threads这个list放着所有正在运行时的thd threads.append(thd); thd->prior_thr_create_utime = thd->start_utime = my_micro_time(); // 真正创建一个线程出来 if (error = mysql_thread_create(key_thread_one_connection, &thd->real_id, &connection_attrib handle_one_connection)) { // do some clear up // ignore the lock operations // 创建失败清理一下 thread_count--; connection_count--; delete thd; } }}
那么什么时候会进入if (connetion_count >= max_connections + 1 || abort_loop)这个if语句当中呢?就是有线程结束,并等待新的thd来的时候,看看线程结束的时候做了些什么:
// 线程结束时的回调bool one_thread_per_connection_end(thd, bool put_in_cache){ unlink_thd(thd); // delete thd if (put_in_cache) { // 关键函数cache_thread put_in_cache = cache_thread(); } // 如果已经获得新thd,就不结束了 if (put_in_cache){ return 0; } // 否则结束线程 my_thread_end(); mysql_cond_broadcast(&cond_thread_count); pthread_exit(0); return 0;}
其中的cache_thread是关键函数:
// 这个函数就是cache thread的关键,他的思路就是线程结束的时候,wait for cache_thread队列// 取出新的thd,运行之static bool cache_thread(){ // 判断是否到达cache 线程的阈值 if (cache_thread_count < thread_cache_size && !abort_loop && !kill_cache_threads){ cache_thread_count++; // 进入cache状态 // wait for the signal to relive the thread // 开始wait for新的thd装进cache_thread list // 注意这里用的是while而不是if,因为 // 1、cond_wait可能虚假唤醒,可能因为竞争,并没有真正获得新的thd // 2、若获取失败则再次等新的thd,总之就是尽可能的获得新的thd while (!abort_loop && !wake_thread && !kill_cached_threads){ mysql_cond_wait(&cond_thread_cache, &lock_thread_count); cached_thread_count--; // 此处没仔细看是什么逻辑,暂时跳过 if (kill_cached_threads){ mysql_cond_signal(&cond_flush_thread_cache); } // 发现有就绪的thd,获取之,然后执行 if (wake_thread){ THD* thd; wake_thread--; thd = thread_cache.get(); thd->thread_stack = (char*)&thd; thd->store_globals(); thd->mysys_var->abort = 0; thd->prior_thr_create_utime = thd->start_utime = my_micro_time(); // 返回1使得线程不会结束,跑新的thd了 return 1; } } } // 结束线程 return 0;}
分析cache thread思路完毕。
mysql线程池的实现原理浅析
标签:逻辑 art 个数 class tor max 如何 scheduler 空闲
小编还为您整理了以下内容,可能对您也有帮助:
线程池原理
线程池原理是处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。
扩展资料
应用范围:
1、需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。
因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3、接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。
线程池工作原理
管理线程,当线程执行完当前任务,不会死掉而是 会从队列里面取
1.降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;
2.提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;
3.提高线程的可管理性。线程是稀缺资源,如果无*的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。
本文主要是围绕 ThreadPoolExecutor(线程池框架的核心类)的构造方法参数 展开:
1.corePoolSize
线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。
2.maximumPoolSize
额外最大线程数。上面说到任务数足够多,且使用的是有界队列,如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,首先从队列里面取,如果队列里面的消息执行完毕,等下一定时间,额外线程自动销毁。
3.keepAliveTime
线程空闲时的存活时间。默认情况下,可以理解成额外最大线程数没活干了,额外线程线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
4.unit
keepAliveTime参数的时间单位。
5.workQueue
任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行。
一般来说,这里的BlockingQueue有以下三种选择:
* SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无*的线程增长。
* LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为Integer.MAX_VALUE,即为无界队列。因此,如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无*的增长。
* ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。
6.threadFactory
线程工厂,创建新线程时使用的线程工厂。
7.handler
任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:
* AbortPolicy :丢弃任务并抛出RejectedExecutionException异常,默认策略;
* CallerRunsPolicy :由调用execute方法的线程执行该任务;
* DiscardPolicy :丢弃任务,但是不抛出异常;
* DiscardOldestPolicy :丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。
* 当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
总结下上诉参数:假设corePoolSize为10 ,maximumPoolSize为10,线程空闲时的存活时间为60s,队列采用的是有界队列ArrayBlockingQueue 设置阈值200,使 用拒绝策略 , 当前2000个任务提交过来 流程如下图
参数案例描述:
当前2000笔 任务进来,10个核心线程去处理,剩下的1990任务队列里面放200个。剩下的1790个任务。队列塞满会去创建10个额外线程和核心线程一起去 去执行剩下的1780个任务。 当还有剩下任务处理不了就会触发任务拒绝策略 。
当前220笔 任务进来,10个核心线程去处理,剩下的210任务队列里面放200个。剩下的10个任务。队列塞满会去创建10个额外线程 去执行队列放不下的任务。 当额外线程和核心线程处理完队列里面的队列。没有任务可执行时,额外线程会等待我们设置的keepAliveTime,还是没有任务的情况下,就会被回收了 。
以上是绝对理想的状况下。
由参数可知 核心线程 和额外线程值是相同的,额外线程被回收时间是0,采用的是无界队列。默认采用的拒绝策略为 AbortPolicy。分析得 核心线程和额外线程处理不过来得情况,会一直往队列里面放任务。
可能存在的问题:队列过大 导致内存溢出 OOM
当任务量足够大,超过队列。交由额外线程处理。就会创建过多线程 。
可能存在问题:特殊场景下,线程过多可能会导致系统奔溃。cpu负载过高。
1.具体解决方案 根据业务系统而定:
华瑞批量查证举例:定时任务CZJZRW001每隔2min 轮询一次 会从业务表verifycationTask 中 查询出待处理和处理中的状态的任务 根据表中的查证类型 分流到具体的 反欺诈异步查证 ,还款查证, 充值查证,贷款查证 。 具体查证根据处理结果更新verifycationTask表查证状态。处理成功 或者失败的定时任务无法再次轮询。这样就不需要考虑以上场景。使用线程池的情况下核心线程,额外线程处理不过来且队列已满使用DiscardPolicy拒绝不抛异常策略 ,即可满足该业务场景。类结构如下图
2.思路
可以实现 RejectedExecutionHandler接口 自定义拒绝策略 将被拒绝的任务信息缓存到磁盘,等待线程池负载较低 从磁盘读取重新提交到任务里面去执行
线程池工作原理
管理线程,当线程执行完当前任务,不会死掉而是 会从队列里面取
1.降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;
2.提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;
3.提高线程的可管理性。线程是稀缺资源,如果无*的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。
本文主要是围绕 ThreadPoolExecutor(线程池框架的核心类)的构造方法参数 展开:
1.corePoolSize
线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。
2.maximumPoolSize
额外最大线程数。上面说到任务数足够多,且使用的是有界队列,如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,首先从队列里面取,如果队列里面的消息执行完毕,等下一定时间,额外线程自动销毁。
3.keepAliveTime
线程空闲时的存活时间。默认情况下,可以理解成额外最大线程数没活干了,额外线程线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
4.unit
keepAliveTime参数的时间单位。
5.workQueue
任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行。
一般来说,这里的BlockingQueue有以下三种选择:
* SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无*的线程增长。
* LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为Integer.MAX_VALUE,即为无界队列。因此,如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无*的增长。
* ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。
6.threadFactory
线程工厂,创建新线程时使用的线程工厂。
7.handler
任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:
* AbortPolicy :丢弃任务并抛出RejectedExecutionException异常,默认策略;
* CallerRunsPolicy :由调用execute方法的线程执行该任务;
* DiscardPolicy :丢弃任务,但是不抛出异常;
* DiscardOldestPolicy :丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。
* 当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
总结下上诉参数:假设corePoolSize为10 ,maximumPoolSize为10,线程空闲时的存活时间为60s,队列采用的是有界队列ArrayBlockingQueue 设置阈值200,使 用拒绝策略 , 当前2000个任务提交过来 流程如下图
参数案例描述:
当前2000笔 任务进来,10个核心线程去处理,剩下的1990任务队列里面放200个。剩下的1790个任务。队列塞满会去创建10个额外线程和核心线程一起去 去执行剩下的1780个任务。 当还有剩下任务处理不了就会触发任务拒绝策略 。
当前220笔 任务进来,10个核心线程去处理,剩下的210任务队列里面放200个。剩下的10个任务。队列塞满会去创建10个额外线程 去执行队列放不下的任务。 当额外线程和核心线程处理完队列里面的队列。没有任务可执行时,额外线程会等待我们设置的keepAliveTime,还是没有任务的情况下,就会被回收了 。
以上是绝对理想的状况下。
由参数可知 核心线程 和额外线程值是相同的,额外线程被回收时间是0,采用的是无界队列。默认采用的拒绝策略为 AbortPolicy。分析得 核心线程和额外线程处理不过来得情况,会一直往队列里面放任务。
可能存在的问题:队列过大 导致内存溢出 OOM
当任务量足够大,超过队列。交由额外线程处理。就会创建过多线程 。
可能存在问题:特殊场景下,线程过多可能会导致系统奔溃。cpu负载过高。
1.具体解决方案 根据业务系统而定:
华瑞批量查证举例:定时任务CZJZRW001每隔2min 轮询一次 会从业务表verifycationTask 中 查询出待处理和处理中的状态的任务 根据表中的查证类型 分流到具体的 反欺诈异步查证 ,还款查证, 充值查证,贷款查证 。 具体查证根据处理结果更新verifycationTask表查证状态。处理成功 或者失败的定时任务无法再次轮询。这样就不需要考虑以上场景。使用线程池的情况下核心线程,额外线程处理不过来且队列已满使用DiscardPolicy拒绝不抛异常策略 ,即可满足该业务场景。类结构如下图
2.思路
可以实现 RejectedExecutionHandler接口 自定义拒绝策略 将被拒绝的任务信息缓存到磁盘,等待线程池负载较低 从磁盘读取重新提交到任务里面去执行
线程池原理
线程池的工作原理
当一个并发任务提交给线程池,线程池分配线程去执行任务的过程如下:
线程池执行所提交的任务过程主要有这样几个阶段:
(1)先判断线程池中核心线程池所有的线程是否都在执行任务。 如果不是,则新创建一个线程执行刚提交的任务,否则,核心线程池中所有的线程都在执行任务,则进入(2)
(2)判断当前阻塞队列是否已满,如果未满, 则将提交的任务放置在阻塞队列中;否则,则进入(3)
(3)判断线程池中所有的线程是否都在执行任务, 如果没有,则创建一个新的线程来执行任务,否则,则交给饱和策略进行处理
线程池的实现原理?
1.
corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个...
2.
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以...
3.
maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满...
4.
ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程...
线程池的实现原理?
1.
corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个...
2.
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以...
3.
maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满...
4.
ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程...
ThreadLocal 实现原理是什么 & 有哪些引用类型及使用场景?
对于一个 ThreadLocal 对象,通常会有两个引用指向它:
key 是弱引用,当不存在外部强引用时,会被自动回收。而 value 是强引用,引用链如下
所以只有当 Thread 被回收,value 才会被回收,否则 value 将一直存在,但是让每个线程关闭,是不现实的。在线程池中,大部分线程会伴随着系统的整个周期,那么 value 可能会造成泄漏。
解决方法,在 ThreadLocalMap 进行 set(),get(),remove() 的时候,都进行清理:
真正回收 value 的是 expungeStaleEntry() 方法,在 remove 和 set 方法中都会调用这个方法。
ThreadLocal 为了避免内存泄露,不仅使用了弱引用维护 key ,还在每个操作上检查 key 是否被回收,进而再回收value。
1、强引用
2、软引用
3、弱引用
4、虚引用
合理使用线程池以及线程变量
背景
随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。
在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。
线程池概述
线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
总体来说,线程池有如下的优势:
线程池的使用
在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:
可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。
在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。
在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。
在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:
在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:
在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:
ThreadPoolExecutor线程池有如下几种状态:
线程池提交一个任务时任务调度的主要步骤如下:
核心代码如下:
Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:
Tomcat为了实现请求的快速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。
在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。
核心部分代码如下:
其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。
Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。
Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。
在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200,为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。
具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:
Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。
Executors常用方法有以下几个:
Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端:
使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。
使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。
在工程实践中,通常使用下述公式来计算核心线程数:
nThreads=(w+c)/c*n*u=(w/c+1)*n*u
其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。
上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。
为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:
为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。
如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。
ThreadLocal线程变量概述
ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。
ThreadLocal的原理与实践
对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。
众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:
Thread 内部维护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。
从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。
EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:
在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:
EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:
在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。
在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/bbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:
【问题1:权益领取失败分析】
在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxMole形式,Mole间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的mole 的init方法先执行)。
在应用中,权益领取接口的主入口为CommonXXApplyMole类,CommonXXApplyMole依赖XXSessionMole。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionMole的init方法会优先执行;而开发同学在CommonXXApplyMole类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionMole模块获取不到正确的会话id等信息而导致权益领取失败。
【问题2:脏数据分析】
权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionMole通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。
【解决方案】
在依赖注入框架入口处AbstractGate#visit(或在XXSessionMole中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。
若使用强引用类型,则threadlocal的引用链为:Thread ThreadLocal.ThreadLocalMap Entry[] Entry key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。
若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:
在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。
如果不执行清理操作,则可能会出现:
建议使用try...finally 进行清理。
我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。
在应用中,谨慎使用ThreadLocal.withInitial(Supplier<? extends S>supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:
总结
在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须*的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。
合理使用线程池以及线程变量
背景
随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。
在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。
线程池概述
线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。
总体来说,线程池有如下的优势:
线程池的使用
在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:
可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。
在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。
在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。
在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:
在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:
在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:
ThreadPoolExecutor线程池有如下几种状态:
线程池提交一个任务时任务调度的主要步骤如下:
核心代码如下:
Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:
Tomcat为了实现请求的快速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。
在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。
核心部分代码如下:
其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。
Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。
Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。
在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200,为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。
具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:
Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。
Executors常用方法有以下几个:
Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端:
使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。
使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。
在工程实践中,通常使用下述公式来计算核心线程数:
nThreads=(w+c)/c*n*u=(w/c+1)*n*u
其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。
上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。
为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:
为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。
如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。
ThreadLocal线程变量概述
ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。
ThreadLocal的原理与实践
对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。
众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:
Thread 内部维护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。
从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。
EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:
在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:
EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:
在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。
在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/bbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:
【问题1:权益领取失败分析】
在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxMole形式,Mole间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的mole 的init方法先执行)。
在应用中,权益领取接口的主入口为CommonXXApplyMole类,CommonXXApplyMole依赖XXSessionMole。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionMole的init方法会优先执行;而开发同学在CommonXXApplyMole类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionMole模块获取不到正确的会话id等信息而导致权益领取失败。
【问题2:脏数据分析】
权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionMole通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。
【解决方案】
在依赖注入框架入口处AbstractGate#visit(或在XXSessionMole中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。
若使用强引用类型,则threadlocal的引用链为:Thread ThreadLocal.ThreadLocalMap Entry[] Entry key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。
若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:
在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。
如果不执行清理操作,则可能会出现:
建议使用try...finally 进行清理。
我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。
在应用中,谨慎使用ThreadLocal.withInitial(Supplier<? extends S>supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:
总结
在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须*的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。
线程池的实现原理 Java线程池实现原理
1、java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。
2、线程池的几个主要参数的作用
corePoolSize: 规定线程池有几个线程(worker)在运行。
maximumPoolSize: 当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。
keepAliveTime: 超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。
unit: 生存时间对于的单位
workQueue: 存放任务的队列
threadFactory: 创建线程的工厂
handler: 当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。
阻塞队列和线程池原理
队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
阻塞队列在jdk中有个专门的接口,BlockingQueue。但BlockingQueue的方法并不都是阻塞的方法:
add()插入元素,remove()拿取元素。add()往一个满的队列中插元素会插不进,会抛出异常;而remove()从空队列拿,也会抛出异常。
offer()和poll(),往一个满的队列,返回一个false;poll()往一个空队列中取元素,返回一个null
take()、put(),真正体现BlockingQueue的阻塞
take()往一个满的队列中插元素会阻塞;put()往一个空队列中取元素,也会阻塞
以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。
是一个用数组实现的有界阻塞队列。此队列按照先进先出的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。
缓存系统的设计: 可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
多了tryTransfer和transfer方法,
(1)transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间的poll()方法时),transfer()可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer()会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
(2)tryTransfer方法
tryTransfer()是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer()的区别是,tryTransfer()无论消费者是否接收,方法立即返回。而transfer()是必须等到消费者消费了才返回。
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
第三:提高线程的可管理性。线程是稀缺资源,如果无地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来;
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用
keepAliveTime的时间单位
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。
一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。
3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)更重要的,使用无界queue可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断 所有没有正在执行任务的线程 。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
要想合理地配置线程池,就必须首先分析任务特性
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
性质不同的任务可以用不同规模的线程池分开处理。
CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。
混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。
如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。