专业的JAVA编程教程与资源

网站首页 > java教程 正文

消息队列挂了,Java环境下如何妥善处理事务消息

temp10 2024-11-18 17:09:19 java教程 13 ℃ 0 评论

引言

在分布式系统中,消息队列常被用于实现业务解耦与异步处理,特别是配合事务消息机制,确保业务操作与消息投递的一致性。然而,当消息队列服务出现故障或短暂不可用时,如何确保事务消息的安全处理至关重要。本文将深入探讨基于Java环境下的解决方案,并提供相应的示例代码。

一、事务消息的基本原理

事务消息一般遵循两阶段提交(2PC)的过程:

消息队列挂了,Java环境下如何妥善处理事务消息

  • 预发送阶段:在业务数据库事务内,先将消息发送至消息队列,但并不立即投递,而是暂存于“预提交”状态。
  • 确认阶段:业务数据库事务提交成功后,通知消息队列提交消息,消息队列随后将消息正式投递给消费者。

二、消息队列服务关闭时的挑战

当消息队列服务突然关闭时,可能会造成以下问题:

  • 预发送的消息无法及时完成确认。
  • 已经提交但未投递的消息丢失。

三、应对策略与示例代码

1. 使用本地事务与消息表

一种常见的解决方案是借助本地数据库建立一张消息表,记录消息的发送状态及必要信息。当消息队列服务恢复正常时,可以查询这张消息表,对未处理完的事务消息进行重新发送或确认。

// 假设已有本地消息表 MessageLog
@Table
public class MessageLog {
    @Id
    private Long id;
    private String transactionId; // 事务ID
    private String messageBody; // 消息体
    private Boolean isCommitted; // 是否已提交到消息队列
    // 其他字段如消息类型、创建时间等...
}

// 在业务服务中
@Service
public class TransactionalService {
    @Autowired
    private MessageLogRepository repository;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Transactional
    public void processBusinessLogicAndSendMsg(BusinessData data) {
        // 执行业务逻辑...

        // 存储消息到本地消息表
        MessageLog messageLog = new MessageLog(transactionId, JSON.toJSONString(data));
        repository.save(messageLog);

        try {
            // 尝试发送消息
            rocketMQTemplate.sendMessageInTransaction("topic", data);
            // 更新消息表状态
            messageLog.setIsCommitted(true);
            repository.save(messageLog);
        } catch (Exception e) {
            // 消息队列服务异常时,不做任何处理,等待后续定时任务检查并重新发送
        }
    }

    // 定时任务检查未发送成功的消息并重新发送
    @Scheduled(cron = "0 0/15 * * * ?")
    public void checkAndResendUncommittedMessages() {
        List<MessageLog> uncommittedMessages = repository.findByIsCommittedFalse();
        for (MessageLog log : uncommittedMessages) {
            try {
               rocketMQTemplate.sendMessageInTransaction("topic", JSON.parseObject(log.getMessageBody(), BusinessData.class));
                log.setIsCommitted(true);
                repository.save(log);
            } catch (Exception e) {
                // 记录错误日志,可尝试再次重试或人工介入
            }
        }
    }
}

2. 利用消息队列的事务消息特性

部分消息队列产品如RocketMQ提供了内置的事务消息支持。在Java中,可以使用org.apache.rocketmq.spring.core.RocketMQTemplatesendMessageInTransaction方法实现事务消息的发送。

@Service
public class RocketMQTransactionalService implements RocketMQListener<BusinessData>, TransactionListener {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    @Transactional
    public void onMessage(BusinessData message) {
        // 处理消费逻辑...
    }

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务逻辑,如果成功则返回COMMIT_MESSAGE,否则RETURN_MESSAGE
        if (localTransactionSucceeds()) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public RocketMQTransactionListener getTransactionListener() {
        return this;
    }

    public void sendTransactionalMessage(BusinessData data) {
        rocketMQTemplate.executeInTransaction(
            "topic",
            JSON.toJSONString(data),
            (message, arg) -> this.executeLocalTransaction(message, arg)
        );
    }
}

// 当消息队列服务恢复后,RocketMQ会自动处理未完成的事务消息

综上所述,在消息队列服务发生故障时,采用本地事务与消息表的方式或直接利用消息队列提供的事务消息特性,都可以有效解决事务消息的处理问题。此外,无论采取哪种方式,都需要确保消息消费者具备幂等性处理能力,以防止消息重复投递引发的问题。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表