Java并发同步之阻塞队列与Condition接口

Java中的阻塞队列主要是用在线程池中,通过阻塞队列实现线程的添加和获取以及删除等操作。阻塞队列支持在队列为空时,获取队列元素的线程处于阻塞状态直到队列变为非空(noEmpty);此外,当队列满时,不可向队列添加元素,一直阻塞到队列可用(notFull)。

阻塞队列主要有以下几种:

ArrayBlockingQueue:是一个数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序

LinkedBlockingQueue:是一个单向链表实现的阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。

SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,Executors.newCachedThreadPool使用这个队列。这个有点像golang中的channel。

PriorityBlockingQueue:优先级队列,时基于数组的堆结构,进入队列的元素按照优先级会进行排序。

DelayQueue:一个使用优先级队列实现的无界阻塞队列。

而阻塞队列的实现主要依赖于Condition接口。

Condition主要有两个基本方法:await方法和signal方法,一个用于等待,一个用户唤醒线程。

Condition必须依赖于Lock接口,生成Condition可通过可重入锁lock.newCondition()生成。

一个简单的例子:

@Slf4j
public class ConditionPra {

    private Integer size = 10;

    private List<Integer> list = new ArrayList<>();
    private Integer num = 1;

    private ReentrantLock lock = new ReentrantLock();

    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public class Producer implements Runnable {

        @Override
        public void run() {
            for (int i=0;i<15;i++){
                lock.lock();
                try{
                    while(size == list.size()){
                        try{
                            log.info("线程已经满了,生产者阻塞等待:{}",Thread.currentThread().getName());
                            notFull.await();
                        }catch (Exception e){
                            log.error(e.getMessage());
                            e.printStackTrace();
                        }
                    }
                    list.add(num);
                    num++;
                    notEmpty.signal();
                    log.info("生产者向队列中添加了元素:{}",num);
                }catch (Exception e){
                    log.info("{} error",e);
                }finally {
                    lock.unlock();
                }
            }
        }
    }

    public class Consumer implements Runnable {

        @Override
        public void run() {
            for(int i=0;i<15;i++){
                lock.lock();
                try{
                    while(list.size() == 0){
                        try{
                            log.info("线程已经空了,消费者阻塞等待:{}",Thread.currentThread().getName());
                            notEmpty.await();
                        }catch (Exception e){
                            log.error(e.getMessage());
                            e.printStackTrace();
                        }
                    }
                    Integer s = list.remove(0);
                    notFull.signal();
                    log.info("消费者从队列中获取了元素:{}",s);
                }catch (Exception e){
                    log.info("{} error",e);
                }finally {
                    lock.unlock();
                }
            }
        }
    }


    public void runm() {
        Thread t1 = new Thread(new Producer());
        Thread t2 = new Thread(new Consumer());
        t1.start();
        t2.start();
        try{
            Thread.sleep(100L);
        }catch (Exception e){

        }

    }

}

Condition它是基于AbstractQueuedSynchronizer(AQS)的,它内部的实现是基于一个FIFO的等待队列,主要用来发送signal信号的,当调用await后,会将线程放入到等待队列;当调用signal后,会从等待队列获取首节点。每一个Contidion对象都会有一个等待队列。

看一张往上看到的图,说明了等待队列和同步队列的结构:

我们都知道在使用锁时,AQS底部使用了一个同步队列,这是一个双向链表。主要是用于线程获取排她锁的排队过程。之所以设计成双向链表主要是为了检测前驱节点的SIGNAL状态。如果是则休眠,如果不是则尝试获取锁。

上面Condition有一个firstWaiter和lasterwaiter,表示首节点和尾节点,当有新的线程加入,就直接利用lasterWaiter操作即可。

其基本流程:

1、当上述的Produder(或者Consumer)线程1调用lock.lock,获取锁成功,那么线程2会被加入到AQS同步队列中;

2、当Produder(或者Consumer)线程1调用Condition的await方法时,线程1会释放锁,并添加到Condition等待队列中,加入到等待队列的尾部,此时并不会使用CAS,因为只有获取锁的线程才会进入,所以这里不需要考虑线程安全,一定是安全的;

3、锁被释放后,会自动唤醒AQS同步队列中的首节点线程,被唤醒的线程2成功获取锁。

4、当线程2调用了signal方法,那此时会唤醒Condition等待队列的线程,本例子中就是线程1,即将该线程从等待队列中取出,并添加到AQS同步队列。这里注意的是线程1只是被添加到同步队列,并没有唤醒。

5、当线程2显示地调用lock,unlock之后,锁被释放了,此时会唤醒同步队列中的首节点,该例子就是线程1.

其实Condition的await和signal很像Object的wait和notify,只是object的wait和nofity是与synchorized共同使用的,Condition是与Reentrantlock等显示锁共用的。

现在看一下ArrayBlockingQueue的一段源码:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {


    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;




 /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

  public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

}

ArrayBlockingQueue主要维护了两个Condition对象,一个是notEmpty,一个是notFull。表示ArrayBlockingQueue是不是空,是不是满。

在put操作时,如果已经满了,就将该线程放到notFull等待队列,在take方法中,如果ArrayBlockingQueue是空的,那么就将放到notEmpty等待队列中,等待唤醒。

对应的线程被唤醒的是上面两个方法中调用的enqueue和dequeue的两个方法:

  /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

当入队的时候,就会发送notEmpty的signal,用于唤醒在该等待队列中等待的对象。当然,这里是FIFO。

当出队的时候,就会发送notFull的signal,用于唤醒在该等待队列中的对象。

--------EOF---------
本文微信分享/扫码阅读