网站首页 > java教程 正文
想要实现一个自定义的消息队列需要用到线程同步、队列管理等多种技术手段,下面我们就来介绍一下如何在Java中实现一个简单、安全的消息队列。
实现思路
首先来讲,消息队列最为重要的一个结构就是消息的存储结构,我们可以通过Java内置的LinkedList来存储消息,LinkedList 是一个链表实现的双端队列,插入和删除操作的时间复杂度为 O(1),比较适合作为消息队列的底层结构。
接下来需要考虑的事情就是如何能够保证线程的安全性。为了确保线程安全,我们可以使用synchronized关键字来同步操作,或者使用java.util.concurrent包中的工具类,比如ReentrantLock和Condition,以实现更高级别的控制。
接下来就是消息队列的核心功能管理,需要有生产者线程将消息放入队列,消费者线程从队列中获取消息。为了避免生产者过多生产导致队列溢出,或者消费者过快消费导致队列为空,需要引入等待机制(如wait()和notify()或者Condition变量的await()和signal()方法)。
下面我们就通过使用 ReentrantLock 和 Condition 来实现自定义的线程安全消息队列。相比较于synchronized,使用ReentrantLock 提供了更细粒度的锁控制,并且Condition也可以实现类似wait()和notify()的功能,如下所示。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CustomMessageQueue<T> {
private Queue<T> queue;
private int capacity;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
// 构造方法,初始化消息队列和容量
public CustomMessageQueue(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<>();
}
// 向队列中添加消息(生产者方法)
public void produce(T message) throws InterruptedException {
lock.lock();
try {
// 如果队列已满,生产者需要等待
while (queue.size() == capacity) {
System.out.println("Queue is full, producer waiting...");
notFull.await(); // 生产者等待,直到队列有空位
}
// 添加消息到队列
queue.add(message);
System.out.println("Produced: " + message);
// 通知消费者队列中有消息可取
notEmpty.signalAll(); // 唤醒等待的消费者
} finally {
lock.unlock();
}
}
// 从队列中获取消息(消费者方法)
public T consume() throws InterruptedException {
lock.lock();
try {
// 如果队列为空,消费者需要等待
while (queue.isEmpty()) {
System.out.println("Queue is empty, consumer waiting...");
notEmpty.await(); // 消费者等待,直到队列有消息
}
// 从队列中取出消息
T message = queue.poll();
System.out.println("Consumed: " + message);
// 通知生产者队列中有空位
notFull.signalAll(); // 唤醒等待的生产者
return message;
} finally {
lock.unlock();
}
}
// 返回队列当前的大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
消息生产者和消费者示例
我们可以创建生产者和消费者线程来验证队列的工作情况,如下所示。
public class MessageQueueTest {
public static void main(String[] args) {
// 创建一个容量为5的消息队列
CustomMessageQueue<String> messageQueue = new CustomMessageQueue<>(5);
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
messageQueue.produce("Message " + i);
Thread.sleep(500); // 模拟生产者生产时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
String message = messageQueue.consume();
Thread.sleep(1000); // 模拟消费者处理时间
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启动生产者和消费者线程
producerThread.start();
consumerThread.start();
}
}
在上面的代码中,我们使用了ReentrantLock来确保队列的操作是线程安全的。生产者和消费者都必须获取锁才能操作队列,从而避免数据竞争问题。为了模拟真实的生产和消费过程,我们在生产和消费操作中加入了Thread.sleep(),分别模拟了生产和消费的延时。
改进思路
实际上,Java自带了BlockingQueue,如ArrayBlockingQueue、LinkedBlockingQueue,这些类已经实现了高效的线程安全队列,可以直接使用,而无需手动控制锁和条件变量。
当然,如果在开发中我们需要对消息进行持久化的存储,例如日志系统、分布式消息队列,这个时候我们就可以考虑将消息写入文件系统或数据库中。
在某些情况下,如果需要对于某些消息进行优先处理,我们可以通过PriorityQueue来实现优先级消息队列。
总结
以上代码展示了如何在Java中实现一个简单的自定义消息队列,主要使用了LinkedList作为底层数据结构,并借助ReentrantLock和Condition实现线程安全的生产者-消费者模型。有兴趣的读者可以深入的了解一下还有没有通过其他方式来实现自定义消息队列的方式,可以一起交流相互提升。
- 上一篇: 在Java中的哪些阻塞队列?详细介绍一下?
- 下一篇: Java中牛逼哄哄的消息队列到底有什么用?
猜你喜欢
- 2024-11-18 常见的消息队列对比
- 2024-11-18 java中的延迟队列——Redis
- 2024-11-18 Java重试队列-让服务更健壮
- 2024-11-18 SpringBoot消息队列
- 2024-11-18 临近期末考试,一篇消息队列和RocketMQ的总结送给你们(二)
- 2024-11-18 一文了解字节跳动消息队列演进之路
- 2024-11-18 消息队列中,如何保证消息的顺序性?
- 2024-11-18 Java开发中常用的消息队列工具 ActiveMQ
- 2024-11-18 临近期末考试,一篇消息队列和RocketMQ的总结送给你们(一)
- 2024-11-18 消息队列挂了,Java环境下如何妥善处理事务消息
你 发表评论:
欢迎- 最近发表
-
- 搞趣网:我的世界全新皮肤包原始居民下载地址
- 我的世界拔刀剑MOD下载(我的世界拔刀剑mod下载国际版)
- 我的世界无正版账号的简单联机方法(非网易版,仅适用于局域网)
- 一些可以显著提高大型 Java 项目启动速度的尝试
- 常见的java敏感异常介绍(java 常见的异常)
- Java 开发者必看!三招实现外部 Jar 包动态加载(含热更新方案)
- Java JAR 启动内存参数配置指南:从基础设置到性能优化
- 对Spring MVC接口进行Mock测试(springmvc对外接口)
- 还在用策略模式解决 if-else?Map+函数式接口方法才是YYDS
- 干掉OpenFeign,SpringBoot 3.0 自带的 HTTP 客户端真香!
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)