专业的JAVA编程教程与资源

网站首页 > java教程 正文

06.整合rabbitmq异步处理

temp10 2025-05-22 13:24:12 java教程 4 ℃ 0 评论

1.登录后续流程改造为基于rabbitmq异步处理 第八期的rabbitmq
2.结业项目整合rabbitmq
3.落地代码解决MQ如何保证消息不丢失?
4.落地代码解决MQ如果消费失败了?重试策略
5.落地代码解决MQ重试过程如何避免幂等性问题

6.落地代码实现MQ消费者批量消费

06.整合rabbitmq异步处理

如果是基于多线程异步

优点:实现简单、延迟概率不是很高

缺点:服务宕机 挂了 同一个jvm里面-----导致数据丢失 可能会消耗到cpu资源 重试代码需要自己写 没有完全隔离

如果基于MQ做异步

优点:完全隔离、保证数据不丢失、抗高并发 内置重试次数配置

缺点:实现复杂 延迟比较高


rabbitmq里面 消费者 消费成功的情况下 则该msg 会从rabbitmq服务器端删除

kafka 提交offset----消息日志 后期通过定时策略 淘汰算法 清除msg



mq服务器端是如何知道 mq消费者消费失败呢?

1.直接抛出异常

2.手动ack


mq消费者消费失败的情况下 则该msg 一直存放mq服务器端。

sys-user 新增openid字段

CREATE TABLE `sys_user_1` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `dept_id` varchar(50) DEFAULT NULL COMMENT '部门ID',
  `name` varchar(50) DEFAULT NULL COMMENT '姓名',
  `user_name` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '登陆名称',
  `email` varchar(50) DEFAULT '' COMMENT '用户邮箱',
  `phone_number` varchar(11) DEFAULT '' COMMENT '手机号码',
  `sex` char(1) DEFAULT '0' COMMENT '用户性别(0男 1女 2未知)',
  `avatar` varchar(100) DEFAULT '' COMMENT '头像地址',
  `password` varchar(100) DEFAULT '' COMMENT '密码',
  `salt` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '盐值',
  `birthday` date DEFAULT NULL COMMENT '生日',
  `status` char(1) DEFAULT '0' COMMENT '帐号状态(0正常 1停用)',
  `del_flag` char(1) DEFAULT '0' COMMENT '删除标志(0代表存在 2代表删除)',
  `login_ip` varchar(128) DEFAULT '' COMMENT '最后登录IP',
  `login_date` datetime DEFAULT NULL COMMENT '最后登录时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_by` varchar(64) DEFAULT '' COMMENT '更新者',
  `remark` varchar(500) DEFAULT NULL COMMENT '备注',
  `version` int DEFAULT NULL COMMENT '版本',
  `open_id` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb3 COMMENT='用户信息表';


实体类中也需要加上openid


安装rabbitmq环境

docker run -it -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3-management

-- RABBITMQ_DEFAULT_USER:账号

-- RABBITMQ_DEFAULT_PASS:密码

-- 如果RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS没填写,默认用户guest 密码guest -- 15672:控制台端口 界面管理rabbitmq

-- 5672: AMQP端口 -----投递msg


关闭防火墙

systemctl stop firewalld


账户admin 密码 admin

rabbitmq架构原理

整合rabbitmq

maven依赖

        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置文件内容

spring:
  main:
    allow-bean-definition-overriding: true
  rabbitmq:
    ####连接地址
    host: 192.168.75.139
    ####端口号
    port: 5672
    ####账号
    username: admin
    ####密码
    password: admin
    ### 地址
    virtual-host: /
  redis:
    host: 127.0.0.1
    port: 6379
server:
  port: 60
  #spring:
  #  application:
  #    ###服务的名称
  #    name: mayikt-main
  #  datasource:
  #    password: root
  #    username: root
  #    driver-class-name: com.mysql.jdbc.Driver
  #    url: jdbc:mysql://localhost:3306/sys_admin?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8

logging:
  level:
    ###打印mybatis日志
    com.mayikt.main.mapper: debug
