网站首页 > java教程 正文
引言
在分布式系统中,消息队列常被用于实现业务解耦与异步处理,特别是配合事务消息机制,确保业务操作与消息投递的一致性。然而,当消息队列服务出现故障或短暂不可用时,如何确保事务消息的安全处理至关重要。本文将深入探讨基于Java环境下的解决方案,并提供相应的示例代码。
一、事务消息的基本原理
事务消息一般遵循两阶段提交(2PC)的过程:
- 预发送阶段:在业务数据库事务内,先将消息发送至消息队列,但并不立即投递,而是暂存于“预提交”状态。
- 确认阶段:业务数据库事务提交成功后,通知消息队列提交消息,消息队列随后将消息正式投递给消费者。
二、消息队列服务关闭时的挑战
当消息队列服务突然关闭时,可能会造成以下问题:
- 预发送的消息无法及时完成确认。
- 已经提交但未投递的消息丢失。
三、应对策略与示例代码
1. 使用本地事务与消息表
一种常见的解决方案是借助本地数据库建立一张消息表,记录消息的发送状态及必要信息。当消息队列服务恢复正常时,可以查询这张消息表,对未处理完的事务消息进行重新发送或确认。
// 假设已有本地消息表 MessageLog
@Table
public class MessageLog {
@Id
private Long id;
private String transactionId; // 事务ID
private String messageBody; // 消息体
private Boolean isCommitted; // 是否已提交到消息队列
// 其他字段如消息类型、创建时间等...
}
// 在业务服务中
@Service
public class TransactionalService {
@Autowired
private MessageLogRepository repository;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional
public void processBusinessLogicAndSendMsg(BusinessData data) {
// 执行业务逻辑...
// 存储消息到本地消息表
MessageLog messageLog = new MessageLog(transactionId, JSON.toJSONString(data));
repository.save(messageLog);
try {
// 尝试发送消息
rocketMQTemplate.sendMessageInTransaction("topic", data);
// 更新消息表状态
messageLog.setIsCommitted(true);
repository.save(messageLog);
} catch (Exception e) {
// 消息队列服务异常时,不做任何处理,等待后续定时任务检查并重新发送
}
}
// 定时任务检查未发送成功的消息并重新发送
@Scheduled(cron = "0 0/15 * * * ?")
public void checkAndResendUncommittedMessages() {
List<MessageLog> uncommittedMessages = repository.findByIsCommittedFalse();
for (MessageLog log : uncommittedMessages) {
try {
rocketMQTemplate.sendMessageInTransaction("topic", JSON.parseObject(log.getMessageBody(), BusinessData.class));
log.setIsCommitted(true);
repository.save(log);
} catch (Exception e) {
// 记录错误日志,可尝试再次重试或人工介入
}
}
}
}
2. 利用消息队列的事务消息特性
部分消息队列产品如RocketMQ提供了内置的事务消息支持。在Java中,可以使用org.apache.rocketmq.spring.core.RocketMQTemplate的sendMessageInTransaction方法实现事务消息的发送。
@Service
public class RocketMQTransactionalService implements RocketMQListener<BusinessData>, TransactionListener {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
@Transactional
public void onMessage(BusinessData message) {
// 处理消费逻辑...
}
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务逻辑,如果成功则返回COMMIT_MESSAGE,否则RETURN_MESSAGE
if (localTransactionSucceeds()) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public RocketMQTransactionListener getTransactionListener() {
return this;
}
public void sendTransactionalMessage(BusinessData data) {
rocketMQTemplate.executeInTransaction(
"topic",
JSON.toJSONString(data),
(message, arg) -> this.executeLocalTransaction(message, arg)
);
}
}
// 当消息队列服务恢复后,RocketMQ会自动处理未完成的事务消息
综上所述,在消息队列服务发生故障时,采用本地事务与消息表的方式或直接利用消息队列提供的事务消息特性,都可以有效解决事务消息的处理问题。此外,无论采取哪种方式,都需要确保消息消费者具备幂等性处理能力,以防止消息重复投递引发的问题。
猜你喜欢
- 2024-11-18 常见的消息队列对比
- 2024-11-18 java中的延迟队列——Redis
- 2024-11-18 Java重试队列-让服务更健壮
- 2024-11-18 SpringBoot消息队列
- 2024-11-18 临近期末考试,一篇消息队列和RocketMQ的总结送给你们(二)
- 2024-11-18 一文了解字节跳动消息队列演进之路
- 2024-11-18 消息队列中,如何保证消息的顺序性?
- 2024-11-18 Java开发中常用的消息队列工具 ActiveMQ
- 2024-11-18 临近期末考试,一篇消息队列和RocketMQ的总结送给你们(一)
- 2024-11-18 Java面试官:如何用Redis实现一个消息队列?直接上代码
你 发表评论:
欢迎- 最近发表
-
- 搞趣网:我的世界全新皮肤包原始居民下载地址
- 我的世界拔刀剑MOD下载(我的世界拔刀剑mod下载国际版)
- 我的世界无正版账号的简单联机方法(非网易版,仅适用于局域网)
- 一些可以显著提高大型 Java 项目启动速度的尝试
- 常见的java敏感异常介绍(java 常见的异常)
- Java 开发者必看!三招实现外部 Jar 包动态加载(含热更新方案)
- Java JAR 启动内存参数配置指南:从基础设置到性能优化
- 对Spring MVC接口进行Mock测试(springmvc对外接口)
- 还在用策略模式解决 if-else?Map+函数式接口方法才是YYDS
- 干掉OpenFeign,SpringBoot 3.0 自带的 HTTP 客户端真香!
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)