网站首页 > java教程 正文
阻塞队列
阻塞队列通过添加两组方法来扩展队列:

- 一组方法无限期地阻塞
- 另一组方法允许您指定要阻止的时间段。
BlockingQueue 接口的实例表示阻塞队列。 BlockingQueue 接口继承自 Queue 接口。
put()和 offer()方法在阻塞队列的尾部添加一个元素。如果阻塞队列已满,则put()方法将无限期阻塞,直到队列中的空间可用。offer()方法允许您指定等待空间可用的时间段。 如果指定的元素添加成功,则返回true; 否则为假。
take()和poll()方法检索和删除阻塞队列的头。如果阻塞队列为空,take()方法将无限期阻塞。poll()方法允许您指定在阻塞队列为空时要等待的时间段; 如果在元素可用之前过去了指定的时间,则返回null。
来自 BlockingQueue 中 Queue 接口的方法就像使用 Queue 。
BlockingQueue 被设计为线程安全的并且可以使用在生产者/消费者的情况下。
阻塞队列不允许空元素和可以是有界的或无界的。
BlockingQueue 中的 remainingCapacity()返回可以添加到阻止队列中而不阻塞的元素数。
BlockingQueue 可以控制多个线程被阻塞时的公平性。 如果阻塞队列是公平的,它可以选择最长的等待线程来执行操作。如果阻塞队列不公平,则不指定选择的顺序。
BlockingQueue 接口及其所有实现类都在 java.util.concurrent 包中。 以下是 BlockingQueue接口的实现类:
由数组支持的 ArrayBlockingQueue 是 BlockingQueue 的有界实现类。 我们可以在其构造函数中指定阻塞队列的公平性。 默认情况下,它不公平。
LinkedBlockingQueue 可以用作有界或无界阻塞队列。 它不允许为阻塞队列指定公平规则。
PriorityBlockingQueue 是 BlockingQueue 的无界实现类。 它的工作方式与 PriortyQueue 相同,用于排序阻塞队列中的元素,并将阻塞特性添加到 PriorityQueue 中。
SynchronousQueue 实现 BlockingQueue ,没有任何容量。 put操作等待take操作以获取元素。 它可以在两个线程之间进行握手,并在两个线程之间交换对象。 它的isEmpty()方法总是返回true。
DelayQueue是BlockingQueue的无界实现类。它保持一个元素,直到该元素经过指定的延迟。 如果有超过一个元素的延迟已经过去,那么其延迟最早传递的元素将被放置在队列的头部。
上面的代码生成以下结果。
延迟队列
DelayQueue 实现 BlockingQueue 接口。 DelayQueue 中的元素必须保留一定的时间。
DelayQueue 使用一个名为 Delayed 的接口来获取延迟时间。
该接口在java.util.concurrent包中。 其声明如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit timeUnit);
}
它扩展了 Comparable 接口,它的 compareTo()方法接受一个Delayed对象。
DelayQueue 调用每个元素的 getDelay()方法来获取元素必须保留多长时间。 DelayQueue 将传递TimeUnit 到此方法。
当 getDelay()方法返回一个零或一个负数时,是元素离开队列的时间。
队列通过调用元素的 compareTo()方法确定要弹出的那个。 此方法确定要从队列中删除的过期元素的优先级。
以下代码显示了如何使用DelayQueue。
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class DelayedJob implements Delayed {
private Instant scheduledTime;
String jobName;
public DelayedJob(String jobName, Instant scheduledTime) {
this.scheduledTime = scheduledTime;
this.jobName = jobName;
}
@Override
public long getDelay(TimeUnit unit) {
long delay = MILLIS.between(Instant.now(), scheduledTime);
long returnValue = unit.convert(delay, MILLISECONDS);
return returnValue;
}
@Override
public int compareTo(Delayed job) {
long currentJobDelay = this.getDelay(MILLISECONDS);
long jobDelay = job.getDelay(MILLISECONDS);
int diff = 0;
if (currentJobDelay > jobDelay) {
diff = 1;
} else if (currentJobDelay < jobDelay) {
diff = -1;
}
return diff;
}
@Override
public String toString() {
String str = this.jobName + ", " + "Scheduled Time: "
+ this.scheduledTime;
return str;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<DelayedJob> queue = new DelayQueue<>();
Instant now = Instant.now();
queue.put(new DelayedJob("A", now.plusSeconds(9)));
queue.put(new DelayedJob("B", now.plusSeconds(3)));
queue.put(new DelayedJob("C", now.plusSeconds(6)));
queue.put(new DelayedJob("D", now.plusSeconds(1)));
while (queue.size() > 0) {
System.out.println("started...");
DelayedJob job = queue.take();
System.out.println("Job: " + job);
}
System.out.println("Finished.");
}
}
上面的代码生成以下结果。
传输队列
传输队列扩展阻塞队列。
生产者使用 TransferQueue 的 transfer(E element)方法将元素传递给消费者。
当生产者调用传递(E元素)方法时,它等待直到消费者获取其元素。 tryTransfer()方法提供了该方法的非阻塞和超时版本。
getWaitingConsumerCount()方法返回等待消费者的数量。
如果有一个等待消费者, hasWaitingConsumer()方法返回true; 否则,返回false。LinkedTransferQueue 是 TransferQueue 接口的实现类。 它提供了一个无界的 TransferQueue 。
以下代码显示如何使用 TransferQueue 。
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
class TQProducer extends Thread {
private String name;
private TransferQueue<Integer> tQueue;
private AtomicInteger sequence;
public TQProducer(String name, TransferQueue<Integer> tQueue,
AtomicInteger sequence) {
this.name = name;
this.tQueue = tQueue;
this.sequence = sequence;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
int nextNum = this.sequence.incrementAndGet();
if (nextNum % 2 == 0) {
System.out.format("%s: Enqueuing: %d%n", name, nextNum);
tQueue.put(nextNum); // Enqueue
} else {
System.out.format("%s: Handing off: %d%n", name, nextNum);
System.out.format("%s: has a waiting consumer: %b%n", name,
tQueue.hasWaitingConsumer());
tQueue.transfer(nextNum); // A hand off
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class TQConsumer extends Thread {
private final String name;
private final TransferQueue<Integer> tQueue;
public TQConsumer(String name, TransferQueue<Integer> tQueue) {
this.name = name;
this.tQueue = tQueue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(3000);
int item = tQueue.take();
System.out.format("%s removed: %d%n", name, item);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
final TransferQueue<Integer> tQueue = new LinkedTransferQueue<>();
final AtomicInteger sequence = new AtomicInteger();
for (int i = 0; i < 5; i++) {
try {
tQueue.put(sequence.incrementAndGet());
System.out.println("Initial queue: " + tQueue);
new TQProducer("Producer-1", tQueue, sequence).start();
new TQConsumer("Consumer-1", tQueue).start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上面的代码生成以下结果。
猜你喜欢
- 2024-09-08 java队列之LinkedBlockingQueue和ConcurrentLinkedQueue
- 2024-09-08 Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析
- 2024-09-08 100个Java工具类之61:队列类Queue
- 2024-09-08 阿里架构师浅析数据结构:队列在线程池等有限资源池中的应用
- 2024-09-08 【每日一学】Java数据结构探秘:队列与List的强大应用与性能优化
- 2024-09-08 使用Redis实现消息队列功能在Java中的应用
- 2024-09-08 『并发包入坑指北』之阻塞队列(阻塞队列poll方法)
- 2024-09-08 工作了这么久,你知道Java线程池容量应该设置多少么
- 2024-09-08 一文读懂,Java内置的延迟队列DelayQueue,原理及使用方法
- 2024-09-08 Java 消息队列的简单实现(java如何实现消息队列的监听)
欢迎 你 发表评论:
- 11-10开心手机恢复大师免费版(开心手机恢复大师软件下载)
- 11-10怎么修改电脑开机启动项(电脑更改开机启动项)
- 11-10都市超级全能系统(都市全能超级学生 百度百科)
- 11-10为什么明明有网却打不开网页
- 11-10杀毒清理软件哪个好(杀毒清理软件推荐)
- 11-10win7蓝屏怎么修复(win7蓝屏如何处理)
- 11-10重装系统从xp重装成win7(xp重装系统win10)
- 11-10万界最强签到系统(万界最强签到系统5的7次方)
- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)

本文暂时没有评论,来添加一个吧(●'◡'●)