队列
fifo的一种数据结构
jdk中的非阻塞队列
ConcurrentLinkedQueue
主要流程
- 将节点加在链表尾部,用cas策略
下面结合代码来解释
offer
1 | public boolean offer(E e) { |
poll
主要流程:
- 从链表头部取出,并cas修改头指针
下面结合代码来解释1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public E poll() {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
if ((item = p.item) != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
// assert h != null && p != null && (h == p || h.item == null);
if (h != p && HEAD.compareAndSet(this, h, p))
NEXT.setRelease(h, h);
}
LinkedTransferQueue
LinkedTransferQueue实际上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。
核心方法
1 | private E xfer(E e, boolean haveData, int how, long nanos) { |
offer
主要流程
下面结合代码来解释
1 | public boolean offer(E e) { |
offer超时
1 | public boolean offer(E e, long timeout, TimeUnit unit) { |
poll
主要流程
下面结合代码来解释
1 | public E poll() { |
poll超时
1 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
xfer方法
前面的offer,poll用的都是这个方法,主要流程:
- 根据是take还是put来决定取元素还是插入元素
下面结合代码来解释1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36/**
* Params:
e – the item or null for take haveData – true if this is a put, else a take
how – NOW, ASYNC, SYNC, or TIMED nanos – timeout in nanosecs, used only if mode is TIMED
Returns:
an item if matched, else e
* */
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
restart: for (Node s = null, t = null, h = null;;) {
for (Node p = (t != (t = tail) && t.isData == haveData) ? t
: (h = head);; ) {
final Node q; final Object item;
if (p.isData != haveData
&& haveData == ((item = p.item) == null)) {
if (h == null) h = head;
if (p.tryMatch(item, e)) {
if (h != p) skipDeadNodesNearHead(h, p);
return (E) item;
}
}
if ((q = p.next) == null) {
if (how == NOW) return e;
if (s == null) s = new Node(e);
if (!p.casNext(null, s)) continue;
if (p != t) casTail(t, s);
if (how == ASYNC) return e;
return awaitMatch(s, p, e, (how == TIMED), nanos);
}
if (p == (p = q)) continue restart;
}
}
}