
30、Java JUC源码分析 - 线程池-Exchanger




public class Hello {
    public static void main(String[] args) throws InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
        final Exchanger<String> presents = new Exchanger<String>();

        Thread boy = new Thread(new Runnable() {

            public void run() {
                System.out.println(Thread.currentThread()+ "送美女一朵:鲜花");
                try {
                    String gift = presents.exchange("flower");
                    System.out.println(Thread.currentThread() + "获得美女赠送的:" + gift);
                } catch (InterruptedException e) {

        Thread girl = new Thread(new Runnable() {

            public void run() {
                System.out.println(Thread.currentThread()+ "送哥们一个kiss");
                try {
                    String gift = presents.exchange("kiss");
                    System.out.println(Thread.currentThread() + "获得帅哥赠送的:" + gift);
                } catch (InterruptedException e) {








The basic idea is to maintain a "slot", which is a reference to
a Node containing both an Item to offer and a "hole" waiting to
get filled in.  If an incoming "occupying" thread sees that the
slot is null, it CAS'es (compareAndSets) a Node there and waits
for another to invoke exchange.  That second "fulfilling" thread
sees that the slot is non-null, and so CASes it back to null,
also exchanging items by CASing the hole, plus waking up the
occupying thread if it is blocked.  In each case CAS'es may
fail because a slot at first appears non-null but is null upon
CAS, or vice-versa.  So threads may need to retry these



/** cup个数,控制slot数量和自旋 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();

/** slot数量 */
private static final int CAPACITY = 32;

 * slot最大用到多少
private static final int FULL =
    Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);

/** spin次数 */
private static final int SPINS = (NCPU == 1) ? 0 : 2000;

/** 超时exchange在park前的自旋次数 */
private static final int TIMED_SPINS = SPINS / 20;

/** tryCancel时更新,表示取消 */
private static final Object CANCEL = new Object();

/** 代表null入参或exchange返回null */
private static final Object NULL_ITEM = new Object();

/** 要交换数据的节点 */
private static final class Node extends AtomicReference<Object> {
    /** 创建该Node的线程要交换的数据 */
    public final Object item;

    /** 绑定的线程 */
    public volatile Thread waiter;

     * Creates node with given item and empty hole.
     * @param item the item
    public Node(Object item) {
        this.item = item;

 * slot(坑位)就是数据交换的地方,用了缓存行填充,避免伪共享
private static final class Slot extends AtomicReference<Object> {
    // Improve likelihood of isolation on <= 64 byte cache lines
    long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;

 * Slot 数组
private volatile Slot[] arena = new Slot[CAPACITY];

 * slot数组最大有几个可用
private final AtomicInteger max = new AtomicInteger();

/** 空构造 */
public Exchanger() {



public V exchange(V x) throws InterruptedException {
    if (!Thread.interrupted()) {
        Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
        if (v == NULL_ITEM)
            return null;
        if (v != CANCEL)
            return (V)v;
        Thread.interrupted(); // Clear interrupt status on IE throw
    throw new InterruptedException();

public V exchange(V x, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    if (!Thread.interrupted()) {
        Object v = doExchange((x == null) ? NULL_ITEM : x,
                              true, unit.toNanos(timeout));
        if (v == NULL_ITEM)
            return null;
        if (v != CANCEL)
            return (V)v;
        if (!Thread.interrupted()) //线程没有被中断抛超时异常
            throw new TimeoutException();
    throw new InterruptedException();

1、 交换到null;
2、 正常值;
3、 被中断,抛异常;


/** 实现交换的方法,timed和nanos控制超时 */
private Object doExchange(Object item, boolean timed, long nanos) {
    Node me = new Node(item);                 // 创建node
    int index = hashIndex();                  // hasIndex近似对max的mode操作,获取slot的index
    int fails = 0;                            // cas失败次数

    for (;;) {
        Object y;                             // Contents of current slot
        Slot slot = arena[index]; //arena并没有初始化,所以slot用到的时候需要create
        if (slot == null)                     // 如果slot为null,那就创建一个
            createSlot(index);                // Continue loop to reread
        else if ((y = slot.get()) != null &&  // 如果某个slot不会null,那就说明已经有其他线程占用在等待交换
                 slot.compareAndSet(y, null)) { //先cas设置为null,防止其他线程
            Node you = (Node)y;               // Transfer item
            if (you.compareAndSet(null, item)) { 
                LockSupport.unpark(you.waiter); //交换自己的值,然后唤醒节点等待的线程
                return you.item; //返回交换后的值
            }                                 // Else cancelled; continue
        else if (y == null &&                 // 如果slot还没有被占用,那就占用
                 slot.compareAndSet(null, me)) {
            if (index == 0)                   // 在0 slot上阻塞
                return timed ?
                    awaitNanos(me, slot, nanos) :
                    await(me, slot);
            Object v = spinWait(me, slot);    // Spin wait for non-0
            if (v != CANCEL)
                return v;
            me = new Node(item);              // 丢弃之前的cancel节点,新建一个
            int m = max.get();
            if (m > (index >>>= 1))           // 有可能是max太大了,减少下
                max.compareAndSet(m, m - 1);  
        else if (++fails > 1) {               // 如果在一个index slot上2次都失败
            int m = max.get();
            if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) //失败3次,那就扩大max,index换个位置试试
                index = m + 1;                // Grow on 3rd failed slot
            else if (--index < 0)
                index = m;                    // Circularly traverse


private final int hashIndex() {
    long id = Thread.currentThread().getId();
    int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;

    int m = max.get();
    int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
                 ((0x000001f8 >>> m) & 2) | // The constants hold
                 ((0xffff00f2 >>> m) & 1)); // a lookup table
    int index;
    while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
        hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
    return index;



/** 创建一个slot */
private void createSlot(int index) {
    // Create slot outside of lock to narrow sync region
    Slot newSlot = new Slot();
    Slot[] a = arena;
    synchronized (a) {
        if (a[index] == null)
            a[index] = newSlot;


/** slot 0上的阻塞 */
private static Object await(Node node, Slot slot) {
    Thread w = Thread.currentThread();
    int spins = SPINS; //跟awaitNanos不一样
    for (;;) {
        Object v = node.get();
        if (v != null) //这里判断node.get,如果有其他线程交换会cas这个值
            return v;
        else if (spins > 0)                 // 先自旋
        else if (node.waiter == null)       // 自旋次数够了后,如果节点的等待线程null,设置当前
            node.waiter = w;
        else if (w.isInterrupted())         // 如果线程被中断,cancel节点
            tryCancel(node, slot);
        else                                // park等待唤醒

/** 响应超时的await */
private Object awaitNanos(Node node, Slot slot, long nanos) {
    int spins = TIMED_SPINS; //跟await不一样
    long lastTime = 0;
    Thread w = null;
    for (;;) {
        Object v = node.get();
        if (v != null)
            return v;
        long now = System.nanoTime();
        if (w == null)
            w = Thread.currentThread();
            nanos -= now - lastTime;
        lastTime = now;
        if (nanos > 0) { //还没超时
            if (spins > 0)
            else if (node.waiter == null)
                node.waiter = w;
            else if (w.isInterrupted())
                tryCancel(node, slot);
                LockSupport.parkNanos(node, nanos);
        else if (tryCancel(node, slot) && !w.isInterrupted()) 
            return scanOnTimeout(node);



/** cancel指定slot位置的节点,清空slot、设置节点的值cancel */
private static boolean tryCancel(Node node, Slot slot) {
    if (!node.compareAndSet(null, CANCEL))
        return false;
    if (slot.get() == node) // 再次校验node是否变化,估计是怕节点被交换处理掉了
        slot.compareAndSet(node, null);
    return true;


/** 在非0slot自旋 */
private static Object spinWait(Node node, Slot slot) {
    int spins = SPINS; //这里的spins跟awaitNanos里面的超时自旋不一样
    for (;;) {
        Object v = node.get();
        if (v != null)
            return v;
        else if (spins > 0)
            tryCancel(node, slot); //自旋次数达到还没有交换,那就cancel


