多线程高并发编程(7) — Future源码剖析

一.观点

  A Future盘算的效果。 提供方式来检查盘算是否完成,守候其完成,并检索盘算效果。 效果只能在盘算完成后使用方式get举行检索,若有需要,壅闭,直到准备就绪。 作废由cancel方式执行。 提供其他方式来确界说务是否正常完成或被作废。 盘算完成后,不能作废盘算。 若是您想使用Future ,以便不能打消,但不提供可用的效果,则可以声明Future<?>表格的类型,并返回null作为基础义务的效果。 

public interface Future<V> {
    //实验作废执行此义务。若是义务已经完成,已经被作废或由于某些其他缘故原由而无法作废,则此实验将失败。
    //若是乐成,而且在挪用 cancel 时此义务尚未最先,则该义务永远无法运行。
    //若是义务已经最先,则 mayInterruptIfRunning 参数确定是否应中止执行该义务的线程以实验住手该义务。
    //mayInterruptIfRunning == true, 示意中止执行中的线程,false 示意让线程正常完成
    boolean cancel(boolean mayInterruptIfRunning);
    //若是此义务在正常完成之前被作废,则返回true。
    boolean isCancelled();
    //若是此义务完成,则返回true。完成可能是由于正常终止,异常或作废引起的,在所有这些情况下,此方式都将返回true。
    boolean isDone();
    //需要时守候盘算完成,然后检索其效果
    V get() throws InterruptedException, ExecutionException;
    //需要时最多守候给准时间以完成盘算,然后检索其效果(若是有)。
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  多线程高并发编程(7) -- Future源码剖析

  Future是一个接口,提供了方式来检测当前的义务是否已经竣事,还可以守候义务竣事而且拿到一个效果,通过挪用Future的get()方式可以当义务竣事后返回一个效果值,若是事情没有竣事,则会壅闭当前线程,直到义务执行完毕;可以通过挪用cancel()方式来住手一个义务,若是义务已经住手,则cancel()方式会返回true;若是义务已经完成或者已经住手了或者这个义务无法住手,则cancel()会返回一个false。当一个义务被乐成住手后,他无法再次执行。isDone()和isCancel()方式可以判断当前事情是否完成和是否作废。  

  类图结构:

 多线程高并发编程(7) -- Future源码剖析

线性模型

  • ScheduledFuture:这个接口示意一个延时的行为可以被作废。通常一个安排好的future是准时义务SchedualedExecutorService的效果;
  • RunnableFuture: 这个接口同时继续Future接口和Runnable接口,在乐成执行run()方式后,可以通过Future接见执行效果;
  • ForkJoinTask:基于义务的抽象类,可以通过ForkJoinPool来执行。一个ForkJoinTask是类似于线程实体,然则相对于线程实体是轻量级的。大量的义务和子义务会被ForkJoinPool池中的真实线程挂起来,以某些使用限制为价值;
  • CompletableFuture:一个Future类是显示的完成,而且能被用作一个完成品级,通过它的完成触发支持的依赖函数和行为。当两个或多个线程要执行完成或作废操作时,只有一个能够乐成;
  • RunnableScheduledFuture:执行延迟和周期性义务;在乐成执行run()方式后,可以通过Future接见执行效果;
  • FutureTask:可作废的异步盘算
    • 该类提供了一个Future的基本实现 ,具有启动和作废盘算的方式,查询盘算是否完整,并检索盘算效果。效果只能在盘算完成后才气检索; 若是盘算尚未完成,则get方式将阻止。一旦盘算完成,则无法重新启动或作废盘算(除非使用runAndReset()挪用盘算 );
    • A FutureTask可用于包装Callable或Runnable工具。 由于FutureTask实现Runnable ,一个FutureTask可以提交到一个Executor执行;
  • RecursiveTask:递归效果ForkJoinTask;
  • RecursiveAction:递归效果ForkJoinTask;

二.用法

   一个场景,我们要学习做饭,那么我们需要准备厨具和食材,厨具通过电子商务网购,食材去菜市场挑选。那么可以使用多线程来并发举行,即我们可以先网购下单,在守候快递员送货过来的这段时间去菜市场买食材,节省时间,提高效率。

  1. 直接开启线程,使用类继续Thread重写方式实现网购,join壅闭直到厨具到达才最先做饭。
    public class FutureTest {
        public static void main(String[] args) throws InterruptedException {
            long startTime = System.currentTimeMillis();
            OnlineShopping shopping = new OnlineShopping();
            shopping.start();
            Thread.sleep(2000);//守候送货执行完
            System.out.println("第二步:食材到位");
            shopping.join();//壅闭订单直到快递送到获得厨具
            System.out.println("第三步:最先厨艺");
            System.out.println("总共用时:" + (System.currentTimeMillis() - startTime) + "ms");
        }
    