# 数据源 mayiktdb
sharding:
  jdbc:
    datasource:
      names: sysadmin
      # 第一个数据库
      sysadmin:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.jdbc.Driver
        jdbc-url: jdbc:mysql://127.0.0.1:3306/sys_admin?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
        username: root
        password: root
    # 水平拆分的数据库(表) 配置分库 + 分表策略 行表达式分片策略
    config:
      sharding:
        tables:
          sys_user: ##虚拟表名称  mayikt_user_0 mayikt_user_1
            actual-data-nodes: sysadmin.sys_user_$->{1..2}  # 没有带上 分片字段
            table-strategy:
              standard: ##SELECT * FROM  mayikt_user_1 where id =1
                precise-algorithm-class-name: com.mayikt.main.config.MayiktRangeShardingAlgorithm
                sharding-column: id
      # 打印执行的数据库
      props:
        sql:
          show: true

# 打印执行的sql语句
spring:
  main:
    allow-bean-definition-overriding: true
  rabbitmq:
    ####连接地址
    host: 192.168.75.139
    ####端口号
    port: 5672
    ####账号
    username: admin
    ####密码
    password: admin
    ### 地址
    virtual-host: /
  redis:
    host: 127.0.0.1
    port: 6379
mayikt:
  thread:
    corePoolSize: 32
    maxPoolSize: 32
    queueCapacity: 1000  # 最大线程数---生效  32-20
    keepAlive: 60 # 如果线程60s 没有任何执行任务的话 直接死亡避免 浪费cpu资源
  ##对线程池不了解的话 可以看下往期第九期 或者java程序员面试 线程池原理
  log:
    threads: 3  # 需要初始化的线程数
    threadName: mayikt-log  # 线程名称 方便生产环境发生了cpu飙高的问题能够解决
  wx:
    main:
      login:
        templateId: NjZakAOs_NZ5CzltyQ0ppjMqsI0a-Fxa2wKPXbFd_DA
wx:
  mp:
    configs:
      - appId: wx5c43fde3c9733d9e # 改成你的 申请公众号测试 appid
        secret: b8b217126c33a5fb7074927d5e72a81a  # 改成你的 申请公众号测试 秘钥
        token: meite  # token的值需要改成你在公众号 配置的一样的

生产者

package com.mayikt.main.producer;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author 余胜军
 * @ClassName LoginProducer
 * @qq 644064779
 * @addres www.mayikt.com
 * 微信:yushengjun644
 */
@Component
@Slf4j
public class LoginProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 登录后续处理流程
     */
    public void sendLoginFollowUp(Integer userId, String loginIp, Date loginTime, String loginToken,
                                  String channel, String equipment) {
        /**
         * 1.交换机名称
         * 2.路由key名称
         * 3.发送内容
         */
        JSONObject data = new JSONObject();
        data.put("userId", userId);
        data.put("loginIp", loginIp);
        data.put("loginTime", loginTime);
        data.put("loginToken", loginToken);
        data.put("channel", channel);
        data.put("equipment", equipment);
//        data.put("openId", openid);
//        data.put("phone", phone);
        String dataJSON = data.toJSONString();
        amqpTemplate.convertAndSend("/mayikt_ex", "", dataJSON);
        log.info(">>登录之后投递mq消息,异步处理后续操作..dataJSON:{}<<", dataJSON);
    }
}

package com.mayikt.main.config; /**
 * 作    者 :蚂蚁课堂-余胜军
 * 版 本 号 :v1.0.0.0
 * ******************************************************************
 * 版权由每特教育-蚂蚁课堂-余胜军所有 微信yushengjun644 QQ644064779
 * 官方网址:www.mayikt.com
 * ******************************************************************
 * //----------------------------------------------------------------
 */

import com.rabbitmq.client.impl.AMQImpl;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Queue;


/**
 * RabbitmqConfig
 */
@Component
public class RabbitMQConfig {
    /**
     * 定义交换机
     */
    private String EXCHANGE_MAYIKT_NAME = "/mayikt_ex";


