专业的JAVA编程教程与资源

网站首页 > java教程 正文

Redis全栈应用实战:从缓存到分布式系统全场景解析

temp10 2025-05-08 05:45:53 java教程 8 ℃ 0 评论

Redis全栈应用实战:从缓存到分布式系统全场景解析

引言与Redis基础概述

Redis是一种高性能的键值存储数据库,因其出色的性能、灵活的数据结构和丰富的功能,已成为现代应用架构中不可或缺的组件。本文将全面介绍Redis的各种使用场景,从基础缓存应用到复杂的分布式系统支持,帮助开发者充分发挥Redis的潜力。

Redis核心特性

在深入具体场景前,让我们先了解Redis的核心特性:

Redis全栈应用实战:从缓存到分布式系统全场景解析

上图展示了Redis的六大核心特性:

1. 高性能:Redis采用内存存储,能够提供极高的读写速度,平均响应时间仅为几微秒。

2. 丰富数据结构:Redis不仅支持简单的键值对,还支持字符串、哈希、列表、集合、有序集合等多种数据结构。

3. 原子操作:Redis的所有操作都是原子性的,即使是复杂的操作也能保证数据的一致性。

4. 持久化支持:通过RDB和AOF机制,Redis可以将内存中的数据持久化到磁盘,防止数据丢失。

5. 分布式支持:Redis提供了主从复制、哨兵模式和集群模式,支持高可用和水平扩展。

6. 事务支持:Redis支持事务操作,可以确保多个命令的原子执行。

Redis作为缓存的应用场景

Redis最常见的应用场景是作为缓存系统。下面我们将探讨几种典型的缓存应用模式:

上图展示了Redis作为缓存的典型工作流程:

1. 客户端向应用服务器发送请求,要求获取数据。

2. 应用服务器首先查询Redis缓存。

3. 如果缓存命中(3a路径),直接返回缓存数据。

4. 如果缓存未命中(3b路径),应用服务器查询数据库。

5. 数据库返回查询结果后,应用服务器将数据更新到Redis缓存。

6. 最后,应用服务器将数据返回给客户端。

缓存穿透防御

缓存穿透是指查询一个不存在的数据,导致每次都要查询数据库,失去了缓存的意义。下面通过布隆过滤器来解决这个问题:

上图展示了使用布隆过滤器防止缓存穿透的工作流程:

1. 客户端请求数据。

2. 应用服务器先通过布隆过滤器检查ID是否存在。

3. 处理分为两种情况:

o 3a:如果布隆过滤器判断ID一定不存在(红色路径),则直接返回空结果,避免无谓的缓存和数据库查询。

o 3b:如果布隆过滤器判断ID可能存在(绿色路径),则继续执行后续步骤。

4. 查询Redis缓存。

5. 如果缓存未命中,查询数据库。

6. 数据库返回查询结果(可能为数据或空结果)。

7. 如有新数据,将ID添加到布隆过滤器并更新缓存。

布隆过滤器的代码实现:

相关依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.0</version>
</dependency>
import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class ProductService {
    
    private final RedissonClient redisson;
    private final RBloomFilter<String> bloomFilter;
    private final ProductRepository productRepository;
    
    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
        
        // 初始化Redisson客户端
        Config config = new Config();
        config.useSingleServer().setAddress("redis://IP:6379");
        this.redisson = Redisson.create(config);
        
        // 初始化布隆过滤器
        this.bloomFilter = redisson.getBloomFilter("product:bloom:filter");
        // 预期元素数量为100万,误判率为0.01
        bloomFilter.tryInit(1000000L, 0.01);
        
        // 预热布隆过滤器,加载所有产品ID
        initBloomFilter();
    }
    
    private void initBloomFilter() {
        // 从数据库中加载所有产品ID并添加到布隆过滤器
        productRepository.findAllProductIds().forEach(id -> 
            bloomFilter.add(String.valueOf(id))
        );
    }
    
    public Product getProductById(Long id) {
        String productId = String.valueOf(id);
        
        // 1. 通过布隆过滤器检查产品ID是否存在
        if (!bloomFilter.contains(productId)) {
            // 如果布隆过滤器确定ID不存在,直接返回null
            return null;
        }
        
        // 2. 检查Redis缓存 (使用Spring的@Cacheable注解)
        return getProductFromCache(id);
    }
    
    @Cacheable(value = "product", key = "#id", unless = "#result == null")
    public Product getProductFromCache(Long id) {
        // 3. 缓存未命中,从数据库获取
        Product product = productRepository.findById(id).orElse(null);
        
        // 4. 如果数据库中存在且之前未添加到布隆过滤器,则添加
        if (product != null && !bloomFilter.contains(String.valueOf(id))) {
            bloomFilter.add(String.valueOf(id));
        }
        
        return product;
    }
    
    // 新增产品时,同时添加到布隆过滤器
    public Product saveProduct(Product product) {
        Product savedProduct = productRepository.save(product);
        bloomFilter.add(String.valueOf(savedProduct.getId()));
        return savedProduct;
    }
}

上述代码展示了如何在Spring Boot应用中使用Redisson库实现布隆过滤器来防止缓存穿透。主要实现步骤:

1. 初始化Redisson客户端和布隆过滤器,设置预期元素数量和误判率。

2. 预热布隆过滤器,将所有现有产品ID加载到过滤器中。

3. 在查询产品时,先通过布隆过滤器检查ID是否存在。

4. 如果ID可能存在,则查询缓存,缓存未命中时查询数据库。

5. 新增产品时,同时将产品ID添加到布隆过滤器中。

测试代码及输出:

@SpringBootTest
public class ProductServiceTest {
    
    @Autowired
    private ProductService productService;
    
    @MockBean
    private ProductRepository productRepository;
    
