并发07–线程池及Executor框架

一、JAVA中的线程池

线程池的实现原理及流程如下图所示:

并发07--线程池及Executor框架

 

 

   如上图所示,当一个线程提交到线程池时(execute()或submit()),先判断焦点线程数(corePoolSize)是否已满,若是未满,则直接建立线程执行义务;若是已满,则判断行列(BlockingQueue)是否已满,若是未满,则将线程添加到行列中;若是已满,则判断线程池(maximumPoolSize)是否已满,若是未满,则建立线程池执行义务;若是线程池已满,则交给饱和计谋(RejectedExecutionHandler.rejectExcution())来处置。

  可以看下线程池ThreadPoolExecutor的全参组织函数源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

对其入参释义如下:

参数 形貌 作用
coolPoolSize 线程焦点线程数 当一个义务提交到线程池时,线程池会建立一个线程来执行义务,纵然其他的焦点线程足够执行新义务,也会建立线程,直到需要执行的义务数大于焦点线程数后才不再建立;若是线程池先挪用了preStartAllCoreThread()方式,则会先启动所有焦点线程。
maximumPoolSize 线程池最大线程数 若是行列满了,而且已建立的线程数小于该值,则会建立新的线程执行义务。这里需要说明一点,若是使用的行列时无界行列,那么该值无用。
keepAliveTime 存活时间 当线程池中线程跨越超时时间没有新的义务进入,则住手该线程;只会住手多于焦点线程数的那几个线程。
unit 线程存活的时间单元 可以有天、小时、分钟、秒、毫秒、玄妙、纳秒
workQueue 义务行列

用于保留守候执行义务的壅闭行列。可以选择如下几个行列:数组结构的有界行列ArrayBlockingQueue、链表效果的有界行列LinkedBlockingQueue、不存储元素的壅闭行列SynchronousQueue、一个具有优先级的无界壅闭行列PriortyBlockingQueue

threadFactory 建立线程的工厂

可以通过工厂给每个线程建立更有意义的名字。使用Guava提供的ThreadFactoryBuilder可以快速的给线程池里的线程建立有意义的名字,代码如下

new ThreadFactoryBuilder().setNameFormat(“aaaaaaaa”).build();

handler 包和计谋

当行列和线程都满了,说明线程池处于饱和状态,那么必须接纳一种计谋来处置新提交的义务。

AbortPolicy(默认),示意无法处置新义务时抛出异常。

CallerRunsPolicy:只有挪用者所在线程来运行

DiscardOldestPolicy:抛弃行列里最近的一个义务,并执行当前义务

DiscardPolicy:不处置,直接抛弃

  上面说到,向线程池提交义务有两种方式,分别是execute()和submit(),两者的区别主要是execute()提交的是不需要有返回值的义务,而submit提交的是需要有返回值的义务,而且submit()会返回一个Furure工具,而且可以使用future.get()方式获取返回值,而且get方式会壅闭,直到有返回值。

  线程池的关闭有shutdown()和shutdownNow两个方式,他们的原理是遍历线程池中的事情线程,然后逐个挪用interrupt方式来中止线程,以是无法中止的线程可能永远无法终止;然则二者也有区别,shutdownNow是将线程池的状态设置为STOP,然后实验住手所有正在执行或者暂停的线程,并返回守候执行义务列表;而shutdown只是将线程池的状态设置成SHUTDOWN,然后中止所有没有正在执行的义务。当挪用这两个方式中的任何一个后,isShutdown方式就会返回true,当所有义务都已经关闭后,挪用isTerminaed方式会返回true。

  使用线程池时,需要从义务的性子(IO密集型照样CPU密集型或是混合型)、义务的优先级、义务的执行时常、义务的依赖性(是否依赖其他系统资源,如数据库毗邻等)来综合判断,好比说,CPU密集型,就可以就可以设置N+1个线程个数,其中N为CPU核数,若是是IO密集型,则可以设置2*N个线程数;若是是混合型的义务,可以将其拆分成IO密集型和CPU密集型,然则若是两个义务的执行时间相差较大,则没有需要举行拆分;优先级差别的义务可以使用优先级行列PriortyBlockingQueue来处置;依赖数据出等其它资源的线程池,好比说依赖数据库,那么就可以加大线程数目,由于在守候sql执行的时刻,线程是处于空闲状态;另外,最好使用有界行列,由于无界行列,由于有界行列可以增添系统的稳定性和预警能力。

  对于线程的监控,另有以下几个方式可以使用:

