RedisTemplate Pipeline 封装和使用,优化Redis操作

发布时间 2023-04-05 19:54:05作者: 我有八千部下

前言

公司游戏项目服务端运行过程中,依赖 Redis 保存游戏过程数据。与客户端一次交互会对 Redis 进行大量的操作,在内网开发环境下测试,一次操作开销在 1-2 ms,建立连接和网络开销累积在一起大大降低了响应速度,这种场景下可以使用 Redis Pipeline 进行优化。

Redis Pipeline

Redis 服务本身并没有专门的 Pepeline 功能,而是客户端自行实现的一种交互方式。简单说就是与 Redis 服务器建立一次 Socket 连接,然后将多个操作指令发送给 Redis 服务器,并获取操作结果。一次连接,一次网络交互自然大大减少了开销。

思路

公司使用的是Spring中StringRedisTemplate提供的API,它底层已经实现了两种 Pipeline 操作,其最终都是建立连接,将设置的操作一次性提交给 Redis 服务器,然后获取到操作结果列表。只不过一种偏向底层操作,另一种经过了封装使用更加方便,我选择了后者。如下:

// 偏底层操作
public <T> T execute(RedisCallback<T> action) {
	return execute(action, isExposeConnection());
}
	
// 经过封装
public List<Object> executePipelined(final SessionCallback<?> session) {
	return executePipelined(session, valueSerializer);
}

但是,实际使用过程中,对缓存的操作往往和业务逻辑互相穿插,将代码写在 SessionCallback 实现中,代码扩展性和易读性会大大降低,再次封装是非常有必要的。

封装大致思路是,实现一个管道对象,通过管道对象记录需要执行的操作,最后调用 StringRedisTemplate 实现的 Pipeline 来提交这些操作并获取返回值。

实现

通过调用封装的方法,接收参数生成 Consumer 函数,函数记录了接收的参数,并记录需要执行的对应 Jedis RedisOperations 方法,放入操作队列中。最终管道提交的时候会将队列中的函数全部执行。

生成 Consumer 函数的同时,会返回一个 Supplier 函数,函数记录了获取返回值的方式,在管道提交后通过它来获取返回值。其实它只是记录了操作在队列中的序号,根据序号获取结果列表中对应的结果。关键代码如下:

import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

/**
 * Redis 管道操作,利用一次与Redis服务器交互,执行多个操作并获取返回结果
 * 步骤:
 * 1、构建管道对象,利用 **Ops 对象添加 Redis 操作,添加操作会返回 Supplier 函数
 * 2、利用 Supplier 函数获取对应操作返回值
 * 3、调用 Supplier 函数或者执行 execute() 都会提交管道中的操作
 */
public class StringRedisPipeline {

    /**
     * 为了避免一次性执行的操作数太多,占用过多 Redis 服务器缓存空间。程序会按批次提交,这个值代表一批操作最大数量
     */
    private static final int OPERATION_BATCH_SIZE = 50;

    private final StringRedisTemplate stringRedisTemplate;

    /**
     * 操作队列
     */
    private final Queue<Consumer<RedisOperations>> operationSetterQueue;

    /**
     * 操作结果
     */
    private final List<Object> opsResults;

    /**
     * 操作结果索引
     */
    private int opsResultIndex;

    private ValueOperations valueOperations;
    private HashOperations hashOperations;
    private ListOperations listOperations;
    private SetOperations setOperations;
    private ZSetOperations zSetOperations;
    private HyperLogLogOperations hyperLogLogOperations;
    private GeoOperations geoOperations;
    private StreamOperations streamOperations;

    /**
     * 构建一个管道对象
     */
    public static StringRedisPipeline build(StringRedisTemplate stringRedisTemplate) {
        return new StringRedisPipeline(requireNonNull(stringRedisTemplate));
    }

    /**
     * Pipeline 执行所有操作
     *
     * @return true:成功执行,false:管道中无操作
     */
    public boolean execute() {
        if (this.operationSetterQueue.isEmpty()) {
            return false;
        }
        for (List<Consumer<RedisOperations>> operationSetters = splitABatchOfOperation();
             !operationSetters.isEmpty(); operationSetters = splitABatchOfOperation()) {
            this.opsResults.addAll(executeWithPipeline(operationSetters));
        }
        return true;
    }

    /**
     * {@link RedisOperations#delete(Object)}
     */
    public StringRedisPipeline delete(String key) {
        addOperation(operations -> operations.delete(key));
        return this;
    }

