网站首页 > java教程 正文
1.登录后续流程改造为基于rabbitmq异步处理 第八期的rabbitmq
2.结业项目整合rabbitmq
3.落地代码解决MQ如何保证消息不丢失?
4.落地代码解决MQ如果消费失败了?重试策略
5.落地代码解决MQ重试过程如何避免幂等性问题
6.落地代码实现MQ消费者批量消费
如果是基于多线程异步
优点:实现简单、延迟概率不是很高
缺点:服务宕机 挂了 同一个jvm里面-----导致数据丢失 可能会消耗到cpu资源 重试代码需要自己写 没有完全隔离
如果基于MQ做异步
优点:完全隔离、保证数据不丢失、抗高并发 内置重试次数配置
缺点:实现复杂 延迟比较高
rabbitmq里面 消费者 消费成功的情况下 则该msg 会从rabbitmq服务器端删除
kafka 提交offset----消息日志 后期通过定时策略 淘汰算法 清除msg
mq服务器端是如何知道 mq消费者消费失败呢?
1.直接抛出异常
2.手动ack
mq消费者消费失败的情况下 则该msg 一直存放mq服务器端。
sys-user 新增openid字段
CREATE TABLE `sys_user_1` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID',
`dept_id` varchar(50) DEFAULT NULL COMMENT '部门ID',
`name` varchar(50) DEFAULT NULL COMMENT '姓名',
`user_name` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '登陆名称',
`email` varchar(50) DEFAULT '' COMMENT '用户邮箱',
`phone_number` varchar(11) DEFAULT '' COMMENT '手机号码',
`sex` char(1) DEFAULT '0' COMMENT '用户性别(0男 1女 2未知)',
`avatar` varchar(100) DEFAULT '' COMMENT '头像地址',
`password` varchar(100) DEFAULT '' COMMENT '密码',
`salt` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '盐值',
`birthday` date DEFAULT NULL COMMENT '生日',
`status` char(1) DEFAULT '0' COMMENT '帐号状态(0正常 1停用)',
`del_flag` char(1) DEFAULT '0' COMMENT '删除标志(0代表存在 2代表删除)',
`login_ip` varchar(128) DEFAULT '' COMMENT '最后登录IP',
`login_date` datetime DEFAULT NULL COMMENT '最后登录时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_by` varchar(64) DEFAULT '' COMMENT '更新者',
`remark` varchar(500) DEFAULT NULL COMMENT '备注',
`version` int DEFAULT NULL COMMENT '版本',
`open_id` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb3 COMMENT='用户信息表';
实体类中也需要加上openid
安装rabbitmq环境
docker run -it -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3-management
-- RABBITMQ_DEFAULT_USER:账号
-- RABBITMQ_DEFAULT_PASS:密码
-- 如果RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS没填写,默认用户guest 密码guest -- 15672:控制台端口 界面管理rabbitmq
-- 5672: AMQP端口 -----投递msg
关闭防火墙
systemctl stop firewalld
账户admin 密码 admin
rabbitmq架构原理
整合rabbitmq
maven依赖
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件内容
spring:
main:
allow-bean-definition-overriding: true
rabbitmq:
####连接地址
host: 192.168.75.139
####端口号
port: 5672
####账号
username: admin
####密码
password: admin
### 地址
virtual-host: /
redis:
host: 127.0.0.1
port: 6379
server:
port: 60
#spring:
# application:
# ###服务的名称
# name: mayikt-main
# datasource:
# password: root
# username: root
# driver-class-name: com.mysql.jdbc.Driver
# url: jdbc:mysql://localhost:3306/sys_admin?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
logging:
level:
###打印mybatis日志
com.mayikt.main.mapper: debug
# 数据源 mayiktdb
sharding:
jdbc:
datasource:
names: sysadmin
# 第一个数据库
sysadmin:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.jdbc.Driver
jdbc-url: jdbc:mysql://127.0.0.1:3306/sys_admin?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: root
# 水平拆分的数据库(表) 配置分库 + 分表策略 行表达式分片策略
config:
sharding:
tables:
sys_user: ##虚拟表名称 mayikt_user_0 mayikt_user_1
actual-data-nodes: sysadmin.sys_user_$->{1..2} # 没有带上 分片字段
table-strategy:
standard: ##SELECT * FROM mayikt_user_1 where id =1
precise-algorithm-class-name: com.mayikt.main.config.MayiktRangeShardingAlgorithm
sharding-column: id
# 打印执行的数据库
props:
sql:
show: true
# 打印执行的sql语句
spring:
main:
allow-bean-definition-overriding: true
rabbitmq:
####连接地址
host: 192.168.75.139
####端口号
port: 5672
####账号
username: admin
####密码
password: admin
### 地址
virtual-host: /
redis:
host: 127.0.0.1
port: 6379
mayikt:
thread:
corePoolSize: 32
maxPoolSize: 32
queueCapacity: 1000 # 最大线程数---生效 32-20
keepAlive: 60 # 如果线程60s 没有任何执行任务的话 直接死亡避免 浪费cpu资源
##对线程池不了解的话 可以看下往期第九期 或者java程序员面试 线程池原理
log:
threads: 3 # 需要初始化的线程数
threadName: mayikt-log # 线程名称 方便生产环境发生了cpu飙高的问题能够解决
wx:
main:
login:
templateId: NjZakAOs_NZ5CzltyQ0ppjMqsI0a-Fxa2wKPXbFd_DA
wx:
mp:
configs:
- appId: wx5c43fde3c9733d9e # 改成你的 申请公众号测试 appid
secret: b8b217126c33a5fb7074927d5e72a81a # 改成你的 申请公众号测试 秘钥
token: meite # token的值需要改成你在公众号 配置的一样的
生产者
package com.mayikt.main.producer;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author 余胜军
* @ClassName LoginProducer
* @qq 644064779
* @addres www.mayikt.com
* 微信:yushengjun644
*/
@Component
@Slf4j
public class LoginProducer {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 登录后续处理流程
*/
public void sendLoginFollowUp(Integer userId, String loginIp, Date loginTime, String loginToken,
String channel, String equipment) {
/**
* 1.交换机名称
* 2.路由key名称
* 3.发送内容
*/
JSONObject data = new JSONObject();
data.put("userId", userId);
data.put("loginIp", loginIp);
data.put("loginTime", loginTime);
data.put("loginToken", loginToken);
data.put("channel", channel);
data.put("equipment", equipment);
// data.put("openId", openid);
// data.put("phone", phone);
String dataJSON = data.toJSONString();
amqpTemplate.convertAndSend("/mayikt_ex", "", dataJSON);
log.info(">>登录之后投递mq消息,异步处理后续操作..dataJSON:{}<<", dataJSON);
}
}
package com.mayikt.main.config; /**
* 作 者 :蚂蚁课堂-余胜军
* 版 本 号 :v1.0.0.0
* ******************************************************************
* 版权由每特教育-蚂蚁课堂-余胜军所有 微信yushengjun644 QQ644064779
* 官方网址:www.mayikt.com
* ******************************************************************
* //----------------------------------------------------------------
*/
import com.rabbitmq.client.impl.AMQImpl;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Queue;
/**
* RabbitmqConfig
*/
@Component
public class RabbitMQConfig {
/**
* 定义交换机
*/
private String EXCHANGE_MAYIKT_NAME = "/mayikt_ex";
/**
* 唯一登录队列 日志功能
*/
private String MAYIKT_UNIQUELOGIN_QUEUE = "fanout_uniquelogin_queue";
/**
* 微信模板队列
*/
private String MAYIKT_WECHAT_TEMPLATE_QUEUE = "fanout_wechattemplate_queue";
/**
* 配置MAYIKT_UNIQUELOGIN_QUEUE
*
* @return
*/
@Bean
public Queue fanoutUniqueloginQueue() {
return new Queue(MAYIKT_UNIQUELOGIN_QUEUE);
}
/**
* 配置MAYIKT_UNIQUELOGIN_QUEUE
*
* @return
*/
@Bean
public Queue fanoutWechattemplateQueue() {
return new Queue(MAYIKT_WECHAT_TEMPLATE_QUEUE);
}
/**
* 配置fanoutExchange
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_MAYIKT_NAME);
}
// 绑定交换机 fanoutUniqueloginQueue
@Bean
public Binding bindingUniqueLogFanoutExchange(Queue fanoutUniqueloginQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutUniqueloginQueue).to(fanoutExchange);
}
// 绑定交换机 Wechattemplate
@Bean
public Binding bindingWechattemplateFanoutExchange(Queue fanoutWechattemplateQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutWechattemplateQueue).to(fanoutExchange);
}
}
loginProducer.sendLoginFollowUp(sysUser.getId(), sysUserLoginLog.getLoginIp(), sysUserLoginLog.getLoginTime(),
sysUserLoginLog.getLoginToken(), sysUserLoginLog.getChannel(), sysUserLoginLog.getEquipment());
消费者
mayikt-admin-mq\mayikt-admin-consumer
将代码拷贝到mayikt-admin-consumer
mayikt-admin-consumer.rar
异步微信发送消息消费者
package com.mayikt.main.consumer;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.mayikt.main.consumer.entity.SysUser;
import com.mayikt.main.consumer.entity.SysUserLoginLog;
import com.mayikt.main.consumer.mapper.SysUserMapper;
import com.mayikt.main.manage.WechatTemplateManage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 余胜军
* @ClassName WechatTemplateConsumer
* @qq 644064779
* @addres www.mayikt.com
* 微信:yushengjun644
*/
@Component
@RabbitListener(queues = "fanout_wechattemplate_queue")
@Slf4j
public class WechatTemplateConsumer {
@Autowired
private WechatTemplateManage wechatTemplateManage;
@Autowired
private SysUserMapper sysUserMapper;
// @RabbitHandler
// public void process(String msg) {
// log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
// JSONObject data = JSONObject.parseObject(msg);
// SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
// // 根据userid 获取手机号码和openid
// Integer userId = sysUserLoginLog.getUserId();
// SysUser sysUser = sysUserMapper.selectById(userId);
// if (sysUser == null) {
// return;
// }
// boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
// if (!result) {
// // 如果发送失败则 需要返回ack 失败告诉给mq 让它重试。
// return;
// }
// // 成功则告诉我们rabbitmq 让它从mq中将该消息从mq中移除
// }
@RabbitHandler
public void process(String msg, Message message, Channel channel) throws IOException {
log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
JSONObject data = JSONObject.parseObject(msg);
SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
// 根据userid 获取手机号码和openid
Integer userId = sysUserLoginLog.getUserId();
SysUser sysUser = sysUserMapper.selectById(userId);
if (sysUser == null) {
return;
}
boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
if (!result) {
// 如果发送失败则 需要返回ack 失败告诉给mq 让它重试。
return;
}
// int j = 1 / 0;
// 成功则告诉我们rabbitmq 让它从mq中将该消息从mq中移除
channel.basicAck(deliveryTag, true);// 手动ack成功
} catch (Exception e) {
log.error("e:{}", e);
// 拒绝签收
channel.basicNack(deliveryTag, true, true);
}
}
}
异步写入登录日志消费者
package com.mayikt.main.consumer;
import com.alibaba.fastjson.JSONObject;
import com.mayikt.main.consumer.entity.SysUserLoginLog;
import com.mayikt.main.manage.LoginLogManage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 余胜军
* @ClassName UniqueLoginConsumer
* @qq 644064779
* @addres www.mayikt.com
* 微信:yushengjun644
*/
@Component
@RabbitListener(queues = "fanout_uniquelogin_queue")
@Slf4j
public class UniqueLoginConsumer {
@Autowired
private LoginLogManage loginLogManage;
@RabbitHandler
public void process(String msg) {
log.info(">>使用MQ异步的形式记录日志:{}<<", msg);
SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
loginLogManage.asynLoginLog(sysUserLoginLog);
}
}
避免一直消费者消费失败一直重试 改成手动ack
1.rabbitmq 手动ack
1.自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。
2.手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。
3.根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等)。
优化消费者代码
配置文件中新增 开启手动ack
# 打印执行的sql语句
spring:
main:
allow-bean-definition-overriding: true
rabbitmq:
####连接地址
host: 192.168.75.139
####端口号
port: 5672
####账号
username: admin
####密码
password: admin
### 地址
virtual-host: /
listener:
simple: ## 修改配置为手动ack模式
acknowledge-mode: manual
package com.mayikt.main.consumer;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.mayikt.main.consumer.entity.SysUser;
import com.mayikt.main.consumer.entity.SysUserLoginLog;
import com.mayikt.main.consumer.mapper.SysUserMapper;
import com.mayikt.main.manage.WechatTemplateManage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 余胜军
* @ClassName WechatTemplateConsumer
* @qq 644064779
* @addres www.mayikt.com
* 微信:yushengjun644
*/
@Component
@RabbitListener(queues = "fanout_wechattemplate_queue")
@Slf4j
public class WechatTemplateConsumer {
@Autowired
private WechatTemplateManage wechatTemplateManage;
@Autowired
private SysUserMapper sysUserMapper;
// @RabbitHandler
// public void process(String msg, Channel channel) {
// log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
// // 解析json
// SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
// Integer userId = sysUserLoginLog.getUserId();
// // 在用户userid实时查询对应手机号码和openid
// SysUser sysUser = sysUserMapper.selectById(userId);
// boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
// if (!result) {
// // 该msg 不能够从mq删除 mq服务器端给mq消费者触发重试策略
// // 手动抛出异常----
// return;
// }
// // 发送微信模板消息成功了 再通知mq服务器端删除该msg
// }
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
log.info(">>调用微信模板接口发送微信模板提醒:{}<<", msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 解析json
SysUserLoginLog sysUserLoginLog = JSONObject.parseObject(msg, SysUserLoginLog.class);
Integer userId = sysUserLoginLog.getUserId();
// 在用户userid实时查询对应手机号码和openid
SysUser sysUser = sysUserMapper.selectById(userId);
boolean result = wechatTemplateManage.sendLoginTemplate(sysUser, sysUserLoginLog);
if (!result) {
// 该msg 不能够从mq删除 mq服务器端给mq消费者触发重试策略
// 手动抛出异常----
// int j = 1 / 0;
channel.basicNack(deliveryTag, true, true); // 给mq发送通知 这个msg消息消费失败了 需要重试! 不会从mq删除
return;
}
// 发送微信模板消息成功了 再通知mq服务器端删除该msg
channel.basicAck(deliveryTag, true);// 手动ack成功 发送通知给 mq服务器端说 消费成功了 msg 从mq服务器端删除
} catch (Exception e) {
log.error("<e:>{}", e);
channel.basicNack(deliveryTag, true, true); // 给mq发送通知 这个msg消息消费失败了 需要重试! 不会从mq删除
}
}
/**
* 如果消费者消费msg 一直是失败---网络连接超时 可以重试
* 但是如果是代码bug 将该msg 存入在另外一个队列中(死信队列)中 重新变写新消费者消费。
*/
}
day06.rar
猜你喜欢
- 2025-05-22 @Async引发线上服务内存溢出如何处理
- 2025-05-22 RabbitMQ与Java集成的典型用例:从消息传递到任务调度的全面探索
- 2025-05-22 JAVA面试|Redis原理及应用场景
- 2025-05-22 并发编程:CompletableFuture异步编程没有那么难
- 2025-05-22 同步 vs 异步性能差100倍!SpringBoot3 高吞吐接口实现终极方案
- 2025-05-22 Java高并发处理的艺术:让程序飞起来!
- 2025-05-22 HttpClient的异步调用,你造吗?
- 2025-05-22 @Async:一个异步方法调用另一个异步方法难道不是异步吗?
- 2025-05-22 Serverless革命:Java函数计算性能突破
- 2025-05-22 使用Quarkus开发响应式REST API,异步异步异步
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)