跳到主要内容

20、Java JUC 源码分析 - FutureTask源码分析

使用

FutureTask<String> futureTask = new FutureTask(() -> "success");
new Thread(futureTask).start();
futureTask.get();

源码分析

FutureTask提供了两个构造方法,分别是传入一个Callable,和传入一个Runnable加返回值result。如果传入的是Runnable加返回值,那么会通过适配器RunnableAdapter将其包装成一个Callable。

public FutureTask(Runnable runnable, V result) {

 
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

通过调用Executors.callable方法包装Callable:

public static <T> Callable<T> callable(Runnable task, T result) {

 
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

RunnableAdapter实现了Callable接口,其call方法的返回值就是result:

static final class RunnableAdapter<T> implements Callable<T> {

 
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {

 
        this.task = task;
        this.result = result;
    }
    public T call() {

 
        task.run();
        return result;
    }
}

在一个FutureTask创建成功后,其默认状态state为NEW。在FutureTask中为state定义了7个状态,并且state被volatile修饰以保证可见性,这7个状态分别是:

  • NEW:新建状态(初始默认状态)
  • COMPLETING:正在完成中,这个状态表示任务已经执行完成,但是还没有设置结果(正常结果或者异常)
  • NORMAL:普通状态,状态变为COMPLETING并成功设置结果之后会变成这个状态
  • EXCEPTIONAL:抛出了异常,任务执行抛出了异常则会变为这个状态,和NORMAL一样,在变为此状态之前也有个COMPLETING中间状态
  • CANCELLED:被取消
  • INTERRUPTING:正在被中断中,表示准备中断线程,但是还未中断
  • INTERRUPTED:被中断

state可能的状态转换情况如下:

  • NEW -> COMPLETING -> NORMAL :新建->完成中(任务已经完成,但是结果还未保存)->普通状态(结果保存完成)
  • NEW -> COMPLETING -> EXCEPTIONAL :新建->完成中(任务抛出异常,但是异常还未保存)->异常状态(异常保存完成)
  • NEW -> CANCELLED :新建->取消
  • NEW -> INTERRUPTING -> INTERRUPTED:新建->中断中(准备中断,但还未中断)->完成中断(调用了interrupt方法)

FutureTask实现了Runnable接口,所以可以直接通过其run方法在当前线程执行任务,其run方法就是调用callable的call方法,call方法执行完毕之后,任务状态变为COMPLETING,并且call方法有一个返回值,正常执行得到返回值之后将其保存到outcome属性中,然后任务状态从COMPLETING变为NORMAL;如果任务执行抛出了异常, 那么任务状态也先变为COMPLETING,然后将异常对象设置到outcome属性,接着任务状态从COMPLETING变为EXCEPTIONAL。FutureTask的run方法如下:

public void run() {

 
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        //检查任务状态和执行线程,如果状态不是NEW,或者通过CAS设置当前线程为任务执行线程失败,那么直接返回
        return;
    try {

 
        Callable<V> c = callable;
        if (c != null && state == NEW) {

 
            //在此判断任务状态
            V result;
            boolean ran;
            try {

 
                //执行任务
                result = c.call();
                //表示任务执行成功
                ran = true;
            } catch (Throwable ex) {

 
                //抛出了异常,任务状态:NEW->COMPLETING->EXCEPTIONAL
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                //任务执行成功,任务状态:NEW->COMPLETING->NORMAL
                set(result);
        }
    } finally {

 
        //将runner设置为null,保证其它线程能够再次执行此任务
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            //线程被中断
            handlePossibleCancellationInterrupt(s);
    }
}

注:将FutureTask交给Thread去执行和将Runnable交给Thread去执行是一样的,Thread.start方法就是负责调用本地方法启动一个线程去异步执行提交的任务,差别就是FutureTask可以调用get方法获取任务执行的结果,而结果保存在outcome属性中。

run()方法的大体逻辑相对来说比较简单,就是调用Callable的call方法获取返回值,然后将返回值设置到outcome中,此时任务的状态变化为:NEW->COMPLETING->NORMAL;如果call方法抛出了异常,那么将异常设置到outcome中,此时任务的状态变化为:NEW->COMPLETING->EXCEPTIONAL。
任务执行失败的处理逻辑在setException方法中:

protected void setException(Throwable t) {

 
    //先将任务修改为COMPLETING状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

 
        //保存异常
        outcome = t;
        //将任务修改为EXCEPTIONAL状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

任务执行成功设置返回值是通过调用set方法实现的:

protected void set(V v) {

 
    //先将任务修改为COMPLETING状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

 
        //保存返回值
        outcome = v;
        //将任务修改为NORMAL状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

在setException和set方法中都体现了FutureTask任务的状态转化,不过我们注意到两个方法中都调用了一个finishCompletion方法,这个finishCompletion的逻辑如下:

private void finishCompletion() {

 
    for (WaitNode q; (q = waiters) != null;) {

 
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

 
            for (;;) {

 
                //循环唤醒waiters队列中的所有阻塞线程
                Thread t = q.thread;
                if (t != null) {

 
                    q.thread = null;
                    //唤醒阻塞的线程
                    LockSupport.unpark(t);
                }
                //唤醒了一个节点之后将其从链表中移除
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    //空方法
    done();
    callable = null;        // to reduce footprint
}

从状态的转换流程中可以看出,不论任务是正常完成还是抛出了异常,在FutureTask中都认为是任务完成。在finishCompletion方法中的主要逻辑是唤醒所有阻塞的线程,这些阻塞的线程是怎么来的呢?其实就是调用FutureTask的get方法生成的。FutureTask支持异步获取任务执行的结果,主要提供了两个方法get()和get(long timeout,TimeUnit unit),分别表示永久阻塞等待和超时阻塞等待,来看看get方法的实现:

public V get() throws InterruptedException, ExecutionException {

 
    int s = state;
    if (s <= COMPLETING)
        //阻塞线程,传入false表示不用超时阻塞
        s = awaitDone(false, 0L);
    //任务已经完成了,处于COMPLETING之后的状态
    return report(s);
}

get(long timeout,TimeUnit unit)方法的实现:

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {

 
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        //传入true表示需要超时阻塞
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

如果任务处于COMPLETING之后的状态,表示任务已经完成了(不论是正常完成还是抛出异常),否则进入超时阻塞等待,如果超时唤醒后发现任务还未完成,那么抛出TimeoutException异常。
我们注意到,永久阻塞和超时阻塞都是通过awaitDone方法实现的,该方法有两个入参,分别是:

  • timed:布尔型参数,表示是否需要超时阻塞
  • nanos:超时阻塞的时间
    那么我们进入awaitDone方法看看是如何实现的:
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {

 
    //计算线程阻塞的终止时间:当前时间+最大阻塞时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {

 
        if (Thread.interrupted()) {

 
            //如果线程被中断了,从阻塞队列中移除,抛出中断异常
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {

 
            //如果任务已经结束,那么返回当前任务状态,后续会根据任务状态解析结果
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            //如果任务正在完成中,那么只需要等待任务完成即可
            //这里使用yield释放cpu时间片,等待下一次cpu执行
            Thread.yield();
        else if (q == null)
            //如果q为null,那么新建一个WaitNode,q是awaitDone方法的局部变量,初始为null
            q = new WaitNode();
        else if (!queued)
            //如果q还没有入队,那么通过CAS将其入队(waiters相当于是单向链表的头结点)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {

 
            //如果需要超时等待,根据阻塞截止时间和当前时间计算需要阻塞的时间
            //因为此方法的逻辑中,排队节点的创建、入队、和阻塞等都是是通过for循环一次一次的进行推进
            //所以在阻塞之前要重新计算一下剩余需要阻塞的时间
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {

 
                //如果剩余阻塞时间小于等于0 ,说明阻塞时间已经过了,直接移除节点,然后返回任务状态
                removeWaiter(q);
                return state;
            }
            //超时阻塞
            LockSupport.parkNanos(this, nanos);
        }
        else
            //不带超时的阻塞
            LockSupport.park(this);
    }
}

一个线程要获取FutureTask任务的执行结果,如果任务还未完成,那么线程就需要被阻塞。首先线程会被封装成一个WaitNode对象,WaitNode是FutureTask的一个静态内部类,其结构很简单,就是包含一个线程对象(thread)和一个WaitNode对象(next):

static final class WaitNode {

 
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() {

  thread = Thread.currentThread(); }
}

FutureTask通过这个WaitNode构建了一个单向链表(waiters属性),所有通过获取FutureTask任务结果而阻塞的线程都排在这个单向链表中,每个WaitNode是通过CAS入队的,这个和AQS中的同步队列和条件队列基本是一样的逻辑。
awaitDone方法有个特点,WaitNode的创建、入队、阻塞,包括线程被唤醒之后各种处理的都是通过for循环自旋推进的。比如:

  • 第一次循环判断局部变量q为null,那么创建一个WaitNode
  • 第二次循环判断!queue为true,说明上一次创建的WaitNode还没有入队,那么WaitNode通过CAS入队,如果CAS失败,说明有多个线程在并发入队,留待下一次for循环再进行入队
  • 第三次循环判断是否需要超时阻塞,如果有超时,会重新计算一次剩余阻塞时间, 通过parkNanos方法进行阻塞;如果超时时间已经过了,则直接返回任务状态。如果不使用超时,那么直接通过park方法阻塞线程

由于通过for循环推进也需要消耗时间,所以在awaitDone方法开始的时候会先保存线程阻塞的终止时间,等待线程真正开始parkNanos阻塞的时候,会再计算一下剩余需要阻塞的时间,如果阻塞时间已经过了,那么直接返回任务状态。

可以看到,awaitDone方法的返回值是任务的状态state,最终还会调用report方法根据任务的状态返回任务的最终结果。而awaitDone方法返回有几种情况:

  • 线程被中断:线程醒来后会在下次循环中判断发现线程被中断,进而抛出InterruptedException
  • 线程被unpark唤醒,然后发现任务已经完成:返回任务当时的状态(>COMPLETING)
  • 线程超时唤醒:线程等待超时时间到了也没有等到任务完成,方法返回后会在外层get方法里判断如果任务状态小于等于COMPLETING,表示是超时唤醒,那么抛出TimeoutException

在分析了awaitDone方法之后,我们就能明白在run方法中任务完成之后,在set方法保存结果之后调用finishCompletion方法的作用了,就是循环唤醒waiters链表中的所有线程,线程被唤醒后发现任务状态大于COMPLETING,那么awaitDone方法得以返回。awaitDone方法返回的是任务的状态,还需要在report方法中决定get方法最终的返回值:

private V report(int s) throws ExecutionException {

 
    Object x = outcome;
    if (s == NORMAL)
        //任务正常完成,返回结果,其它都是异常结束
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

最后我们还需要看一下cancel方法的逻辑:

public boolean cancel(boolean mayInterruptIfRunning) {

 
    //mayInterruptIfRunning表示是否中断任务线程
    //如果需要中断线程,那么需要把任务状态先调整为一个中间状态:INTERRUPTING
    //否则直接将状态调整为CANCELLED
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        //cas失败了,说明同时有其它线程在操作
        return false;
    try {

     // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {

 
            try {

 
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {

  // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {

 
        //触发任务完成的逻辑,唤醒waiters链表中的所有阻塞线程
        finishCompletion();
    }
    return true;
}

之后NEW状态下的任务才能被cancel,该方法同时提供了一个布尔参数mayInterruptIfRunning,表示是否需要中断执行任务的线程,如果需要中断线程,那么任务状态会先调整为一个中间状态:INTERRUPTING,之后调用线程的interrupt方法后再将状态调整为INTERRUPTED,这个中间状态和COMPLETING类似;如果不需要中断线程,那么直接将任务修改为CANCELLED状态,如果CAS修改失败,那么直接返回,不再执行后面的逻辑,因为这代表有多个线程在并发cancel,只需要一个线程处理这个逻辑。
如果CAS修改状态成功,那么可以执行后面cancel
从cancel的逻辑中可以发现,通过FutureTask提供的cancel方法,如果传入的mayInterruptIfRunning为false,那么对于任务线程本身并不会做什么操作,而只是将状态修改为CANCELLED状态,然后调用finishCompletion方法唤醒阻塞线程;即使是入参为true,那么也是中断线程,如果任务没有响应中断,那么任务也不会退出,即使任务已经是CANCELED状态。

总结

FutureTask提供了两种构建方式,分别是传入Callable和传入Runnable,如果传入的是Runnable,也会通过适配器将其包装为一个Callable,FutureTask间接实现了Runnable接口,将其提交给Thread之后执行的就是其run方法,在FutureTask的run方法中调用的是Callable的call方法并且获取结果,对于同一个FutureTask,同时只能有一个线程执行该任务,这点体现在Thread类型的runner属性上,在run方法执行之前,会通过CAS将runner以null为期望值修改为当前线程,修改成功才会进入后续的逻辑。
FutureTask中对于任务规定了7种状态,任务初始默认为NEW状态,大体来说FutureTask中的任务可能会:正常执行结束、任务抛出异常结束、被取消、被中断,这些行为也反映在了定义的7种状态中。
任务在FutureTask中,正常完成或者抛出异常都算是任务完成,需要保存任务结果,如果任务正常完成,那么结果就是Callable的call方法的返回值;如果抛出异常,那么结果就是异常对象。当任务完成之后需要唤醒所有由于调用get方法获取结果而阻塞的线程,这些线程保存在一个通过WaitNode构建的单向链表中,唤醒的时候从链表头节点(waiters)开始依次unpark即可。
通过get方法获取任务执行结果而阻塞的线程,会在awaitDone方法中完成WaitNode的创建、入队和阻塞等操作,阻塞分为超时阻塞和永久阻塞,如果是超时阻塞,那么线程被唤醒的方式有中断线程、达到超时时间、unpark;如果是永久阻塞,那么线程被唤醒的方式有中断线程和unpark。当线程被唤醒后,会返回当前任务的状态,或者抛出中断异常,返回任务状态state之后,会通过report方法决定get方法最终返回结果,如果任务是正常结束的,那么返回结果值,否则会抛出相应的异常。
最后还需要注意,FutureTask的cancel方法并不能保证任务线程立即退出,不过无论如何都会唤醒阻塞线程。当然这点在线程池中也一样,即使调用了线程池的shutdownNow方法,也不能保证工作线程能够立即退出,这个要取决于任务如何响应中断请求,如果要强制结束一个线程,那么可以调用Thread类的stop方法,虽然该方法不建议调用,但是这个方法在处理一些顽固"僵死"线程时很有用。