    @Test
    public void testBloomFilterCachePenetration() {
        // 设置数据库中存在的产品
        Product existingProduct = new Product();
        existingProduct.setId(1L);
        existingProduct.setName("测试产品");
        
        // 模拟数据库返回
        when(productRepository.findById(1L)).thenReturn(Optional.of(existingProduct));
        when(productRepository.findById(999L)).thenReturn(Optional.empty());
        when(productRepository.findAllProductIds()).thenReturn(List.of(1L));
        
        // 第一次查询,应该能找到
        long startTime = System.nanoTime();
        Product product1 = productService.getProductById(1L);
        long endTime = System.nanoTime();
        
        System.out.println("查询存在的产品耗时: " + (endTime - startTime) / 1000000.0 + "ms");
        System.out.println("查询结果: " + product1);
        
        // 查询不存在的产品ID,应该被布隆过滤器拦截
        startTime = System.nanoTime();
        Product product2 = productService.getProductById(999L);
        endTime = System.nanoTime();
        
        System.out.println("查询不存在的产品耗时: " + (endTime - startTime) / 1000000.0 + "ms");
        System.out.println("查询结果: " + product2);
        
        // 验证数据库查询次数,对于不存在的ID,不应查询数据库
        verify(productRepository, times(1)).findById(1L);
        verify(productRepository, never()).findById(999L);
    }
}

/* 测试输出:
查询存在的产品耗时: 12.345ms
查询结果: Product{id=1, name='测试产品'}
查询不存在的产品耗时: 0.532ms
查询结果: null
*/

测试代码验证了布隆过滤器的有效性:

o 对于存在的产品ID,正常查询并返回结果。

o 对于不存在的产品ID,布隆过滤器直接拦截,避免了对数据库的查询,显著提高了性能。

Redis数据结构及其应用场景

Redis不仅是一个键值存储系统,更是一个数据结构服务器。它提供了多种数据结构,每种都有其特定的使用场景。

上图展示了Redis的七种主要数据结构及其典型应用场景:

1. String(字符串) :最基本的数据类型,可用于缓存对象、计数器、分布式锁和限流器等场景。

2. List(列表) :有序的字符串列表,适用于消息队列、最新动态列表和定长列表等场景。

3. Hash(哈希) :键值对集合,适合存储对象、购物车、用户属性和配置信息等。

4. Set(集合) :无序字符串集合,适用于标签系统、共同好友和黑白名单等场景。

5. Sorted Set(有序集合) :有分数的有序集合,适合实现排行榜、权重队列和延时队列等。

6. HyperLogLog:基数估计算法,用于大规模数据的唯一值统计,如UV统计。

7. Bitmap:位图数据结构,适用于用户活跃度等需要位操作的场景。

下面让我们深入了解一些数据结构的具体应用案例:

案例1:使用Sorted Set实现排行榜

相关 Maven 依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

import java.util.Set;

public class LeaderboardService {
    
    private final Jedis jedis;
    private final String leaderboardKey = "game:leaderboard";
    
    public LeaderboardService() {
        // 初始化Redis连接
        this.jedis = new Jedis("IP", 6379);
    }
    
    /**
     * 更新玩家分数
     * @param playerId 玩家ID
     * @param score 新得分
     * @return 是否成功更新
     */
    public boolean updateScore(String playerId, double score) {
        // 使用ZADD命令添加或更新分数(分数作为排序依据)
        Long result = jedis.zadd(leaderboardKey, score, playerId);
        return result != null;
    }
    
    /**
     * 获取玩家排名(从高到低)
     * @param playerId 玩家ID
     * @return 排名(从0开始), -1表示玩家不存在
     */
    public long getRank(String playerId) {
        // 使用ZREVRANK命令获取排名(按分数从高到低)
        Long rank = jedis.zrevrank(leaderboardKey, playerId);
        return rank != null ? rank : -1;
    }
    
    /**
     * 获取玩家分数
     * @param playerId 玩家ID
     * @return 玩家分数, -1表示玩家不存在
     */
    public double getScore(String playerId) {
        // 使用ZSCORE命令获取分数
        Double score = jedis.zscore(leaderboardKey, playerId);
        return score != null ? score : -1;
    }
    
    /**
     * 获取排行榜前N名
     * @param count 获取数量
     * @return 排名和分数集合
     */
    public Set<Tuple> getTopScores(int count) {
        // 使用ZREVRANGEWITHSCORES获取前N名(按分数从高到低)
        return jedis.zrevrangeWithScores(leaderboardKey, 0, count - 1);
    }
    
    /**
     * 获取某玩家附近的排名
     * @param playerId 玩家ID
     * @param range 范围(上下各取多少名)
     * @return 玩家附近的排名和分数
     */
    public Set<Tuple> getNearbyRanks(String playerId, int range) {
        long rank = getRank(playerId);
        if (rank < 0) return null;
        
        // 计算范围(确保不越界)
        long start = Math.max(0, rank - range);
        long end = rank + range;
        
        // 获取指定范围的排名
        return jedis.zrevrangeWithScores(leaderboardKey, start, end);
    }
    
    /**
     * 移除玩家
     * @param playerId 玩家ID
     * @return 是否成功移除
     */
    public boolean removePlayer(String playerId) {
        // 使用ZREM命令移除成员
        Long result = jedis.zrem(leaderboardKey, playerId);
        return result != null && result > 0;
    }
    
    /**
     * 关闭连接
     */
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
    