    /**
     * {@link RedisOperations#delete(Collection)}
     */
    public Supplier<Long> delete(Collection<String> keys) {
        return addOperation(operations -> operations.delete(keys));
    }

    /**
     * {@link RedisOperations#keys(Object)}
     */
    public Supplier<Set<String>> keys(String pattern) {
        return addOperation(operations -> operations.keys(pattern));
    }

    public ValueOperations opsForValue() {
        if (valueOperations == null) valueOperations = new ValueOperations(this);
        return valueOperations;
    }

    public HashOperations opsForHash() {
        if (hashOperations == null) hashOperations = new HashOperations(this);
        return hashOperations;
    }

    public ListOperations opsForList() {
        if (listOperations == null) listOperations = new ListOperations(this);
        return listOperations;
    }

    public SetOperations opsForSet() {
        if (setOperations == null) setOperations = new SetOperations(this);
        return setOperations;
    }

    public ZSetOperations opsForZSet() {
        if (zSetOperations == null) zSetOperations = new ZSetOperations(this);
        return zSetOperations;
    }

    public HyperLogLogOperations opsForHyperLogLog() {
        if (hyperLogLogOperations == null) hyperLogLogOperations = new HyperLogLogOperations(this);
        return hyperLogLogOperations;
    }

    public GeoOperations opsForGeo() {
        if (geoOperations == null) geoOperations = new GeoOperations(this);
        return geoOperations;
    }

    public StreamOperations opsForStream() {
        if (streamOperations == null) streamOperations = new StreamOperations(this);
        return streamOperations;
    }

    <T> Supplier<T> addOperation(Consumer<RedisOperations> operationSetter) {
        this.operationSetterQueue.offer(operationSetter);
        int resultIndex = this.opsResultIndex++;
        return () -> {
            if (resultIndex >= this.opsResults.size()) this.execute();
            return (T) opsResults.get(resultIndex);
        };
    }

    /**
     * 按配置大小分割出一组操作
     */
    private List<Consumer<RedisOperations>> splitABatchOfOperation() {
        if (operationSetterQueue.isEmpty()) {
            return Collections.emptyList();
        }
        List<Consumer<RedisOperations>> operationSetters = new LinkedList<>();
        for (int i = OPERATION_BATCH_SIZE; i > 0 && !operationSetterQueue.isEmpty(); i--) {
            operationSetters.add(operationSetterQueue.poll());
        }
        return operationSetters;
    }

    /**
     * 通过 Redis 管道执行所有操作
     *
     * @return 操作结果
     */
    private List<Object> executeWithPipeline(List<Consumer<RedisOperations>> operationSetters) {
        return this.stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) {
                operationSetters.forEach(operationSetter -> operationSetter.accept(operations));
                return null;
            }
        });
    }

    private StringRedisPipeline(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = requireNonNull(stringRedisTemplate);
        this.operationSetterQueue = new LinkedList<>();
        this.opsResults = new ArrayList<>();
    }
}

使用例子

/**
 * 部分无返回值的操作可以采用流式编程调用管道
 */
@Test
void add() {
  StringRedisPipeline jedisPipeline = StringRedisPipeline.build(stringRedisTemplate);
  jedisPipeline.opsForValue()
    .set(KEY_PREFIX + "key1", "1")
    .set(KEY_PREFIX + "key2", "2")
    .and().opsForHash()
    .put(KEY_PREFIX + "key3", "hashKey1", "1")
    .put(KEY_PREFIX + "key3", "hashKey2", "2")
    .execute();
}

/**
 * 多个获取返回值的操作通过管道执行
 */
@Test
void get() {
  StringRedisPipeline jedisPipeline = StringRedisPipeline.build(stringRedisTemplate);
  Supplier<String> valu1 = jedisPipeline.opsForValue().get(KEY_PREFIX + "key1");
  Supplier<String> valu2 = jedisPipeline.opsForValue().get(KEY_PREFIX + "key2");
  Supplier<String> hashValue1 = jedisPipeline.opsForHash().get(KEY_PREFIX + "key3", "hashKey1");
  Supplier<String> hashValue2 = jedisPipeline.opsForHash().get(KEY_PREFIX + "key3", "hashKey2");
  assertEquals("1", valu1.get());
  assertEquals("2", valu2.get());
  assertEquals("1", hashValue1.get());
  assertEquals("2", hashValue2.get());
}

全部源码见 GitHub 项目 Spring Redis Pipeline