    /**
     * 唯一登录队列 日志功能
     */
    private String MAYIKT_UNIQUELOGIN_QUEUE = "fanout_uniquelogin_queue";


    /**
     * 微信模板队列
     */
    private String MAYIKT_WECHAT_TEMPLATE_QUEUE = "fanout_wechattemplate_queue";


    /**
     * 配置MAYIKT_UNIQUELOGIN_QUEUE
     *
     * @return
     */
    @Bean
    public Queue fanoutUniqueloginQueue() {
        return new Queue(MAYIKT_UNIQUELOGIN_QUEUE);
    }

    /**
     * 配置MAYIKT_UNIQUELOGIN_QUEUE
     *
     * @return
     */
    @Bean
    public Queue fanoutWechattemplateQueue() {
        return new Queue(MAYIKT_WECHAT_TEMPLATE_QUEUE);
    }

    /**
     * 配置fanoutExchange
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_MAYIKT_NAME);
    }


    // 绑定交换机 fanoutUniqueloginQueue
    @Bean
    public Binding bindingUniqueLogFanoutExchange(Queue fanoutUniqueloginQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutUniqueloginQueue).to(fanoutExchange);
    }

    // 绑定交换机 Wechattemplate
    @Bean
    public Binding bindingWechattemplateFanoutExchange(Queue fanoutWechattemplateQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutWechattemplateQueue).to(fanoutExchange);
    }
}


        loginProducer.sendLoginFollowUp(sysUser.getId(), sysUserLoginLog.getLoginIp(), sysUserLoginLog.getLoginTime(),
                sysUserLoginLog.getLoginToken(), sysUserLoginLog.getChannel(), sysUserLoginLog.getEquipment());

消费者

mayikt-admin-mq\mayikt-admin-consumer

将代码拷贝到mayikt-admin-consumer

mayikt-admin-consumer.rar


异步微信发送消息消费者

package com.mayikt.main.consumer;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.mayikt.main.consumer.entity.SysUser;
import com.mayikt.main.consumer.entity.SysUserLoginLog;
import com.mayikt.main.consumer.mapper.SysUserMapper;
import com.mayikt.main.manage.WechatTemplateManage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 余胜军
 * @ClassName WechatTemplateConsumer
 * @qq 644064779
 * @addres www.mayikt.com
 * 微信:yushengjun644
 */
@Component
@RabbitListener(queues = "fanout_wechattemplate_queue")
@Slf4j
public class WechatTemplateConsumer {

    @Autowired
    private WechatTemplateManage wechatTemplateManage;
    @Autowired
    private SysUserMapper sysUserMapper;