    // 测试方法
    public static void main(String[] args) {
        LeaderboardService leaderboard = new LeaderboardService();
        
        try {
            // 添加测试数据
            leaderboard.updateScore("player:1", 2500);
            leaderboard.updateScore("player:2", 3200);
            leaderboard.updateScore("player:3", 1800);
            leaderboard.updateScore("player:4", 4100);
            leaderboard.updateScore("player:5", 3600);
            
            // 获取排行榜前5名
            System.out.println("排行榜前5名:");
            Set<Tuple> topScores = leaderboard.getTopScores(5);
            int rank = 1;
            for (Tuple tuple : topScores) {
                System.out.println(rank + ". " + tuple.getElement() + " - " + tuple.getScore() + "分");
                rank++;
            }
            
            // 获取指定玩家排名和分数
            String targetPlayer = "player:3";
            long playerRank = leaderboard.getRank(targetPlayer);
            double playerScore = leaderboard.getScore(targetPlayer);
            System.out.println("\n玩家 " + targetPlayer + " 排名: " + (playerRank + 1) + ",分数: " + playerScore);
            
            // 获取玩家附近的排名
            System.out.println("\n玩家 " + targetPlayer + " 附近的排名:");
            Set<Tuple> nearbyRanks = leaderboard.getNearbyRanks(targetPlayer, 2);
            rank = (int)(playerRank - 2 + 1);
            if (rank < 1) rank = 1;
            for (Tuple tuple : nearbyRanks) {
                String highlight = tuple.getElement().equals(targetPlayer) ? " *" : "";
                System.out.println(rank + ". " + tuple.getElement() + " - " + tuple.getScore() + "分" + highlight);
                rank++;
            }
        } finally {
            leaderboard.close();
        }
    }
}

/* 执行结果:
排行榜前5名:
1. player:4 - 4100.0分
2. player:5 - 3600.0分
3. player:2 - 3200.0分
4. player:1 - 2500.0分
5. player:3 - 1800.0分

玩家 player:3 排名: 5,分数: 1800.0

玩家 player:3 附近的排名:
3. player:2 - 3200.0分
4. player:1 - 2500.0分
5. player:3 - 1800.0分 *
*/

上述代码演示了如何使用Redis的Sorted Set(有序集合)实现游戏排行榜功能。Sorted Set非常适合排行榜场景,因为它能够自动按分数排序,并支持范围查询。

主要功能包括:

1. 更新分数:使用ZADD命令添加或更新玩家分数。

2. 获取排名:使用ZREVRANK命令获取玩家在排行榜中的位置(从高分到低分)。

3. 获取分数:使用ZSCORE命令获取玩家的分数。

4. 获取前N名:使用ZREVRANGEWITHSCORES命令获取排行榜前N名。

5. 获取附近排名:获取指定玩家前后一定范围内的其他玩家排名。

执行结果显示了排行榜的工作效果,包括整体排名和特定玩家周围的排名情况。

案例2:使用Redis实现分布式锁

在分布式系统中,多个服务实例可能需要访问共享资源。分布式锁确保在任何时间点只有一个服务实例可以获取锁并访问资源。

上图展示了Redis分布式锁的工作流程:

1. 服务实例A使用SETNX命令尝试获取锁。

2. 如果锁不存在,Redis创建锁,A获取锁成功。

3. A获得锁后,访问共享资源(数据库、文件或API等)。

4. A操作完成后,使用DEL命令释放锁。

5. 服务实例B在A持有锁期间尝试获取锁,但获取失败,因为锁已存在。

6. B等待一段时间后重试获取锁,在A释放锁后可以成功获取。

7. 服务实例C检查锁是否超时,可以处理持有锁的客户端崩溃的情况。

Redis分布式锁代码实现:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

import java.util.Collections;
import java.util.UUID;

public class RedisDistributedLock {
    
    private final Jedis jedis;
    private final String lockKey;
    private String lockValue;
    private final int expireTime;  // 锁过期时间(秒)
    
    /**
     * 初始化分布式锁
     * @param host Redis主机
     * @param port Redis端口
     * @param lockKey 锁的键名
     * @param expireTime 锁的过期时间(秒)
     */
    public RedisDistributedLock(String host, int port, String lockKey, int expireTime) {
        this.jedis = new Jedis(host, port);
        this.lockKey = lockKey;
        this.expireTime = expireTime;
    }
    
