logo头像

From zero to HERO

Java 中的 FutureTask

1. 前言

上一文简单介绍了 Java 中的 Future 接口,留了一个坑,今天就来补一补这个坑。Future 的实现非常多,而 java.util.concurrent.FutureTask 是最经常被提及的一个。今天我们来了解一下这个实现。

2. FutureTask

从上面可以看出 FutureTask 既有 Runnable 的特点又有 Future 的特点。 可以看出设计者就是通过在线程周期中去进行异步计算并对异步计算进行状态控制和结果获取。而且我们上一文在对 Future 异步计算的不同状态在 FutureTask 中使用了状态机来进行状态描述:

    /** Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

从源码注释中也注明了 FutureTask 的状态转换流程,简单画了一张图来更加清晰的描述其中的关系:

上面的状态流程贯穿 FutureTask 的整个生命周期。接下来来对这些状态的流转进行一些分析。

3. FutureTask的状态

3.1 NEW

该状态就是 FutureTask 利用构造初始化的状态。FutureTask 共有两个构造函数,一个是 Callable 作为参数的构造函数;另一个则是 Runnable 和泛型结果容器 result 作为参数的构造函数,其中 Runnable 最终也被转成了 CallableNEW 状态干了三件事:

  • 初始化 FutureTask
  • 初始化执行任务的逻辑线程 Callable
  • 将当前状态设置为 NEW

3.2 CANCELLED

这个其实在讲Future 接口时已经说了,将计算取消意味着异步计算生命周期的结束。详情可以看上一篇文章相关的说明。但是我们还是想来看看是如何取消的:

    public boolean cancel(boolean mayInterruptIfRunning) {
        //如果正处于NEW状态,希望请求正在运行也希望中断就设置为INTERRUTPTING,否则直接设置CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { 
                    // 更新到最终的打断状态
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 唤醒等待线程 来处理结束的状态
            finishCompletion();
        }
        return true;
    }

这里用了 CAS 原子操作来尝试进行取消。当前如果是 NEW 状态然后结合另一个策略参数 mayInterruptIfRunning 来看看是不是正在中断或者已经取消,决定是否进行取消操作。如果允许运行时中断首先将状态更新为 INTERRUPTING 状态,然后线程中断的会把状态更新为 INTERRUPTED

3.3 COMPLETING

正在完成中,开始我以为是任务正在进行,然而我错了,该状态意思是计算已经完毕,但是没有暴露出去,而且正在设置给暴露的 outcome 变量。那么 RUNNING 状态哪里去了?

The run state transitions to a terminal state only in methods set,setException, and cancel.

这是相关的注释说明,RUN 状态仅仅是所有的 set 方法和 cancel 时的一个过度状态。其实想想也对运行状态如果不变其实也没有什么需要我们关心的。isDone() 方法说明了一切,只要不是 NEW 状态就任务任务完成了,但是没有结束

3.4 NORMAL

当状态由 NEW 转为 COMPLETING (这又是一个CAS 操作)后,计算结果暴露出去赋值给 outcome ,然后使用自旋锁去不停向等待队列发出已经计算完毕的信号。有个地方非常有趣,作者的结束逻辑写的非常巧妙:

   private void finishCompletion() {
       // 已经 断言当前状态 肯定是已经开始执行任务了,即不是初始化 NEW 状态
        // assert state > COMPLETING;
       // 当前有线程挂起在等着拿结果
        for (WaitNode q; (q = waiters) != null;) {
            // 抢占线程 这里跟 for 结合的很巧妙 直接设置null 简单实用
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 不停的自旋 ,当然 LockSupport和Thread. interrupted搭配必须要自旋
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    // 优化回收
                    q.next = null; // unlink to help gc
                    // 赋值以快速获取线程快速拿到结果  这里也很巧妙
                    q = next;
                }
                break;
            }
        }
        // 空方法 你可以覆写来做一些记录
        done();

        callable = null;        // to reduce footprint
    }

3.5 EXCEPTIONAL

当抛出中断异常或者其它异常时发出设置异常状态,没有太多可说的。

3.6 INTERRUPTING

其它线程在得知当前的状态为 INTERRUPTING 时,通过 Thread.yield 让出当前的 CPU 时间片,并重新就绪竞争 CPU 调度权。就像发现某件事情正在处理,我们先出去重新等待,等人家处理完我们再来试一试一样。

3.7 INTERRUPTED

参见对 CANCELLED 的分析。

4. FutureTask 如何运作

FutureTask 除了控制状态外,其他都是根据状态来进行判断进而执行具体的策略。我们实际用到的有以下两个方法。

4.1 run

异步任务主要在该方法中进行计算,记住计算是另外一个线程中进行计算的。

    public void run() {
      // 如果当前不是NEW,或者当前任务已经有了其他执行线程执行 就不再重复 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
           // 在另一个线程中进行计算 并记录结果 处理异常
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            // runner 用来执行 callable 并进行 cas  
            // 状态被设置后 runner 被设置为null
            // 防止 run 方法并发
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            // 设置 runner 为 null 后需要检查一下状态 防止泄露中断
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

4.2 get

get 方法就是一个等待任务结果的过程 核心方法为 awaitDone, 当异步任务执行时线程会挂起要么直接等到任务完成,要么直接等到超时放弃。具体看下面的源码分析:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        // 如果当前线程被中断就不停尝试移除等待队列
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 完成 取消或者发生异常 直接返回 不再等待
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 如果还在完成中 就先让出调度继续等待  yield 效率要高 它会继续抢占调度来尝试
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
             // 如果刚刚进入等待状态就 初始化一个等待队列
            else if (q == null)
                q = new WaitNode();
            // 尝试将没有入队的等待线程加入等待队列  基于cas 
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                            q.next = waiters, q);
            // 处理超时逻辑                
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
            // 不设置超时将一直阻塞到当前等待结果线程
                LockSupport.park(this);
        }
    }

本文对FutureTask 在异步计算进行状态控制、运行、获取结果进行了分析,限于我个人能力有限,如果有不妥之处还希望多多指教。

评论系统未开启,无法评论!