        static class OnlineShopping extends Thread {
            @Override
            public void run() {
                System.out.println("第一步:下单");
                System.out.println("第一步:守候送货");
                try {
                    Thread.sleep(5000);//送货中
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第一步:快递送到");
            }
        }
    }
    //======效果======
    第一步:下单
    第一步:守候送货
    第二步:食材到位
    第一步:快递送到
    第三步:最先厨艺
    总共用时:5003ms 
  2. 使用Future模式来完成上述操作,通过Callable返回效果来获取厨具,可以通过FutureTask天真地操作订单,由此可见,比继续Thread完成的订单,Future模式更具有天真性,
    public class FutureTest {
        public static void main(String[] args) throws Exception {
            long startTime = System.currentTimeMillis();
            Callable<String> shopping = () ->{
                System.out.println("第一步:下单");
                System.out.println("第一步:守候送货");
                Thread.sleep(5000);//快递员送货中
                System.out.println("第一步:快递送到");
                return "厨具到达";
            };
            FutureTask<String> task = new FutureTask<>(shopping);
            new Thread(task).start();
            Thread.sleep(2000);//保证下单操作执行到“守候送货”中
            System.out.println("第二步:食材到位");
            if (!task.isDone()) {  // 联系快递员,询问是否到货
                System.out.println("第三步:厨具还没到,心情好就等着(心情欠好就挪用cancel方式作废订单)");
            }
            String chuju = task.get();//获得厨具
            System.out.println("第三步:最先厨艺");
            System.out.println("总共用时:" + (System.currentTimeMillis() - startTime) + "ms");
        }
    }
    //======效果======
    第一步:下单
    第一步:守候送货
    第二步:食材到位
    第三步:厨具还没到,心情好就等着(心情欠好就挪用cancel方式作废订单)
    第一步:快递送到
    第三步:最先厨艺
    总共用时:5048ms 

三.剖析

  使用Future模式的三部曲:

  1. 建立Callable重写call方式,把网购逻辑封装到call中,返回界说效果“厨具”;
    public interface Callable<V> {
        V call() throws Exception;
    }
  2. 建立FutureTask,把Callable实例放入FutureTask的组织方式中;
    public class FutureTask<V> implements RunnableFuture<V>{
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // 确保callable的可见性
        }
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // 确保callable的可见性
        }
    }
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }

     FutureTask的run方式:Callable的call是被FutureTask的run方式挪用的,不是异步运行的;

    public void run() {
            // 1. 若是 state !=  NEW 说明 run 方式已经运行过,直接 return
            // 2. 若是 state == NEW && CAS 竞争 设置 runner 失败,说明已经有其余线程在运行,直接 return
            // NEW 的状态由组织方式初始化,runner 是运行该 Callable 的线程
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;// 这里的callable是从组织方式内里传人的
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;// 标识符
                    try {
                        result = c.call();//获得效果
                        ran = true;
                    } catch (Throwable ex) {//异常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)//乐成没有异常,设置返回值
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                //在状态设置之前,runner必须是非空的,以防止对run()的并发挪用
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                //为防止泄露中止,必须在空runner之后将状态设置为重复读
                int s = state;
                // 若是最终状态 >= INTERRUPTING,则处置中止
                // cancel 方式会通过参数 mayInterruptIfRunning 来设置 state 的值
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

     状态属性state:

        private volatile int state;//状态,volatile让状态可见性
        private static final int NEW          = 0;//组织方式建立时的状态
        private static final int COMPLETING   = 1;//这是一个中心态,完成时和泛起异常时有使用到
        private static final int NORMAL       = 2;//完成运行时的最终状态
        private static final int EXCEPTIONAL  = 3;//异常时的最终状态
        private static final int CANCELLED    = 4;//已作废
        private static final int INTERRUPTING = 5;//中止中
        private static final int INTERRUPTED  = 6;//已中止
        
        可能的 state 转换:
        NEW -> COMPLETING -> NORMAL
        NEW -> COMPLETING -> EXCEPTIONAL
        NEW -> CANCELLED
        NEW -> INTERRUPTING -> INTERRUPTED

    set设置返回值:

        private Object outcome;//通过get方式获得的返回值
        //设置返回值
        protected void set(V v) {
             // 这里为什么要用 CAS 由于可能会和 cancel 方式发生竞争。
            // 若是竞争失败,说明作废竞争乐成,在 cancel 方式负担叫醒的事情,以是直接跳过。
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 竞争乐成
                outcome = v;//outcome为返回效果
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态为NORMAL
                finishCompletion();
            }
        }

    setException运行时异常:

        protected void setException(Throwable t) {
            // 这里为什么要用 CAS 由于可能会和 cancel 方式发生竞争。
            // 若是竞争失败,说明作废竞争乐成,在 cancel 方式负担叫醒的事情,以是直接跳过。
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                // 竞争乐成
                outcome = t; // outcome 为一个 Throwable
                // 把最终状态改为 EXCEPTIONAL
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }

    finishCompletion:

        //删除当前线程并叫醒所有守候线程,挪用done(),并作废举行中的方式
        private void finishCompletion() {
            // assert state > COMPLETING;
            //从 waiters 末尾最先遍历,for 自旋直到 CAS 乐成。
            for (WaitNode q; (q = waiters) != null;) {
                // 使用 CAS 把 waiters 设置为 null,和 awaitDone 和 removeWatier 方式竞争
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    // 自旋叫醒所有线程
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);// 叫醒线程
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
            // 这个一个空方式,主要用于继续重写,这也是模板模式的体现
            done();
            callable = null;        // to reduce footprint
        }
        protected void done() { }
    
    
        //===========例子==============
        //ExecutorCompletionService 的作用就是把线程池的执行效果放到一个已完成行列中,利便获取执行效果,其内部主要通过一个 FutureTask 的实现类 QueueingFuture 来实现这个功效:
        private class QueueingFuture extends FutureTask<Void> {
                QueueingFuture(RunnableFuture<V> task) {
                    super(task, null);
                    this.task = task;
                }
                protected void done() { completionQueue.add(task); }//done方式是FutureTask方式的重写。FutureTask在完成时会执行done方式,把task放入已完成行列completionQueue。
                private final Future<V> task;
            }

    get获得返回效果:

        public V get() throws InterruptedException, ExecutionException {
            int s = state;//获得状态
            if (s <= COMPLETING)//状态未完成,把获取效果的线程放入守候链表,然后壅闭,直至被中止、完成或泛起异常。
                s = awaitDone(false, 0L);
            return report(s);//返回效果
        }
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;//用于计时
            WaitNode q = null;
            boolean queued = false;
            for (;;) {//自旋
                //若是已经被中止,则removeWaiter,抛出中止异常
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                int s = state;
                if (s > COMPLETING) {//task已经竣事
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                //马上就要竣事,则让出cpu
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                // 初始化 WaitNode
                else if (q == null)
                    q = new WaitNode();
                 // 是否已入队,没有则把WaitNode接到末尾
                else if (!queued)
                    // 和 finishCompletion 和 removeWaiter 竞争
                    // 1. finishCompletion竞争乐成,说明state已经 > COMPLETING则下次循环就会退出
                    // 2. removeWaiter竞争乐成,说明waiters变化了,下一次循环再次竞争
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                // 若是使用了计时,则判断是否超时,若是超时则移出WaitNode并立刻返回无需守候效果,否则壅闭 nanos
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    //壅闭,直到被叫醒(正常完成 || 异常 || 中止)
                    LockSupport.park(this);
            }
        }
        //凭据awaitDone返回状态返回效果或抛出异常
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)//正常
                return (V)x;
            if (s >= CANCELLED)//作废
                throw new CancellationException();
            // task 执行过程中泛起异常
            throw new ExecutionException((Throwable)x);
        }

    removeWaiter:

            /**
             * Tries to unlink a timed-out or interrupted wait node to avoid
             * accumulating garbage.  Internal nodes are simply unspliced
             * without CAS since it is harmless if they are traversed anyway
             * by releasers.  To avoid effects of unsplicing from already
             * removed nodes, the list is retraversed in case of an apparent
             * race.  This is slow when there are a lot of nodes, but we don't
             * expect lists to be long enough to outweigh higher-overhead
             * schemes.
             *实验作废链接超时或中止的守候节点以制止聚积垃圾。内部节点的拼接没有CAS,
             *由于这对释放者无论如何遍历都没有影响。 为了制止已删除节点节点未拼接的影响,
             *若是泛起显著的竞争,则重新遍历列表。 当节点许多时会很慢,然则我们不
             *期望列表足够长以抵消较高的开销设计。
             */
        private void removeWaiter(WaitNode node) {
            if (node != null) {
                node.thread = null;
                retry:
                for (;;) {          // restart on removeWaiter race
                    //遍历整个链表
                    for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                        s = q.next;
                        //把q看成前一个节点,遍历下一个节点
                        if (q.thread != null)
                            pred = q;
                         // q.thread == null && pred != null,示意当前节点不是第一个节点,是一个中心节点
                         // 这里没有使用 CAS,若是泛起多个线程同时遍历,前一个节点变为null,则重新重新遍历
                         // 为什么没有使用 CAS 由于作者的想法是这个链表不会太长,以是我们使用时不应该使这个链表太长
                         // 操作:把下一个节点连接到前一个节点的后面
                        else if (pred != null) {
                            pred.next = s;//把s连接到pred后面
                            if (pred.thread == null) // check for race
                                continue retry;
                        }
                        // q.thread == null && pred == null,示意第一个节点的 thread == null,
                        // 这里使用 CAS,由于可能多个线程在操作
                        // 操作:把下一个节点设置为末尾节点,若是竞争失败则重新重新遍历
                        else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                              q, s))
                            continue retry;
                    }
                    break;
                }
            }
        }
  3.  建立Thread,把FutureTask实例放入组织方式中,start开启线程

 参考:https://www.jianshu.com/p/414cc2f0002c

原创文章,作者:28x29新闻网,如若转载,请注明出处:https://www.28x29.com/archives/7443.html