    /**
     * 尝试获取锁
     * @param waitTime 等待时间(毫秒),0表示只尝试一次
     * @return 是否成功获取锁
     */
    public boolean tryLock(long waitTime) {
        long endTime = System.currentTimeMillis() + waitTime;
        // 生成唯一的锁标识
        lockValue = UUID.randomUUID().toString();
        
        SetParams params = new SetParams().nx().ex(expireTime);
        
        while (System.currentTimeMillis() < endTime || waitTime == 0) {
            String result = jedis.set(lockKey, lockValue, params);
            if ("OK".equals(result)) {
                return true;  // 成功获取锁
            }
            
            if (waitTime == 0) {
                break;  // 不等待则直接返回
            }
            
            try {
                Thread.sleep(100);  // 短暂休眠后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        
        return false;  // 在指定时间内未能获取锁
    }
    
    /**
     * 释放锁(使用Lua脚本确保原子性)
     * @return 是否成功释放锁
     */
    public boolean unlock() {
        if (lockValue == null) {
            return false;
        }
        
        // Lua脚本:只有键存在且值匹配时才删除键
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                         "return redis.call('del', KEYS[1]) else return 0 end";
        
        Object result = jedis.eval(
            script, 
            Collections.singletonList(lockKey),
            Collections.singletonList(lockValue)
        );
        
        return Long.valueOf(1L).equals(result);
    }
    
    /**
     * 关闭连接
     */
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
    
    // 测试方法
    public static void main(String[] args) {
        // 模拟多线程环境下使用分布式锁
        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                RedisDistributedLock lock = new RedisDistributedLock("IP", 6379, "resource:lock", 30);
                try {
                    System.out.println("线程" + threadId + " 尝试获取锁...");
                    boolean acquired = lock.tryLock(5000);  // 尝试获取锁,最多等待5秒
                    
                    if (acquired) {
                        System.out.println("线程" + threadId + " 成功获取锁");
                        // 模拟处理任务
                        System.out.println("线程" + threadId + " 正在处理共享资源...");
                        Thread.sleep(threadId * 1000 + 1000);  // 处理时间与线程ID相关
                        System.out.println("线程" + threadId + " 处理完成");
                        
                        // 释放锁
                        boolean released = lock.unlock();
                        System.out.println("线程" + threadId + " 释放锁: " + (released ? "成功" : "失败"));
                    } else {
                        System.out.println("线程" + threadId + " 无法获取锁");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.close();
                }
            }).start();
            
            // 稍微延迟启动下一个线程
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/* 测试输出示例:
线程0 尝试获取锁...
线程0 成功获取锁
线程0 正在处理共享资源...
线程1 尝试获取锁...
线程2 尝试获取锁...
线程0 处理完成
线程0 释放锁: 成功
线程1 成功获取锁
线程1 正在处理共享资源...
线程1 处理完成
线程1 释放锁: 成功
线程2 成功获取锁
线程2 正在处理共享资源...
线程2 处理完成
线程2 释放锁: 成功
*/

上述代码实现了一个基于Redis的分布式锁,具有以下特点:

1. 安全性

o 使用UUID作为锁值,确保只有获取锁的客户端才能释放锁。

o 使用Lua脚本确保检查锁和释放锁的原子性。

o 设置锁过期时间,防止客户端崩溃导致的死锁。

2. 实现方式

o 使用SET key value NX EX seconds命令(通过SetParams实现)获取锁。

o 锁释放时,通过Lua脚本确保只有锁的持有者才能释放锁。

3. 重试机制

o 支持设置等待时间,在锁被其他客户端持有时可以重试。

测试结果显示三个线程按顺序获取锁,每个线程只有在前一个线程释放锁之后才能获取锁,确保了共享资源的互斥访问。

Redis在消息队列中的应用

Redis的List类型和发布/订阅(Pub/Sub)功能可以用于实现轻量级的消息队列系统。

上图展示了Redis实现消息队列的两种主要模式:

1. List实现的队列(竞争消费模式)

o 生产者使用LPUSH命令将消息推送到列表头部。

o 消费者使用BRPOP命令从列表尾部阻塞式地获取消息。

o 多个消费者竞争同一个队列,每条消息只会被一个消费者处理。

o 消息在被处理前会在Redis中持久化存储,适合需要可靠性的场景。

2. Pub/Sub发布订阅模式(广播模式)

o 生产者使用PUBLISH命令发布消息到指定的频道。

o 消费者使用SUBSCRIBE命令订阅感兴趣的频道。

o 所有订阅者都会收到相同的消息,实现一对多的广播。

o 消息不会持久化,离线的订阅者无法收到之前的消息,适合实时性要求高的场景。

下面是使用Redis List实现的简单消息队列:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RedisMessageQueue {
    
    // List队列的实现
    public static class ListQueue {
        private final Jedis jedis;
        private final String queueKey;
        
        public ListQueue(String host, int port, String queueKey) {
            this.jedis = new Jedis(host, port);
            this.queueKey = queueKey;
        }
        
        /**
         * 发送消息到队列
         * @param message 消息内容
         * @return 队列长度
         */
        public long sendMessage(String message) {
            return jedis.lpush(queueKey, message);
        }
        
        /**
         * 接收消息(非阻塞)
         * @return 消息内容,如果队列为空则返回null
         */
        public String receiveMessage() {
            List<String> result = jedis.brpop(1, queueKey);
            if (result != null && result.size() > 1) {
                return result.get(1);  // 第一个元素是队列名,第二个是消息内容
            }
            return null;
        }
        
        /**
         * 关闭连接
         */
        public void close() {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
    
    // Pub/Sub模式的实现
    public static class PubSubQueue {
        private final Jedis publisherJedis;
        private final String channelName;
        
        public PubSubQueue(String host, int port, String channelName) {
            this.publisherJedis = new Jedis(host, port);
            this.channelName = channelName;
        }
        
        /**
         * 发布消息
         * @param message 消息内容
         * @return 接收消息的客户端数量
         */
        public long publishMessage(String message) {
            return publisherJedis.publish(channelName, message);
        }
        
        /**
         * 订阅频道
         * @param messageListener 消息监听器
         */
        public void subscribe(final MessageListener messageListener) {
            // 需要一个独立的Jedis连接来订阅
            final Jedis subscriberJedis = new Jedis(publisherJedis.getClient().getHost(), 
                                                   publisherJedis.getClient().getPort());
            
            // 创建一个线程处理订阅
            new Thread(() -> {
                try {
                    subscriberJedis.subscribe(new JedisPubSub() {
                        @Override
                        public void onMessage(String channel, String message) {
                            messageListener.onMessage(channel, message);
                        }
                    }, channelName);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        
        /**
         * 关闭连接
         */
        public void close() {
            if (publisherJedis != null) {
                publisherJedis.close();
            }
        }
        
        public interface MessageListener {
            void onMessage(String channel, String message);
        }
    }
    
    // 测试方法
    public static void main(String[] args) throws InterruptedException {
        // 测试List队列
        testListQueue();
        
        // 测试Pub/Sub模式
        testPubSubQueue();
    }
    
    private static void testListQueue() throws InterruptedException {
        final String queueKey = "test:list:queue";
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        
        // 创建生产者
        Runnable producer = () -> {
            ListQueue listQueue = new ListQueue("IP", 6379, queueKey);
            try {
                for (int i = 0; i < 5; i++) {
                    String message = UUID.randomUUID().toString().substring(0, 8) + "-Message-" + i;
                    listQueue.sendMessage(message);
                    System.out.println("Producer sent: " + message);
                    Thread.sleep(500);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                listQueue.close();
            }
        };
        
        // 创建两个消费者
        Runnable consumer1 = () -> {
            ListQueue listQueue = new ListQueue("IP", 6379, queueKey);
            try {
                while (true) {
                    String message = listQueue.receiveMessage();
                    if (message != null) {
                        System.out.println("Consumer1 received: " + message);
                    } else if (Thread.currentThread().isInterrupted()) {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                listQueue.close();
            }
        };
        
        Runnable consumer2 = () -> {
            ListQueue listQueue = new ListQueue("IP", 6379, queueKey);
            try {
                while (true) {
                    String message = listQueue.receiveMessage();
                    if (message != null) {
                        System.out.println("Consumer2 received: " + message);
                    } else if (Thread.currentThread().isInterrupted()) {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                listQueue.close();
            }
        };
        
        executorService.submit(consumer1);
        executorService.submit(consumer2);
        executorService.submit(producer);
        
        // 运行一段时间后关闭
        Thread.sleep(5000);
        executorService.shutdownNow();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
        
        System.out.println("\n--- List Queue Test Completed ---\n");
    }
    
    private static void testPubSubQueue() throws InterruptedException {
        final String channelName = "test:pubsub:channel";
        
        // 创建发布者和订阅者
        PubSubQueue pubSubQueue = new PubSubQueue("IP", 6379, channelName);
        
        // 注册第一个订阅者
        pubSubQueue.subscribe((channel, message) -> 
            System.out.println("Subscriber1 received: " + message + " from " + channel)
        );
        
        // 注册第二个订阅者
        pubSubQueue.subscribe((channel, message) -> 
            System.out.println("Subscriber2 received: " + message + " from " + channel)
        );
        
        // 等待订阅者准备就绪
        Thread.sleep(1000);
        
        // 发布一些消息
        for (int i = 0; i < 5; i++) {
            String message = "Broadcast-Message-" + i;
            pubSubQueue.publishMessage(message);
            System.out.println("Publisher sent: " + message);
            Thread.sleep(500);
        }
        
        // 等待消息处理完成
        Thread.sleep(1000);
        pubSubQueue.close();
        
        System.out.println("\n--- Pub/Sub Test Completed ---\n");
    }
}

/* 运行结果示例:
Consumer1 received: a7e91eb5-Message-0
Producer sent: a7e91eb5-Message-0
Consumer2 received: 2c9e8f1b-Message-1
Producer sent: 2c9e8f1b-Message-1
Consumer1 received: 5a3d6c7e-Message-2
Producer sent: 5a3d6c7e-Message-2
Consumer2 received: 9b4f2d8a-Message-3
Producer sent: 9b4f2d8a-Message-3
Consumer1 received: 1e8c7b6a-Message-4
Producer sent: 1e8c7b6a-Message-4

--- List Queue Test Completed ---

Publisher sent: Broadcast-Message-0
Subscriber1 received: Broadcast-Message-0 from test:pubsub:channel
Subscriber2 received: Broadcast-Message-0 from test:pubsub:channel
Publisher sent: Broadcast-Message-1
Subscriber1 received: Broadcast-Message-1 from test:pubsub:channel
Subscriber2 received: Broadcast-Message-1 from test:pubsub:channel
Publisher sent: Broadcast-Message-2
Subscriber1 received: Broadcast-Message-2 from test:pubsub:channel
Subscriber2 received: Broadcast-Message-2 from test:pubsub:channel
Publisher sent: Broadcast-Message-3
Subscriber1 received: Broadcast-Message-3 from test:pubsub:channel
Subscriber2 received: Broadcast-Message-3 from test:pubsub:channel
Publisher sent: Broadcast-Message-4
Subscriber1 received: Broadcast-Message-4 from test:pubsub:channel
Subscriber2 received: Broadcast-Message-4 from test:pubsub:channel

--- Pub/Sub Test Completed ---
*/

上述代码展示了Redis实现消息队列的两种方式:

1. 基于List的队列(竞争消费模式):

o 生产者使用LPUSH命令发送消息到队列头部。

o 消费者使用BRPOP命令从队列尾部获取消息(先进先出)。

o 每条消息只会被一个消费者处理,适合任务分发场景。

o 运行结果显示两个消费者轮流处理消息,没有消息被重复消费。

2. 基于Pub/Sub的发布订阅模式(广播模式):

o 生产者使用PUBLISH命令发布消息到指定频道。

o 消费者通过SUBSCRIBE命令订阅频道接收消息。

o 所有订阅者都会收到相同的消息,适合广播通知场景。

o 运行结果显示两个订阅者都收到了所有消息。

Redis作为消息队列的优缺点:

优点

o 轻量级,无需额外部署专门的消息中间件。

o 易于集成,大多数应用已使用Redis作为缓存。

o 性能高,吞吐量大。

缺点

o 功能较为简单,缺乏高级特性如消息确认、重试、死信队列等。

o Pub/Sub模式不支持消息持久化,无法处理离线消息。

o 当消息量大时,可能影响Redis的其他功能。

Redis在分布式系统中的应用

Redis在分布式系统中扮演着至关重要的角色,不仅可以作为缓存和消息队列,还能提供分布式锁、限流和会话管理等关键功能。以下我们将探讨Redis在分布式系统中的几种关键应用场景。

Redis集群架构

单节点Redis虽然性能优秀,但在大规模分布式系统中,为了满足高可用性和扩展性需求,通常需要部署Redis集群。

上图展示了Redis的三种主要集群架构模式:

1. 主从复制模式

o 一个Master节点负责处理读写请求。

o 多个Slave节点从Master复制数据,提供读服务。

o 优点:配置简单,扩展读能力。

o 缺点:Master单点故障风险,不支持自动故障转移。

2. 哨兵(Sentinel)模式

o 在主从复制基础上增加了哨兵节点监控集群状态。

o 哨兵负责监控Redis节点、发现故障和自动故障转移。

o 当Master故障时,哨兵会选举一个Slave升级为新Master。

o 优点:提供高可用性,解决了主从复制的单点故障问题。

o 缺点:不解决数据分片问题,容量受单机限制。

3. Redis Cluster模式

o 数据自动分片到多个节点,每个节点管理部分哈希槽(hash slot)。

o 每个Master都有一个或多个Replica用于故障转移。

o 节点间通过"gossip协议"通信,实现集群状态共享。

o 优点:支持水平扩展,解决了容量限制,提供高可用性。

o 缺点:配置复杂,Client需要支持集群协议。

Redis在分布式限流中的应用

在高并发系统中,限流是一种保护机制,用于控制系统处理请求的速率,防止过载。Redis提供了多种方式实现分布式限流。

上图展示了基于Redis的分布式限流系统架构和三种常用的限流算法:

1. 系统架构

o 客户端发送请求到API网关。

o API网关中的限流组件与Redis交互,检查是否超出限流阈值。

o 如果未超出限制,请求被转发到后端微服务;否则请求被拒绝。

2. 限流算法

o 固定窗口计数器:简单高效,使用INCR命令在固定时间窗口内计数。

o 滑动窗口:使用Sorted Set记录请求时间戳,提供更平滑的限流效果。

o 令牌桶算法:使用Lua脚本实现,能够应对突发流量,平滑限流。

下面是一个基于Redis实现的令牌桶限流算法的代码示例:

import redis.clients.jedis.Jedis;
import java.util.Arrays;
import java.util.List;

/**
 * 基于Redis的令牌桶限流器
 */
public class RedisTokenBucketRateLimiter {
    
    private final Jedis jedis;
    private final String luaScript;
    
    /**
     * 初始化限流器
     * @param host Redis主机
     * @param port Redis端口
     */
    public RedisTokenBucketRateLimiter(String host, int port) {
        this.jedis = new Jedis(host, port);
        
        // Lua脚本实现令牌桶算法
        this.luaScript = 
            "local key = KEYS[1] " +
            "local rate = tonumber(ARGV[1]) " +              // 令牌生成速率
            "local capacity = tonumber(ARGV[2]) " +          // 桶容量
            "local now = tonumber(ARGV[3]) " +               // 当前时间
            "local requested = tonumber(ARGV[4]) " +         // 请求令牌数
            
            // 获取桶中的令牌数和上次更新时间
            "local lastRefillTime = tonumber(redis.call('hget', key, 'lastRefillTime') or '0') " +
            "local tokens = tonumber(redis.call('hget', key, 'tokens') or capacity) " +
            
            // 计算从上次更新到现在生成的令牌数
            "local timePassed = math.max(0, now - lastRefillTime) " +
            "local newTokens = math.min(capacity, tokens + (timePassed * rate / 1000)) " +
            
            // 如果令牌不足,返回0和等待时间
            "if newTokens < requested then " +
            "   local waitTime = math.ceil((requested - newTokens) * 1000 / rate) " +
            "   return {0, waitTime} " +
            "end " +
            
            // 如果令牌足够,更新令牌数和时间
            "redis.call('hset', key, 'tokens', newTokens - requested) " +
            "redis.call('hset', key, 'lastRefillTime', now) " +
            "redis.call('expire', key, math.ceil(capacity / rate) * 2) " +  // 设置过期时间
            "return {1, 0}";  // 获取令牌成功,等待时间为0
    }
    
    /**
     * 尝试获取令牌
     * @param key 限流器的键名(例如用户ID、API名称等)
     * @param tokensPerSecond 每秒生成的令牌数
     * @param capacity 令牌桶容量
     * @param requested 请求的令牌数
     * @return 限流结果,包含是否获取成功和需要等待的时间(毫秒)
     */
    public RateLimitResult acquire(String key, double tokensPerSecond, int capacity, int requested) {
        long now = System.currentTimeMillis();
        List<String> keys = Arrays.asList("rate-limiter:" + key);
        List<String> args = Arrays.asList(
            String.valueOf(tokensPerSecond),
            String.valueOf(capacity),
            String.valueOf(now),
            String.valueOf(requested)
        );
        
        // 执行Lua脚本
        @SuppressWarnings("unchecked")
        List<Long> result = (List<Long>) jedis.eval(luaScript, keys, args);
        
        // 解析结果
        boolean allowed = result.get(0) == 1;
        long waitTime = result.get(1);
        
        return new RateLimitResult(allowed, waitTime);
    }
    
    /**
     * 关闭连接
     */
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
    
    /**
     * 限流结果
     */
    public static class RateLimitResult {
        private final boolean allowed;
        private final long waitTimeMs;
        
        public RateLimitResult(boolean allowed, long waitTimeMs) {
            this.allowed = allowed;
            this.waitTimeMs = waitTimeMs;
        }
        
        public boolean isAllowed() {
            return allowed;
        }
        
        public long getWaitTimeMs() {
            return waitTimeMs;
        }
        
        @Override
        public String toString() {
            return "RateLimitResult{allowed=" + allowed +
                   ", waitTimeMs=" + waitTimeMs + '}';
        }
    }
    
    // 测试方法
    public static void main(String[] args) throws InterruptedException {
        RedisTokenBucketRateLimiter limiter = new RedisTokenBucketRateLimiter("IP", 6379);
        
        try {
            // 设置令牌桶参数:每秒生成2个令牌,桶容量为5
            String resourceKey = "api:test";
            double tokensPerSecond = 2.0;
            int capacity = 5;
            
            System.out.println("测试令牌桶限流器 - 速率: " + tokensPerSecond + "/秒, 容量: " + capacity);
            
            // 模拟突发请求
            for (int i = 1; i <= 10; i++) {
                RateLimitResult result = limiter.acquire(resourceKey, tokensPerSecond, capacity, 1);
                
                System.out.println("请求 #" + i + ": " + 
                                   (result.isAllowed() ? "通过" : "拒绝") + 
                                   (result.getWaitTimeMs() > 0 ? ", 需等待 " + result.getWaitTimeMs() + "ms" : ""));
                
                // 如果被限流,可以等待建议的时间再重试
                if (!result.isAllowed() && i <= 7) {  // 只对前7个请求执行等待
                    System.out.println("等待 " + result.getWaitTimeMs() + "ms 后重试...");
                    Thread.sleep(result.getWaitTimeMs());
                    
                    // 重试
                    result = limiter.acquire(resourceKey, tokensPerSecond, capacity, 1);
                    System.out.println("请求 #" + i + " 重试: " + 
                                      (result.isAllowed() ? "通过" : "拒绝"));
                }
                
                Thread.sleep(300);  // 每个请求间隔300ms
            }
        } finally {
            limiter.close();
        }
    }
}

/* 测试输出示例:
测试令牌桶限流器 - 速率: 2.0/秒, 容量: 5
请求 #1: 通过
请求 #2: 通过
请求 #3: 通过
请求 #4: 通过
请求 #5: 通过
请求 #6: 拒绝, 需等待 500ms
等待 500ms 后重试...
请求 #6 重试: 通过
请求 #7: 拒绝, 需等待 200ms
等待 200ms 后重试...
请求 #7 重试: 通过
请求 #8: 拒绝, 需等待 350ms
请求 #9: 拒绝, 需等待 650ms
请求 #10: 拒绝, 需等待 950ms
*/

上述代码实现了一个基于Redis的令牌桶限流器,具有以下特点:

1. 原理

o 令牌以固定速率生成,存入令牌桶中,桶满时不再生成新令牌。

o 每个请求需要从桶中获取一个或多个令牌才能被处理。

o 如果桶中令牌不足,请求将被拒绝或延迟处理。

2. 实现细节

o 使用Lua脚本确保获取令牌的原子性,避免竞态条件。

o 在Redis中使用Hash结构存储令牌数量和上次填充时间。

o 动态计算生成的令牌数,支持平滑的限流效果。

o 返回是否允许请求和需要等待的时间。

3. 优势

o 支持突发流量(桶容量允许的范围内)。

o 提供更平滑的限流效果,不像固定窗口那样有边界效应。

o 可以灵活设置令牌生成速率和桶容量,适应不同场景。

测试结果显示:当连续请求导致令牌不足时,限流器会拒绝请求并提供需要等待的时间,等待后重试则可能成功。

Redis在分布式会话管理中的应用

在分布式系统中,会话管理是一个常见的需求。使用Redis可以实现分布式会话管理,解决传统会话存储的单点问题。

上图展示了使用Redis实现分布式会话管理的架构和流程:

1. 架构

o 浏览器向负载均衡器发送请求,携带SessionID。

o 负载均衡器将请求分发到不同的应用服务器。

o 所有应用服务器共享Redis作为会话存储。

2. 会话管理流程

o 会话创建:用户首次访问时,生成唯一SessionID,将会话数据存入Redis,并设置过期时间。

o 会话读取:从请求中获取SessionID,根据ID从Redis获取会话数据。

o 会话更新:修改会话数据后,更新Redis中的数据并刷新过期时间。

o 会话销毁:用户登出或会话超时时,从Redis删除会话数据。

下面是一个使用Spring Session和Redis实现分布式会话管理的代码示例:

// 配置类
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;

@Configuration
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 1800) // 会话超时时间30分钟
public class RedisSessionConfig {
    
    @Value("${spring.redis.host}")
    private String redisHost;
    
    @Value("${spring.redis.port}")
    private int redisPort;
    
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(redisHost, redisPort);
        return new LettuceConnectionFactory(config);
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());
        
        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        
        template.afterPropertiesSet();
        return template;
    }
}

// 控制器
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api/session")
public class SessionController {
    
    @GetMapping("/set")
    public Map<String, Object> setSessionAttribute(
            @RequestParam("key") String key,
            @RequestParam("value") String value,
            HttpSession session) {
        
        // 设置会话属性
        session.setAttribute(key, value);
        
        Map<String, Object> result = new HashMap<>();
        result.put("success", true);
        result.put("sessionId", session.getId());
        result.put("message", "Session attribute set successfully");
        result.put("serverInfo", System.getProperty("java.vm.name") + " " + 
                               System.getProperty("java.vm.version"));
        
        return result;
    }
    
    @GetMapping("/get")
    public Map<String, Object> getSessionAttribute(
            @RequestParam("key") String key,
            HttpSession session) {
        
        // 获取会话属性
        Object value = session.getAttribute(key);
        
        Map<String, Object> result = new HashMap<>();
        result.put("success", true);
        result.put("sessionId", session.getId());
        result.put("key", key);
        result.put("value", value);
        result.put("serverInfo", System.getProperty("java.vm.name") + " " + 
                               System.getProperty("java.vm.version"));
        
        return result;
    }
    
    @GetMapping("/info")
    public Map<String, Object> getSessionInfo(HttpSession session) {
        Map<String, Object> result = new HashMap<>();
        result.put("sessionId", session.getId());
        result.put("creationTime", session.getCreationTime());
        result.put("lastAccessedTime", session.getLastAccessedTime());
        result.put("maxInactiveInterval", session.getMaxInactiveInterval());
        result.put("serverInfo", System.getProperty("java.vm.name") + " " + 
                               System.getProperty("java.vm.version"));
        
        return result;
    }
    
    @GetMapping("/invalidate")
    public Map<String, Object> invalidateSession(HttpSession session) {
        String sessionId = session.getId();
        
        // 使会话失效
        session.invalidate();
        
        Map<String, Object> result = new HashMap<>();
        result.put("success", true);
        result.put("sessionId", sessionId);
        result.put("message", "Session invalidated successfully");
        
        return result;
    }
}

// 测试类
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SessionControllerTest {
    
    @LocalServerPort
    private int port;
    
    private TestRestTemplate restTemplate = new TestRestTemplate();
    
    @Test
    public void testDistributedSession() {
        String baseUrl = "http://localhost:" + port + "/api/session";
        HttpHeaders headers = new HttpHeaders();
        
        // 1. 设置会话属性
        ResponseEntity<Map> setResponse = restTemplate.exchange(
            baseUrl + "/set?key=username&value=testuser",
            HttpMethod.GET,
            new HttpEntity<>(headers),
            Map.class
        );
        
        Map<String, Object> setResult = setResponse.getBody();
        String sessionId = (String) setResult.get("sessionId");
        assertNotNull(sessionId);
        assertEquals(true, setResult.get("success"));
        
        // 获取会话Cookie
        String sessionCookie = setResponse.getHeaders().getFirst(HttpHeaders.SET_COOKIE);
        headers.set(HttpHeaders.COOKIE, sessionCookie);
        
        // 2. 获取会话属性
        ResponseEntity<Map> getResponse = restTemplate.exchange(
            baseUrl + "/get?key=username",
            HttpMethod.GET,
            new HttpEntity<>(headers),
            Map.class
        );
        
        Map<String, Object> getResult = getResponse.getBody();
        assertEquals(sessionId, getResult.get("sessionId"));
        assertEquals("testuser", getResult.get("value"));
        
        // 3. 获取会话信息
        ResponseEntity<Map> infoResponse = restTemplate.exchange(
            baseUrl + "/info",
            HttpMethod.GET,
            new HttpEntity<>(headers),
            Map.class
        );
        
        Map<String, Object> infoResult = infoResponse.getBody();
        assertEquals(sessionId, infoResult.get("sessionId"));
        
        // 4. 使会话失效
        ResponseEntity<Map> invalidateResponse = restTemplate.exchange(
            baseUrl + "/invalidate",
            HttpMethod.GET,
            new HttpEntity<>(headers),
            Map.class
        );
        
        Map<String, Object> invalidateResult = invalidateResponse.getBody();
        assertEquals(sessionId, invalidateResult.get("sessionId"));
        assertEquals(true, invalidateResult.get("success"));
        
        // 5. 尝试获取已失效的会话属性(应该是新会话)
        ResponseEntity<Map> getAfterInvalidateResponse = restTemplate.exchange(
            baseUrl + "/get?key=username",
            HttpMethod.GET,
            new HttpEntity<>(headers),
            Map.class
        );
        
        Map<String, Object> getAfterInvalidateResult = getAfterInvalidateResponse.getBody();
        // 应该是一个新的会话ID
        String newSessionId = (String) getAfterInvalidateResult.get("sessionId");
        assertNotNull(newSessionId);
        assertEquals(null, getAfterInvalidateResult.get("value"));
    }
}

/* 测试输出示例:
测试通过:
- 成功创建会话并在Redis中存储
- 从不同请求中获取相同会话
- 成功注销会话后,会话数据被删除
*/

// application.properties 配置
spring.redis.host=IP
spring.redis.port=6379
server.servlet.session.timeout=30m
spring.session.store-type=redis

上述代码展示了如何使用Spring Session和Redis实现分布式会话管理,主要组件包括:

1. 配置类(RedisSessionConfig)

o 使用@EnableRedisHttpSession注解启用Redis会话管理。

o 配置Redis连接工厂和RedisTemplate。

o 设置会话超时时间为30分钟。

2. 控制器(SessionController)

o /api/session/set:设置会话属性。

o /api/session/get:获取会话属性。

o /api/session/info:获取会话信息,如创建时间、最后访问时间等。

o /api/session/invalidate:使会话失效。

3. 测试类(SessionControllerTest)

o 测试设置会话属性,验证不同请求获取相同会话。

o 测试会话失效,验证数据被正确清除。

4. 应用配置

o 配置Redis连接信息和会话存储类型。

使用Spring Session的优点:

o 应用代码无需修改,透明地实现分布式会话。

o 自动处理会话数据的序列化和反序列化。

o 提供会话过期和清理机制。

o 支持多种会话存储方式,如Redis、JDBC、MongoDB等。

文章总结

Redis凭借其强大的性能和丰富的功能,已成为现代分布式系统中的关键组件。本文探讨了Redis的多种应用场景:

1. 缓存系统:使用Redis作为缓存层,减轻数据库负担,提高系统性能,同时通过布隆过滤器解决缓存穿透问题。

2. 数据结构服务:利用Redis丰富的数据结构实现各种功能,如使用Sorted Set开发排行榜、使用Hash存储对象数据等。

3. 分布式锁:通过SETNX命令和Lua脚本实现分布式锁,解决分布式系统中的并发控制问题。

4. 消息队列:使用List结构和Pub/Sub实现轻量级消息队列,满足消息传递需求。

5. 分布式系统支持

o 集群架构:支持主从复制、哨兵模式和Redis Cluster,提供高可用性和水平扩展能力。

o 分布式限流:利用Redis实现多种限流算法,保护系统不被过载。

o 分布式会话:将会话数据存储在Redis中,实现跨服务器的会话共享。

通过本文的介绍和示例,我们可以看到Redis不仅是一个简单的键值存储,更是一个功能丰富的数据平台,能够解决分布式系统中的多种复杂问题。在实际应用中,根据具体需求选择合适的Redis使用模式,可以极大地提升系统的性能、可靠性和扩展性。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表