0%

阻塞队列

队列

fifo的一种数据结构

jdk中的阻塞队列

block queue
jdk中的block queue

下面以LinkedBlockingQueue为例来讲解

主要是利用ReentrantLock和条件变量来实现并发安全的,有一个AtomicInteger成员,记录容量。有插入锁和获取锁.对于队满和队空不管是offer还是take都会做通知

offer

主要流程

  1. 在入队前先获取锁,在入队后释放锁.如果队满和队空做下通知
    下面结合代码讲解
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
    return false;
    final int c;
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //获取put锁
    putLock.lock();
    try {
    //如果满了,则直接失败
    if (count.get() == capacity)
    return false;
    //入队
    enqueue(node);
    //这里是先get再add
    c = count.getAndIncrement();
    if (c + 1 < capacity)
    notFull.signal();
    } finally {
    //释放锁
    putLock.unlock();
    }
    //因为上面是先get再add,只有当从不空变为空时才通知消费者
    if (c == 0)
    signalNotEmpty();
    return true;
    }

offer限定时间

主要流程和前面的offer一样,主要是获取put锁变成了支持中断,如果在队满的时候,就加个超时等待,给定时间内队列依旧是满的,则失败
下面结合代码讲解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final int c;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

poll

主要流程

  1. 获取take锁,然后出队,并在队空和队满时做通知
    下面结合代码讲解
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
    return null;
    final E x;
    final int c;
    final ReentrantLock takeLock = this.takeLock;
    //获取take锁
    takeLock.lock();
    try {
    if (count.get() == 0)
    return null;
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)//通知
    notEmpty.signal();
    } finally {
    takeLock.unlock();
    }
    if (c == capacity)//如果前一步是等于队列容量,现在取出来了一个,就不满了,通知在入队的生产者
    signalNotFull();
    return x;
    }

poll限定时间

和前面poll的流程差不多,主要是加了个在队空时的超时等待
下面结合代码讲解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
final E x;
final int c;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中断获取锁
try {
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);//等待
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

Welcome to my other publishing channels