public E poll() { finalAtomicIntegercount=this.count; if (count.get() == 0) returnnull; final E x; finalint c; finalReentrantLocktakeLock=this.takeLock; //获取take锁 takeLock.lock(); try { if (count.get() == 0) returnnull; x = dequeue(); c = count.getAndDecrement(); if (c > 1)//通知 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity)//如果前一步是等于队列容量,现在取出来了一个,就不满了,通知在入队的生产者 signalNotFull(); return x; }
publicbooleanoffer(E e) { final Node<E> newNode = newNode<E>(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (NEXT.compareAndSet(p, null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time; failure is OK TAIL.weakCompareAndSet(this, t, newNode); returntrue; } // Lost CAS race to another thread; re-read next } elseif (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;; p = q) { final E item; if ((item = p.item) != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } elseif ((q = p.next) == null) { updateHead(h, p); returnnull; } elseif (p == q) continue restartFromHead; } } }
finalvoidupdateHead(Node<E> h, Node<E> p) { // assert h != null && p != null && (h == p || h.item == null); if (h != p && HEAD.compareAndSet(this, h, p)) NEXT.setRelease(h, h); }
/**
 * Tries to match the request against an existing node of the opposite
 * mode; if none is found, optionally appends a new node and, depending
 * on {@code how}, waits for it to be matched.
 *
 * @param e the item or null for take
 * @param haveData true if this is a put, else a take
 * @param how NOW, ASYNC, SYNC, or TIMED
 * @param nanos timeout in nanosecs, used only if mode is TIMED
 * @return an item if matched, else e
 * @throws NullPointerException if haveData is true but e is null
 */
@SuppressWarnings("unchecked")
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();

    restart: for (Node s = null, t = null, h = null;;) {
        // Start at the tail if it changed since the last pass and has
        // the same mode as this request; otherwise start at the head.
        for (Node p = (t != (t = tail) && t.isData == haveData) ? t
                 : (h = head);; ) {
            final Node q; final Object item;
            // p has the opposite mode and its item state is one this
            // request can match against.
            if (p.isData != haveData
                && haveData == ((item = p.item) == null)) {
                if (h == null) h = head;
                if (p.tryMatch(item, e)) {
                    if (h != p) skipDeadNodesNearHead(h, p);
                    return (E) item;
                }
            }
            if ((q = p.next) == null) {
                // p is the last node: nothing to match against.
                if (how == NOW) return e;
                if (s == null) s = new Node(e);
                // Lost the append race; retry the inner loop with the same p.
                if (!p.casNext(null, s)) continue;
                if (p != t) casTail(t, s);
                if (how == ASYNC) return e;
                return awaitMatch(s, p, e, (how == TIMED), nanos);
            }
            // Advance p to q; if p.next was p itself, restart the outer loop.
            if (p == (p = q)) continue restart;
        }
    }
}
for (;;) { //校验最大线程数在这里 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) returnfalse; //如果修改workerCount成功了,就表示增加woker成功了, if (compareAndIncrementWorkerCount(c)) break retry; //如果workerCount没有修改成功,继续获取状态,继续修改 c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { //创建worker w = newWorker(firstTask); finalThreadt= w.thread; if (t != null) { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intc= ctl.get();
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) thrownewIllegalThreadStateException(); //用锁保护的woker set workers.add(w); workerAdded = true; ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } //让worker开始执行 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
finalvoidrunWorker(Worker w) { Threadwt= Thread.currentThread(); Runnabletask= w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts booleancompletedAbruptly=true; try { //不断取任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //前置后置钩子函数 beforeExecute(wt, task); try { //执行 task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { booleantimedOut=false; // Did the last poll() time out?
for (;;) { intc= ctl.get();
// Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); returnnull; }
intwc= workerCountOf(c);
// Are workers subject to culling? booleantimed= allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) returnnull; continue; }