网站首页 > java教程 正文
在当今互联网大厂的复杂架构中,分布式系统已成为主流。而在分布式系统里,分布式事务的处理一直是个棘手难题。对于咱们互联网大厂开发人员而言,如何在保证系统数据一致性的同时,高效地完成业务操作,是日常工作中频繁面临的挑战。今天,咱们就来深入探讨一下,如何借助 RocketMQ 这一强大工具,巧妙地避免分布式事务操作。
分布式事务之痛
在分布式系统中,一个业务操作往往会涉及多个服务或节点。比如,在电商系统中,用户下单后,不仅要创建订单记录,还可能需要扣减库存、更新用户积分、通知物流系统等一系列操作。这些操作分布在不同的服务上,要确保它们要么全部成功,要么全部失败,这就是分布式事务的核心诉求。
传统的分布式事务解决方案,如基于 XA 协议的两阶段提交方案,虽然能保证强一致性,但存在性能开销大、协调成本高的问题。在高并发场景下,这种方案会严重影响系统的响应速度和吞吐量。还有 TCC 事务补偿型方案,它需要大量手动编写补偿代码,项目维护难度极大,除非是对一致性要求极高的核心业务场景,否则很少被采用。
RocketMQ 闪亮登场
RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性等诸多优势。它为我们解决分布式事务问题提供了新的思路,即通过可靠消息最终一致性方案来避免直接的分布式事务操作。
可靠消息最终一致性方案原理
- 消息发送阶段:A 系统首先发送一个 prepared 消息到 RocketMQ。如果这个 prepared 消息发送失败,那么就直接取消操作,不再进行后续执行。这一步就像是给后续操作上了一道保险,确保消息服务正常可用,才会继续后续业务操作。
- 本地事务执行阶段:当 prepared 消息发送成功后,A 系统接着执行本地事务。如果本地事务执行成功,就告诉 RocketMQ 发送确认消息;如果失败,就告诉 RocketMQ 回滚消息。这里本地事务和消息状态的一致性非常关键,RocketMQ 提供了相应机制来保障。
- 消息消费阶段:如果 RocketMQ 收到了确认消息,B 系统会接收到该消息,然后执行本地事务。
- 消息重试与补偿:RocketMQ 会自动定时轮询所有 prepared 消息回调相应服务接口,询问这个消息是不是因为本地事务处理失败了,所以没发送确认消息。一般通过查询数据库查看之前的本地事务是否执行,如果回滚了,那么这里也回滚。这就有效避免了本地事务执行成功,而确认消息发送失败的情况。倘若 B 系统的事务失败了,RocketMQ 会自动不断重试,直到成功。如果实在无法成功,对于重要的资金类业务,可以针对 B 系统本地回滚后,想办法通知 A 系统也回滚;或者发送报警由人工来手工回滚和补偿。
实际应用场景举例
以电商业务中常见的 “订单支付” 场景来说,在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。在引入 RocketMQ 之前,这些操作可能都在一个分布式事务中进行,牵一发而动全身,任何一个环节出错,整个事务都要回滚。
引入 RocketMQ 后,订单支付服务只需要关注最重要的流程 —— 更新订单状态即可。其他诸如更新用户积分、通知商家、更新用户画像等操作,全部交给 RocketMQ 来异步通知。这样一来,不仅实现了系统解耦,各个服务之间的依赖关系大大降低,而且因为这些步骤变成了异步执行,能显著减少订单支付的整体耗时,提升订单系统的吞吐量。
RocketMQ 通用实现步骤
引入依赖
以 Java 项目为例,若使用 Maven 构建项目,在pom.xml文件中添加 RocketMQ 客户端依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
确保版本与项目需求及 RocketMQ 服务端版本兼容。
配置生产者
创建生产者实例:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");// 根据实际RocketMQ集群地址配置
这里的producerGroup是生产者组名称,同一组内的生产者具有相同的角色和属性,NamesrvAddr指定了 RocketMQ 的 NameServer 地址,NameServer 负责管理 Broker 的路由信息。
启动生产者
try {
producer.start();
System.out.println("Producer started successfully");
} catch (MQClientException e) {
e.printStackTrace();
}
启动生产者后,它便可以向 RocketMQ 发送消息。
发送消息
构建消息对象
Message msg = new Message("TopicTest", // Topic名称
"TagA", // Tag标签
"OrderID188", // 消息键,用于消息查询等场景
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息体
);
Topic用于对消息进行分类,不同业务场景可设置不同 Topic;Tag进一步细化消息类别,方便消费者过滤消息。
发送消息
try {
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
发送消息时,会返回SendResult对象,包含消息发送的状态、消息在 Broker 中的存储位置等信息,可据此判断消息是否发送成功。
配置消费者
创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumerGroup是消费者组名称,同一组内的消费者共同消费 Topic 中的消息。subscribe方法用于指定消费者订阅的 Topic 和 Tag。
注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Received message: " + messageBody);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
消息监听器用于处理接收到的消息,在consumeMessage方法中编写业务逻辑处理消息内容,处理完成后返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消息消费成功。
启动消费者
try {
consumer.start();
System.out.println("Consumer started successfully");
} catch (MQClientException e) {
e.printStackTrace();
}
启动消费者后,它会开始从 RocketMQ 拉取消息并进行消费。
总结
通过使用 RocketMQ 的可靠消息最终一致性方案,为开发人员提供了一种高效、可靠且易于维护的避免分布式事务操作的方法。它在保证系统数据最终一致性的同时,极大地提升了系统的性能和可扩展性。
当然,在实际应用中,还需要根据具体的业务场景和需求,对 RocketMQ 进行合理的配置和优化。比如,根据业务的并发量和消息量,合理调整 RocketMQ 的集群规模和参数;针对不同的业务消息,设置合适的 Topic、Tag 和分片键等。
随着互联网技术的不断发展,分布式系统的复杂度还会持续增加,分布式事务的处理也将面临更多新的挑战。但相信借助像 RocketMQ 这样优秀的技术工具,我们一定能够在复杂的技术环境中,找到最佳的解决方案,为用户提供更加稳定、高效的服务。
猜你喜欢
- 2025-06-04 分布式事务解决方案探析:从理论到实践
- 2025-06-04 SpringCloud分布式框架&分布式事务&分布式锁
- 2025-06-04 分布式协议与算法,你了解多少?(分布式协议 paxos)
- 2025-06-04 Spring Boot中的分布式事务解决方案
- 2025-06-04 Seata分布式事务详解(原理流程及4种模式)
- 2025-06-04 分布式事务怎么做?Spring Cloud Alibaba Seata告诉你
- 2025-06-04 Zookeeper:分布式架构详解、分布式技术详解、分布式事务
- 2025-06-04 JAVA分布式事务解决方案:掌控微服务间的事务一致性
- 2025-06-04 一文揭秘!Spring Boot3 分布式事务的高效实现与性能优化方案
- 2025-06-04 探秘分布式事务处理中的Java解决方案
你 发表评论:
欢迎- 06-04C++优先级调度队列(Priority Queue)
- 06-04数据结构与算法-优先队列(优先队列 数组实现)
- 06-04什么是优先队列?(优先队列原理)
- 06-04终于有架构大牛把分布式系统概念讲明白了,竟然用了足足800页
- 06-04分布式事物如何保证接口请求顺序性?
- 06-04微服务下分布式事务模式的详细对比
- 06-04彻底掌握分布式事务2PC、3PC模型(分布式事务 三阶段)
- 06-04分布式事务最全详解(看这篇就够了)
- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)