方式 形貌
taskCount() 线程池需要执行的义务数目
completedTskCount 线程池运行历程中已经执行完毕的义务数目
IarestPoolSize 线程池中曾经建立过的最大线程数
getPoolSize 线程池的线程数目
getActiveCount 获取流动的线程数

二、Exector框架

   在java中,是用线程来异步执行义务,java线程的建立与销毁需要一定的开销。若是我们为每一个义务建立一个线程的话,这些线程就会消耗大量的盘算资源,会使处于高负荷的应用溃逃。

  在HotSpot虚拟机中,JAVA线程被一对一的映射为内陆操作系统线程。JAVA线程启动时会建立一个内陆操作系统线程,当该JAVA线程终止时,这个操作系统线程也会被收回,操作系统会挪用多有线程并将他们分配给可用的CPU。

并发07--线程池及Executor框架

 

   Executor框架的两级调剂模子如上图所示,应用程序通过Executor控制上层的调剂,而下层的挪用由操作系统内核控制,将线程映射到硬件处置器上,下层的挪用不受应用程序的控制。

   关于Executor的组成部分如下所示:

元素 形貌
义务 包罗被执行义务需要实现的接口Runnable和Callable接口
义务的执行 包罗义务执行机制的焦点接口Executor,以及继续自Executor的ExecutorService接口。Executor接口有两个要害的实现类实现了ExecutorService接口:ThreadPoolExecutor和ScheduledThreadPoolExecutor
异步盘算的效果 包罗接口Future和实现Future接口的FurureTask类

  Executor框架使用示意图如下:

并发07--线程池及Executor框架

 

 

   如上图所示,主线程首先建立实现Runnable或Callable接口的义务工具,然后把义务工具提交给ExecutorService执行,若是使用的是submit提交,执行完毕后将返回一个实现Future接口的工具,最后,主线程可以执行FutureTask.get()方式来获取返回值;主线程也可以挪用FutureTask.cancel()方式来作废此义务的执行。

  Executor框架的成员如下:

成员 形貌 子类 形貌
ThreadPoolExecutor

通常使用工厂类Executors来建立,Executors可以建立三种类型的ThreadPoolExecutor

牢固线程数的FixedThreadPool

适用于为了知足资源管理的需求,而需要限制当前线程数目的应用场景,它适用于负载对照重的应用。
单一线程的SingleThreadPool 适用于需要保证顺序的执行各个义务,而且在随便时间点都不会有多个线程流动的场景。
凭据需要建立线程的CacheThreadPool 这是一个无界的线程池,适用于执行许多短期异步义务的小程序,或者是负载对照轻的服务器。
ScheduledThreadPoolExecutor 通常使用工厂类Executors建立,Executors可以建立两种类型的ScheduledThreadPoolExecutor 包罗若干线程的ScheduledThreadPoolExecutor 适用于需要多个后台线程执行周期义务,同时为了知足资源管理的需求而需要限制后台线程数目的应用场景。
只包罗一个线程的SingleThreadScheduledExecutor                         适用于需要单个后台线程执行周期义务,同时需要保证顺序的执行各个义务的场景。                                               
ForkJoinsPool

 newWorkStealingPool适合使用在很耗时的操作,然则newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,然则都是在统一的一个Executors类中实现,由于能够合理的使用CPU举行对义务操作(并行操作),以是适合使用在很耗时的义务中 

   
Future Future接口和实现了该接口的FutureTask类来示意异步盘算的效果    
Runnable和Callable接口

Runnable和Callable接口的实现类,都可以被ThreadPoolExecutor、ScheduledThreadPool、ForkJoinThred执行;除了可以自己实现Callable接口外,我们还可以使用工厂类Executors来把一个Runnable包装成一个Callable

   

 

ThreadPoolExecutor详解

xeus-clickhouse: Jupyter 的 ClickHouse 内核

