BlockingQueue
,这个类是一个接Queue
接口,这两个接口都是在JDK5
中加入的 。BlockingQueue
阻塞队列是线程安全的,在我们业务中是会经常频繁使用到的,如典型的生产者消费的场景,生产者只需要向队列中添加,而消费者负责从队列中获取。put
元素到队列,而消费者从中take
出元素处理,这样实现了任务与执行任务类之间的解耦,任务都被放入到了阻塞队列中,这样生产者和消费者之间就不会直接相互访问实现了隔离提高了安全性。JAVA
中队列Queue
类的类图,我们可以看到它分为两大类,阻塞队列与非阻塞队列BlockingQueue
而非阻塞队列的接口是 ConcurrentLinkedQueue
, 本文主要介绍阻塞队列,非阻塞队列不再过多阐述BlockingQueue
主要有下面六个实现类,分别是 ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
、DelayQueue
、PriorityBlockingQueue
、LinkedTransferQueue
。这些阻塞队列有着各自的特点和适用场景,后面详细介绍。ConcurrentLinkedQueue
, 它不会阻塞线程,而是利用了 CAS
来保证线程的安全。Queue
关系很紧密,那就是Deque
,这其实是 double-ended-queue
的缩写,意思是双端队列。它的特点是从头部和尾部都能添加和删除元素,而我们常见的普通队列Queue
则是只能一端进一端出,即FIFO
。Put
和 Take
方法take方法
take
方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take
方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。过程如图所示:put方法
put
方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。过程如图所示:是否有界(容量有多大)
LinkedBlockingQueue
的上限是 Integer.MAX_VALUE
,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。ArrayBlockingQueue
如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。分类 | 方法 | 含义 | 特点 |
---|---|---|---|
抛出异常 | add | 添加一个元素 | 如果队列已满,添加则抛出 IllegalStateException 异常 |
remove | 删除队列头节点 | 当队列为空后,删除则抛出 NoSuchElementException 异常 |
|
element | 获取队列头元素 | 当队列为空时,则抛出 NoSuchElementException 异常 |
|
返回无异常 | offer | 添加一个元素 | 当队列已满,不会报异常,返回 false ,如果成功返回 true |
poll | 获取队列头节点,并且删除它 | 当队列空时,返回 Null |
|
peek | 单纯获取头节点 | 当队列为空时反馈 NULL |
|
阻塞 | put | 添加一个元素 | 如果队列已满则阻塞 |
take | 返回并删除头元素 | 如果队列为空则阻塞 |
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add(1);
queue.add(2);
queue.add(3);
add
向其中添加元素,当添加到第三个时则会抛出异常如下:remove
方法是从队列中删除队列的头节点,同时会返回该元素。当队列中为空时执行 remove
方法时则会抛出异常,代码如下:
private static void groupRemove() {
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add("i-code.online");
System.out.println(queue.remove());
System.out.println(queue.remove());
}
i-code.online
, 之后通过 remove
方法进行删除,当执行第二次remove
时队列内已无元素,则抛出异常。如下:element
方法是获取队列的头元素,但是并不是删除该元素,这也是与 remove
的区别,当队列中没有元素后我们再执行 element
方法时则会抛出异常,代码如下:
private static void groupElement() {
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add("i-code.online");
System.out.println(queue.element());
System.out.println(queue.element());
}
private static void groupElement2() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.element());
}
element
的使用。在第一个方法中并不会报错,因为首元素一直存在的,第二个方法中因为空的,所以抛出异常,如下结果:offer
方法是向队列中添加元素, 同时反馈成功与失败,如果失败则返回 false
,当队列已满时继续添加则会失败,代码如下:
private static void groupOffer() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer("i-code.online"));
System.out.println(queue.offer("云栖简码"));
System.out.println(queue.offer("AnonyStar"));
}
offer
添加元素,当添加第三个时,则会反馈 false
,如下结果:
true
true
false
poll
方法对应上面 remove
方法,两者的区别就在于是否会在无元素情况下抛出异常,poll
方法在无元素时不会抛出异常而是返回null
,如下代码:
private static void groupPoll() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer("云栖简码")); //添加元素
System.out.println(queue.poll()); //取出头元素并且删除
System.out.println(queue.poll());
}
poll
方法来获取并删除头节点,发现第二次调用时为null
,因为队列中已经为空了,如下:
true
云栖简码
null
peek
方法与前面的 element
方法是对应的 ,获取元素头节点但不删除,与其不同的在于peek
方法在空队列下并不会抛出异常,而是返回 null
,如下:
private static void groupPeek() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer(1));
System.out.println(queue.peek());
System.out.println(queue.peek());
}
private static void groupPeek2() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.peek());
}
peek
的使用,结果如下:put
方法是向队列中添加一个元素,这个方法是阻塞的,也就是说当队列已经满的情况下,再put
元素时则会阻塞,直到队列中有空位.take
方法是从队列中获取头节点并且将其移除,这也是一个阻塞方法,当队列中已经没有元素时,take
方法则会进入阻塞状态,直到队列中有新的元素进入。ArrayBlockingQueue
是一个我们常用的典型的有界队列,其内部的实现是基于数组来实现的,我们在创建时需要指定其长度,它的线程安全性由 ReentrantLock
来实现的。
public ArrayBlockingQueue(int capacity) {...}
public ArrayBlockingQueue(int capacity, boolean fair) {...}
ArrayBlockingQueue
提供的构造函数中,我们需要指定队列的长度,同时我们也可以设置队列是都是公平的,当我们设置了容量后就不能再修改了,符合数组的特性,此队列按照先进先出(FIFO
)的原则对元素进行排序。ReentrantLock
一样,如果 ArrayBlockingQueue
被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。Integer.MAX_VALUE
,这个值是非常大的,几乎无法达到,对此我们可以认为这个队列基本属于一个无界队列(也又认为是有界队列)。此队列按照先进先出的顺序进行排序。synchronousQueue
是一个不存储任何元素的阻塞队列,每一个put
操作必须等待take
操作,否则不能添加元素。同时它也支持公平锁和非公平锁。synchronousQueue
的容量并不是1,而是0。因为它本身不会持有任何元素,它是直接传递的,synchronousQueue
会把元素从生产者直接传递给消费者,在这个过程中能够是不需要存储的CachedThreadPool
就是利用了该队列。Executors.newCachedThreadPool()
,因为这个线程池它的最大线程数是Integer.MAX_VALUE
,它是更具需求来创建线程,所有的线程都是临时线程,使用完后空闲60秒则被回收,PriorityBlockingQueue
是一个支持优先级排序的无界阻塞队列,可以通过自定义实现 compareTo()
方法来指定元素的排序规则,或者通过构造器参数 Comparator
来指定排序规则。但是需要注意插入队列的对象必须是可比较大小的,也就是 Comparable
的,否则会抛出 ClassCastException
异常。take
方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put
方法永远不会阻塞,添加操作始终都会成功DelayQueue
是一个实现PriorityBlockingQueue
的延迟获取的无界队列。具有“延迟”的功能。DelayQueue
应用场景:1. 缓存系统的设计:可以用DelayQueue
保存缓存元素的有效期,使用一个线程循环查询DelayQueue
,一旦能从DelayQueue
中获取元素时,表示缓存有效期到了。2. 定时任务调度。使用DelayQueue
保存当天将会执行的任务和执行时间,一旦从DelayQueue
中获取到任务就开始执行,从比如TimerQueue
就是使用DelayQueue
实现的。Delayed
接口,而 Delayed
接口又继承了 Comparable
接口,所以自然就拥有了比较和排序的能力,代码如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed
接口继承 Comparable
,里面有一个需要实现的方法,就是 getDelay
。这里的 getDelay
方法返回的是“还剩下多长的延迟时间才会被执行”,如果返回 0 或者负数则代表任务已过期。本文由AnonyStar 发布,可转载但需声明原文出处。
欢迎关注微信公账号 :云栖简码 获取更多优质文章
更多文章关注笔者博客 :云栖简码 i-code.online