Java并发同步之阻塞队列与Condition接口
Java中的阻塞队列主要是用在线程池中,通过阻塞队列实现线程的添加和获取以及删除等操作。阻塞队列支持在队列为空时,获取队列元素的线程处于阻塞状态直到队列变为非空(noEmpty);此外,当队列满时,不可向队列添加元素,一直阻塞到队列可用(notFull)。
阻塞队列主要有以下几种:
ArrayBlockingQueue:是一个数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序
LinkedBlockingQueue:是一个单向链表实现的阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。
SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,Executors.newCachedThreadPool使用这个队列。这个有点像golang中的channel。
PriorityBlockingQueue:优先级队列,时基于数组的堆结构,进入队列的元素按照优先级会进行排序。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
而阻塞队列的实现主要依赖于Condition接口。
Condition主要有两个基本方法:await方法和signal方法,一个用于等待,一个用户唤醒线程。
Condition必须依赖于Lock接口,生成Condition可通过可重入锁lock.newCondition()生成。
一个简单的例子:
@Slf4j
public class ConditionPra {
private Integer size = 10;
private List<Integer> list = new ArrayList<>();
private Integer num = 1;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public class Producer implements Runnable {
@Override
public void run() {
for (int i=0;i<15;i++){
lock.lock();
try{
while(size == list.size()){
try{
log.info("线程已经满了,生产者阻塞等待:{}",Thread.currentThread().getName());
notFull.await();
}catch (Exception e){
log.error(e.getMessage());
e.printStackTrace();
}
}
list.add(num);
num++;
notEmpty.signal();
log.info("生产者向队列中添加了元素:{}",num);
}catch (Exception e){
log.info("{} error",e);
}finally {
lock.unlock();
}
}
}
}
public class Consumer implements Runnable {
@Override
public void run() {
for(int i=0;i<15;i++){
lock.lock();
try{
while(list.size() == 0){
try{
log.info("线程已经空了,消费者阻塞等待:{}",Thread.currentThread().getName());
notEmpty.await();
}catch (Exception e){
log.error(e.getMessage());
e.printStackTrace();
}
}
Integer s = list.remove(0);
notFull.signal();
log.info("消费者从队列中获取了元素:{}",s);
}catch (Exception e){
log.info("{} error",e);
}finally {
lock.unlock();
}
}
}
}
public void runm() {
Thread t1 = new Thread(new Producer());
Thread t2 = new Thread(new Consumer());
t1.start();
t2.start();
try{
Thread.sleep(100L);
}catch (Exception e){
}
}
}
Condition它是基于AbstractQueuedSynchronizer(AQS)的,它内部的实现是基于一个FIFO的等待队列,主要用来发送signal信号的,当调用await后,会将线程放入到等待队列;当调用signal后,会从等待队列获取首节点。每一个Contidion对象都会有一个等待队列。
看一张往上看到的图,说明了等待队列和同步队列的结构:
我们都知道在使用锁时,AQS底部使用了一个同步队列,这是一个双向链表。主要是用于线程获取排她锁的排队过程。之所以设计成双向链表主要是为了检测前驱节点的SIGNAL状态。如果是则休眠,如果不是则尝试获取锁。
上面Condition有一个firstWaiter和lasterwaiter,表示首节点和尾节点,当有新的线程加入,就直接利用lasterWaiter操作即可。
其基本流程:
1、当上述的Produder(或者Consumer)线程1调用lock.lock,获取锁成功,那么线程2会被加入到AQS同步队列中;
2、当Produder(或者Consumer)线程1调用Condition的await方法时,线程1会释放锁,并添加到Condition等待队列中,加入到等待队列的尾部,此时并不会使用CAS,因为只有获取锁的线程才会进入,所以这里不需要考虑线程安全,一定是安全的;
3、锁被释放后,会自动唤醒AQS同步队列中的首节点线程,被唤醒的线程2成功获取锁。
4、当线程2调用了signal方法,那此时会唤醒Condition等待队列的线程,本例子中就是线程1,即将该线程从等待队列中取出,并添加到AQS同步队列。这里注意的是线程1只是被添加到同步队列,并没有唤醒。
5、当线程2显示地调用lock,unlock之后,锁被释放了,此时会唤醒同步队列中的首节点,该例子就是线程1.
其实Condition的await和signal很像Object的wait和notify,只是object的wait和nofity是与synchorized共同使用的,Condition是与Reentrantlock等显示锁共用的。
现在看一下ArrayBlockingQueue的一段源码:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
}
ArrayBlockingQueue主要维护了两个Condition对象,一个是notEmpty,一个是notFull。表示ArrayBlockingQueue是不是空,是不是满。
在put操作时,如果已经满了,就将该线程放到notFull等待队列,在take方法中,如果ArrayBlockingQueue是空的,那么就将放到notEmpty等待队列中,等待唤醒。
对应的线程被唤醒的是上面两个方法中调用的enqueue和dequeue的两个方法:
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
当入队的时候,就会发送notEmpty的signal,用于唤醒在该等待队列中等待的对象。当然,这里是FIFO。
当出队的时候,就会发送notFull的signal,用于唤醒在该等待队列中的对象。
微信分享/微信扫码阅读