1、ThreadPoolExecutor

  (1)FixedThreadPool

  组织函数如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

  组织函数中,焦点线程数和最大线程数一致,keepAliveTime为0,行列使用的是无界壅闭行列LinkedBlockingQueue(最大值是Integer.MAX_VALUE);

  焦点线程数和最大线程数保持一致,解释:若是行列满了之后,不会再建立新的线程;

  keepAliveTime为0,解释:若是运行线程数大于焦点线程数时,若是线程执行完毕,空闲线程马上被终止;

  使用无界壅闭行列,解释:当运行线程到达焦点线程数时,不会再建立线程,只会将义务加入壅闭行列;因此最大线程数参数无效;因此keepAliveTime参数无效;且不会拒绝义务(既不会执行包和计谋)

  (2)SingleThreadExecutor

  组织函数如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

  组织函数中,焦点线程数和最大线程数均为1,keepAliveTime为0,行列使用的是无界壅闭行列LinkedBlockingQueue(最大值是Integer.MAX_VALUE)

  除了牢固了焦点线程数和最大线程数为1外,其余的参数均与FixedThreadPool一致,那么就是只有一个线程会频频循环从壅闭行列中获取义务执行

  (3)CacheThreadPool

  组织函数如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

  组织函数中,焦点线程数为0,最大线程数为Integer.MAX_VALUE,意味着无界,keepAliveTime为60秒,壅闭行列使用没有存储空间的SynchronousQueue

  焦点线程数为0,最大线程数为无界,解释:只要行列满了,就会建立新的线程放入线程池

  使用没有存储空间的SynchronousQueue解释:线程提交的速率高于线程被消费的速率,那么线程会被不停的建立,最终会由于线程建立过多而耗尽CPU和内存资源

2、ScheduledThreadPoolExecutor

  ScheduledThreadPoolExecutor的运行机制如下:

   (1)当挪用ScheduledTreadPoolExecutor的scheduleAtFixedRate()方式或者scheduleWithFixedDelay()方式时会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask

  (2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行义务。

  ScheduledFutureTask主要包罗以下三个成员变量

成员变量 形貌
long time 示意这个义务要被执行的时间
long sequenceNumber 示意该义务被添加到ScheduledThreadPoolExecutor中的序号
long period 示意义务执行的距离周期

  DelayQueue封装了一个PriorityQueue,当添加义务时,这个PriorityQueue会对行列中的ScheduledFutureTask举行排序,time最小的在最前面(最先被执行),若是time一致,就对照sequenceNumber,sequenceNumber小的排在前面。

  当线程执行义务时,先从DelayQueue行列中获取已经到期的义务(time大于当前时间),然后执行该义务,执行完毕后,凭据义务的执行周期,修改义务下次的执行时间time,并重新将义务添加到DelayQueue

 

 

  FutureTask详解

  Future接口和实现该接口的FutureTask类,代表异步盘算的效果。

  FutureTask的使用方式是将其交给Executor执行,也可以通过ExecutorService.submit()方式返回一个FutureTask,然后执行FutureTask.get()方式或FutureTask.cancel()方式,除此之外,还可以然则使用FutureTask。

  FutureTask有三种状态:未启动(FutureTask.run()没有被执行之前的状态)、已启动(FutureTask.run()方式执行历程中)、已完成(FutureTask.run()方式执行完成或被作废),这三种状态的流转如下图所示:

 并发07--线程池及Executor框架

  FutureTask的实现是基于AQS(AbstractQueuedSynchrouizer)来实现的,之前已经说过,每一个基于AQS实现的同步器都市至少包罗一个acquire操作和至少一个release操作。AQS被作为模板方式模式的基础类提供给FutureTask的内部子类Sync实现了AQS的tryAcquireShared(int)方式和tryReleaseShared(int)方式,Sync通过这两个方式来检查和更新同步状态。

  FutureTask涉及示意图如下图所示:

 并发07--线程池及Executor框架

  如上图所示,FutureTask.get()方式会挪用AQS的acquireSharedInterruptibly(int)方式,该方式首先会回调在子类Sync中的tryAcquireShared()方式来判断acquire操作是否乐成(state状态状态是否为执行完成RAN或作废状态CANCELED&runner不为null),若是乐成则get()方式马上返回,若是失败则到线程守候行列中去守候其他线程执行release操作;当其他线程执行release操作(好比FutureTask.run()或FutureTask.cancel())叫醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将脱离线程守候行列并叫醒它的后继线程。

  Run方式执行历程如下:

  执行在组织函数中指定的义务(Callable.call()),然后以原子方式来更新状态(挪用AQS.compareAndSetState(int expect, int update),设置state的状态为RAN),若是这个原子操作乐成,就设置代表盘算效果的变量result的值为Callable.call()的返回值,然后挪用AQS.release(int)。

  AQS.rease首先会挪用子类Sync中实现的tryReleaseShared方式来执行release操作(设置运行义务的线程为null,然后返回false),然后叫醒守候行列中的第一个线程。

  最后挪用Future.done()方式。

 

原创文章,作者:28x29新闻网,如若转载,请注明出处:https://www.28x29.com/archives/18716.html