0%

非阻塞队列

队列

队列是一种先进先出(FIFO)的数据结构。

jdk中的非阻塞队列

ConcurrentLinkedQueue

主要流程

  1. 使用 CAS 操作将新节点追加到链表尾部。
    下面结合代码来解释

offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Appends the given element as a new node at the end of the linked list,
 * using compare-and-set on the last node's {@code next} field instead of a lock.
 *
 * <p>The loop keeps two cursors: {@code t} is the most recently read value of
 * {@code tail}, and {@code p} is the node currently believed to be last.
 *
 * @param e the element to add; rejected by {@code Objects.requireNonNull} if null
 * @return {@code true} (the only way this method returns normally)
 * @throws NullPointerException if {@code e} is null
 */
public boolean offer(E e) {
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

// No loop condition: the only exit is the "return true" after a successful CAS.
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (NEXT.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// tail is only advanced when p has moved past t, and the weak CAS
// result is deliberately ignored.
if (p != t) // hop two nodes at a time; failure is OK
TAIL.weakCompareAndSet(this, t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// p.next points back at p itself (updateHead self-links a replaced head).
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// (t != (t = tail)) compares the old t with a fresh read of tail and
// stores the fresh value into t in the same expression.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// If p has already left t and tail has changed, restart from the new
// tail; otherwise simply advance to the successor q.
p = (p != t && t != (t = tail)) ? t : q;
}
}

poll

主要流程:

  1. 从链表头部取出元素,并通过 CAS 更新头指针。
    下面结合代码来解释
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /**
     * Removes and returns the first non-null item found when walking the list
     * from {@code head}, claiming it by CAS-ing the node's item to {@code null}.
     *
     * <p>Cursors: {@code h} is the head value read at the start of a traversal,
     * {@code p} is the node being examined, and {@code q} is {@code p.next}.
     *
     * @return the removed item, or {@code null} if the traversal reaches a node
     *         whose {@code next} is {@code null} without claiming an item
     */
    public E poll() {
    restartFromHead: for (;;) {
    // Each pass of the outer loop re-reads head; the inner loop advances p = q.
    for (Node<E> h = head, p = h, q;; p = q) {
    final E item;
    // Claim the item: only the thread whose casItem succeeds returns it.
    if ((item = p.item) != null && p.casItem(item, null)) {
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    // Head is only moved when p has advanced past h; the new head is
    // p's successor if there is one, otherwise p itself.
    if (p != h) // hop two nodes at a time
    updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
    }
    // No claimable item in p and no successor: report empty after trying
    // to move head up to p.
    else if ((q = p.next) == null) {
    updateHead(h, p);
    return null;
    }
    // p.next points at p itself (updateHead self-links a replaced head),
    // so this traversal cannot continue; start again from the current head.
    else if (p == q)
    continue restartFromHead;
    }
    }
    }

    /**
     * Tries to swing {@code head} from {@code h} to {@code p}. When the CAS
     * succeeds, the old head's {@code next} field is pointed back at the old
     * head itself, which is the self-link that traversals test for with
     * {@code p == p.next}.
     *
     * <p>Expected precondition (not checked here):
     * {@code h != null && p != null && (h == p || h.item == null)}.
     *
     * @param h the head value the caller observed
     * @param p the node that should become the new head
     */
    final void updateHead(Node<E> h, Node<E> p) {
        // Nothing to do when head would not move.
        if (h == p) {
            return;
        }
        // Only the thread that wins the head CAS self-links the old head.
        if (HEAD.compareAndSet(this, h, p)) {
            NEXT.setRelease(h, h);
        }
    }

LinkedTransferQueue

LinkedTransferQueue实际上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Shared implementation behind the queue's insert and remove operations:
 * either matches this call against an existing node of the opposite mode,
 * or appends a new node for this call at the end of the list.
 *
 * @param e        the item to hand over; must be non-null when {@code haveData} is true
 * @param haveData {@code true} when this call carries data, {@code false} when it requests data
 * @param how      one of the mode constants compared below ({@code NOW}, {@code ASYNC}, {@code TIMED});
 *                 any other value falls through to {@code awaitMatch} with {@code timed == false}
 * @param nanos    timeout passed through to {@code awaitMatch}
 * @return the matched node's item if a match succeeded; {@code e} when returning
 *         without a match in {@code NOW} or {@code ASYNC} mode; otherwise whatever
 *         {@code awaitMatch} returns
 * @throws NullPointerException if {@code haveData} is true and {@code e} is null
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();

// s: node for this call, created lazily and reused across retries;
// t: last value read from tail; h: last value read from head (null until read).
restart: for (Node s = null, t = null, h = null;;) {
// Start from the freshly read tail only if it differs from the previous t and
// has the same mode as this call; otherwise start from head.
for (Node p = (t != (t = tail) && t.isData == haveData) ? t
: (h = head);; ) {
final Node q; final Object item;
// Candidate for matching: a node of the opposite mode whose item is null
// exactly when this call carries data.
if (p.isData != haveData
&& haveData == ((item = p.item) == null)) {
if (h == null) h = head;
if (p.tryMatch(item, e)) {
// Matched a node beyond head: let skipDeadNodesNearHead tidy up.
if (h != p) skipDeadNodesNearHead(h, p);
return (E) item;
}
}
// p is the last node: nothing to match, so append (unless mode is NOW).
if ((q = p.next) == null) {
if (how == NOW) return e;
if (s == null) s = new Node(e);
// Lost the race to append; retry the inner loop with the same p.
if (!p.casNext(null, s)) continue;
if (p != t) casTail(t, s);
if (how == ASYNC) return e;
return awaitMatch(s, p, e, (how == TIMED), nanos);
}
// Advance p to q; if p was its own successor (self-linked), restart the outer loop.
if (p == (p = q)) continue restart;
}
}
}