    //    @RabbitHandler
//    public void process(String msg) {
//        log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
//        JSONObject data = JSONObject.parseObject(msg);
//        SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
//        // 根据userid 获取手机号码和openid
//        Integer userId = sysUserLoginLog.getUserId();
//        SysUser sysUser = sysUserMapper.selectById(userId);
//        if (sysUser == null) {
//            return;
//        }
//        boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
//        if (!result) {
//            // 如果发送失败则 需要返回ack 失败告诉给mq 让它重试。
//            return;
//        }
//        // 成功则告诉我们rabbitmq 让它从mq中将该消息从mq中移除
//    }
    @RabbitHandler
    public void process(String msg, Message message, Channel channel) throws IOException {
        log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            JSONObject data = JSONObject.parseObject(msg);
            SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
            // 根据userid 获取手机号码和openid
            Integer userId = sysUserLoginLog.getUserId();
            SysUser sysUser = sysUserMapper.selectById(userId);
            if (sysUser == null) {
                return;
            }
            boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
            if (!result) {
                // 如果发送失败则 需要返回ack 失败告诉给mq 让它重试。
                return;
            }
//            int j = 1 / 0;
            // 成功则告诉我们rabbitmq 让它从mq中将该消息从mq中移除
            channel.basicAck(deliveryTag, true);// 手动ack成功
        } catch (Exception e) {
            log.error("e:{}", e);
//            拒绝签收
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

异步写入登录日志消费者

package com.mayikt.main.consumer;

import com.alibaba.fastjson.JSONObject;
import com.mayikt.main.consumer.entity.SysUserLoginLog;
import com.mayikt.main.manage.LoginLogManage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 余胜军
 * @ClassName UniqueLoginConsumer
 * @qq 644064779
 * @addres www.mayikt.com
 * 微信:yushengjun644
 */
@Component
@RabbitListener(queues = "fanout_uniquelogin_queue")
@Slf4j
public class UniqueLoginConsumer {
    @Autowired
    private LoginLogManage loginLogManage;
     @RabbitHandler
    public void process(String msg) {
        log.info(">>使用MQ异步的形式记录日志:{}<<", msg);
        SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
        loginLogManage.asynLoginLog(sysUserLoginLog);
    }

}


避免一直消费者消费失败一直重试 改成手动ack


1.rabbitmq 手动ack

1.自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。

2.手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。

3.根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等)。

优化消费者代码

配置文件中新增 开启手动ack

# 打印执行的sql语句
spring:
  main:
    allow-bean-definition-overriding: true
  rabbitmq:
    ####连接地址
    host: 192.168.75.139
    ####端口号
    port: 5672
    ####账号
    username: admin
    ####密码
    password: admin
    ### 地址
    virtual-host: /
    listener:
      simple: ## 修改配置为手动ack模式
        acknowledge-mode: manual
package com.mayikt.main.consumer;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.mayikt.main.consumer.entity.SysUser;
import com.mayikt.main.consumer.entity.SysUserLoginLog;
import com.mayikt.main.consumer.mapper.SysUserMapper;
import com.mayikt.main.manage.WechatTemplateManage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 余胜军
 * @ClassName WechatTemplateConsumer
 * @qq 644064779
 * @addres www.mayikt.com
 * 微信:yushengjun644
 */
@Component
@RabbitListener(queues = "fanout_wechattemplate_queue")
@Slf4j
public class WechatTemplateConsumer {

    @Autowired
    private WechatTemplateManage wechatTemplateManage;
    @Autowired
    private SysUserMapper sysUserMapper;

    //    @RabbitHandler
//    public void process(String msg, Channel channel) {
//        log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
//        // 解析json
//        SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
//        Integer userId = sysUserLoginLog.getUserId();
//        // 在用户userid实时查询对应手机号码和openid
//        SysUser sysUser = sysUserMapper.selectById(userId);
//        boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
//        if (!result) {
//            // 该msg 不能够从mq删除 mq服务器端给mq消费者触发重试策略
//            // 手动抛出异常----
//            return;
//        }
//        // 发送微信模板消息成功了 再通知mq服务器端删除该msg
//    }
    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 解析json
            SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
            Integer userId = sysUserLoginLog.getUserId();
            // 在用户userid实时查询对应手机号码和openid
            SysUser sysUser = sysUserMapper.selectById(userId);
            boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
            if (!result) {
                // 该msg 不能够从mq删除 mq服务器端给mq消费者触发重试策略
                // 手动抛出异常----
//            int j = 1 / 0;
                channel.basicNack(deliveryTag, true, true); // 给mq发送通知 这个msg消息消费失败了 需要重试! 不会从mq删除
                return;
            }
            // 发送微信模板消息成功了 再通知mq服务器端删除该msg
            channel.basicAck(deliveryTag, true);// 手动ack成功 发送通知给 mq服务器端说 消费成功了 msg 从mq服务器端删除
        } catch (Exception e) {
            log.error("<e:>{}", e);
            channel.basicNack(deliveryTag, true, true); // 给mq发送通知 这个msg消息消费失败了 需要重试! 不会从mq删除
        }
    }
    /**
     * 如果消费者消费msg 一直是失败---网络连接超时 可以重试
     * 但是如果是代码bug  将该msg 存入在另外一个队列中(死信队列)中   重新变写新消费者消费。
     */

}

day06.rar

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表