一、技术选型
MQ适用于消息堆积,消费端处理不过来;两点之间运行生命周期不同情况;
JMS支持5种消息类型,为什么不能用队列来替代呢?
原因:解耦、针对消息本身做队列操作、消息持久化
PS:如果消费很快,前端积压严重的话,不建议使用mq,可采用直连的TCP通信,如mina、netty,节省中间mq转发。
二、常用语法
add
增加一个元索,如果队列已满,则抛出一IIIegaISlabEepeplian
异常
remove
移除并返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException
异常
element
返回队列头部的元素,如果队列为空,则抛出一个NoSuchElementException
异常
offer
添加一个元素并返回true,如果队列已满,则返回false
poll
移除并返问队列头部的元素,如果队列为空,则返回null
peek
返回队列头部的元素,如果队列为空,则返回null
put
添加一个元素,如果队列满,则阻塞
take
移除并返回队列头部的元素,如果队列为空,则阻塞
remove
、element
、offer
、poll
、peek
其实是属于Queue
接口。
阻塞队列的操作可以根据它们的响应方式分为以下三类:
(1) add
、remove
和element
操作在你试图为一个已满的队列增加元素或从空队列取得元素时 抛出异常。
(2) 在多线程程序中,队列在任何时间都可能变成满的或空的,所以你可能想使用offer
、poll
、peek
方法。这些方法在无法完成任务时只是给出一个出错示而不会抛出异常。
注意:poll
和peek
方法出错进返回null
。因此,向队列中插入null
值是不合法的。
(3) put
:把Object
加到BlockingQueue
里,如果BlockingQueue
没有空间,则调用此方法的线程被阻断,直接有空间再继续。take
:取走BlockingQueue
里排在首位的对象,若BlockingQueue
为空,阻断进入等待状态直到BlockingQueue
有新的数据被加入。
三、并发Queue
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,都继承自Queue。
1、ConcurrentLinkedQueue
是一个适用高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,性能好于BlockingQueue,是一个基于链接节点的无界线程安全队列。
该队列的元素先进先出,头是先进的,尾是最近加入的,队列不允许为Null。
重要方法:
add()
、offer()
都是加入元素的方法,无任何区别。
poll()
、peek()
都是取头元素节点,前者会删除元素,后者不会。
2、ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护一个定长数组,以便缓存队列中的数据对象,内部没有实现读写分离,意味着生产者和消费者不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列。
/**有界队列**/
package demo3;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class UserQueue {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5); // 必须传一个长度
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
array.add("f");
//System.out.println(array.offer("a", 3, TimeUnit.SECONDS));// 阻塞式
}
}
3、LinkedBlockingQueue
基于链表的阻塞队列,同ArrayBlockingQueue
类似,其内部也维持着一个数据缓冲队列(该队列是一个链表构成),LinkedBlockingQueue
之所以能够高效地处理并发数据,是因为内部实现采用了读写分离锁,从而实现生产者和消费者操作的完全并行,是一个无界队列。
/**无界队列**/
LinkedBlockingQueue<String> link = new LinkedBlockingQueue<String>(); // 可以不传,也可以传,传则定死;不传则无界
link.put("a");
link.put("b");
link.add("c");
link.add("d");
link.add("e");
link.add("f");
for (Iterator iterator = link.iterator(); iterator.hasNext();) {
String string = (String) iterator.next();
System.out.println(string);
}
//但如果初始化长度的话,也会有容量限制
4、SynchronousQueue
一种没有缓冲的队列,生产者生产的数据直接会被消费者获取并消费。
/**无缓存队列**/
SynchronousQueue<String> queue = new SynchronousQueue<String>();
queue.add("a");
/**由于没有任何容量,直接抛异常**/
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at demo3.UserQueue.main(UserQueue.java:31)
/**可以调用add方法,但并不代表队列中加元素了,先take再add,直接扔给前面阻塞在take的线程**/
final SynchronousQueue<String> q = new SynchronousQueue<String>();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(q.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
q.add("adsfa");
}
});
t2.start();
5、PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Comparator
对象来决定,也就是说传入队列的对象必须实现Comparable
接口),不遵循先进先出,在实现PriorityBlockingQueue
时,内部控制线程同步的锁采用的是公平锁,它是一个无界队列。
不是加一个元素时就进行排序,而是调用take/poll
方法时才进行将优先级最高的拿出来。
package demo3;
public class Task implements Comparable<Task> {
private int id;
/**
* @return the id
*/
public int getId() {
return id;
}
/**
* @param id the id to set
*/
public void setId(int id) {
this.id = id;
}
/**
* @return the name
*/
public String getName() {
return name;
}
/**
* @param name the name to set
*/
public void setName(String name) {
this.name = name;
}
private String name;
@Override
public int compareTo(Task task) {
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
}
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "Task [id=" + id + ", name=" + name + "]";
}
}
package demo3;
import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;
public class UsePriorityBlockingQueue {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
Task t1 = new Task();
t1.setId(3);
t1.setName("任务1");
Task t2 = new Task();
t2.setId(6);
t2.setName("任务2");
Task t3 = new Task();
t3.setId(1);
t3.setName("任务3");
q.add(t1);
q.add(t2);
q.add(t3);
System.out.println(q);
for (Iterator iterator = q.iterator();iterator.hasNext();){
Task task = (Task)iterator.next();
System.out.println(task.getName());
}
System.out.println(q.poll());
System.out.println(q.poll());
System.out.println(q.poll());
}
}
6、 DelayQueue
带有延迟时间的Queue
,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue
中的元素必须实现Delay
接口,DelayQueue
是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等。
未超时肯定会阻塞。
四、场景
应用非常繁忙,并发量非常大,应用承载量1000个任务,单核,一个线程,类似于马路上的早晚高峰,采用ArrayBlockingQueue有界队列,做容量内存限制,超过的话,给予相应的拒绝措施。
平稳期,采用无界队列,车流量不是很大,可以放心使用LinkedBlockingQueue,能保证数据的实时性。
夜半时分,不需要存储到队列中了(浪费空间),数据量非常少,采用虚拟的队列(无容量)SynchronousQueue队列,直接提交给线程,效率更高。
网友评论