offer

主要流程
下面结合代码来解释

1
2
3
4
5
/**
 * Inserts the element by calling {@code xfer} in {@code ASYNC} mode with data
 * present, ignoring its result.
 *
 * @param e the element to add
 * @return always {@code true}
 */
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}

offer超时(从下面的代码可以看到,timeout 和 unit 参数并未被使用,该方法与普通 offer 的行为完全相同)

1
2
3
4
5
/**
 * Timed variant of {@code offer}. The body is identical to the untimed
 * version: it calls {@code xfer} in {@code ASYNC} mode with a timeout of 0,
 * and neither {@code timeout} nor {@code unit} is read.
 *
 * @param e       the element to add
 * @param timeout unused by this implementation
 * @param unit    unused by this implementation
 * @return always {@code true}
 */
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}

poll

主要流程
下面结合代码来解释

1
2
3
/**
 * Requests an element by calling {@code xfer} in {@code NOW} mode without data.
 *
 * @return the result of {@code xfer}; in {@code NOW} mode {@code xfer} returns
 *         its first argument (here {@code null}) when no match is found
 */
public E poll() {
return xfer(null, false, NOW, 0);
}

poll超时

1
2
3
4
5
6
/**
 * Timed variant of {@code poll}: calls {@code xfer} in {@code TIMED} mode
 * with the timeout converted to nanoseconds.
 *
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit    the time unit of {@code timeout}
 * @return the value returned by {@code xfer}; may be {@code null} when the
 *         current thread was not interrupted
 * @throws InterruptedException if {@code xfer} returned {@code null} and the
 *         current thread's interrupt flag was set (the flag is cleared by the check)
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final E matched = xfer(null, false, TIMED, unit.toNanos(timeout));
    // The interrupt flag is only consulted (and cleared) when nothing was matched.
    if (matched == null && Thread.interrupted()) {
        throw new InterruptedException();
    }
    return matched;
}

xfer方法

前面的 offer、poll 最终都是调用这个方法,主要流程:

  1. 根据 haveData 参数区分本次调用是 put(携带数据)还是 take(请求数据),从而决定是与已有节点匹配,还是把新节点追加到链表尾部
    下面结合代码来解释
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    /**
     * Shared implementation behind the queue's insert and remove operations:
     * either matches this call against an existing node of the opposite mode,
     * or appends a new node for this call at the end of the list.
     *
     * @param e        the item, or null for a take
     * @param haveData true if this is a put, else a take
     * @param how      NOW, ASYNC, SYNC, or TIMED
     * @param nanos    timeout in nanoseconds, used only if mode is TIMED
     * @return an item if matched, else {@code e}
     * @throws NullPointerException if {@code haveData} is true and {@code e} is null
     */
    @SuppressWarnings("unchecked")
    private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
    throw new NullPointerException();

    // s: node for this call, created lazily and reused across retries;
    // t: last value read from tail; h: last value read from head (null until read).
    restart: for (Node s = null, t = null, h = null;;) {
    // Start from the freshly read tail only if it differs from the previous t and
    // has the same mode as this call; otherwise start from head.
    for (Node p = (t != (t = tail) && t.isData == haveData) ? t
    : (h = head);; ) {
    final Node q; final Object item;
    // Candidate for matching: a node of the opposite mode whose item is null
    // exactly when this call carries data.
    if (p.isData != haveData
    && haveData == ((item = p.item) == null)) {
    if (h == null) h = head;
    if (p.tryMatch(item, e)) {
    // Matched a node beyond head: let skipDeadNodesNearHead tidy up.
    if (h != p) skipDeadNodesNearHead(h, p);
    return (E) item;
    }
    }
    // p is the last node: nothing to match, so append (unless mode is NOW).
    if ((q = p.next) == null) {
    if (how == NOW) return e;
    if (s == null) s = new Node(e);
    // Lost the race to append; retry the inner loop with the same p.
    if (!p.casNext(null, s)) continue;
    if (p != t) casTail(t, s);
    if (how == ASYNC) return e;
    return awaitMatch(s, p, e, (how == TIMED), nanos);
    }
    // Advance p to q; if p was its own successor (self-linked), restart the outer loop.
    if (p == (p = q)) continue restart;
    }
    }
    }

Welcome to my other publishing channels