RedisTemplate使用PipeLine

发布时间 2023-04-20 14:10:43作者: 何苦->

SpringBoot集成RedisTemplate

  1. 导入依赖
		<!-- Redis: Spring Boot starter for Spring Data Redis (the RedisTemplate used below comes from it) -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>

2.yaml文件配置
先查部署服务器上redis是集群还是单机
(1)单机

spring:
  redis:
     host: 127.0.0.1
     port: 7001
     # YAML entries use "key: value"; the original "timeout=5000" is not a valid mapping entry.
     timeout: 5000
     # Fill this in only when the Redis server requires a password.
     password: 

(2)集群

  redis:
    cluster:
      nodes: 127.0.0.1:7001,127.0.0.1:7002   # several nodes may be listed here

3.建议配置上"序列化",方便后期操作使用

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Registers a {@link RedisTemplate} whose keys are stored as plain strings and whose
 * values are stored as JSON.
 */
@Configuration
public class RedisConfig {

    /**
     * Builds the {@code RedisTemplate<String, Object>} bean.
     *
     * <p>Keys and hash keys use {@link StringRedisSerializer}; values and hash values use a
     * {@link Jackson2JsonRedisSerializer} backed by an {@link ObjectMapper} that reads every
     * property regardless of visibility and has default typing enabled for non-final types.
     *
     * @param redisConnectionFactory connection factory the template talks to Redis through;
     *        declared as the {@link RedisConnectionFactory} interface so the bean does not
     *        depend on one specific client implementation
     * @return the configured and initialised template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // Serializer for keys and hash keys.
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();

        // Serializer for values and hash values.
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // NOTE(review): enableDefaultTyping is reported as deprecated by newer Jackson
        // releases - confirm against the Jackson version on the classpath before upgrading.
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // Assemble the template.
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

没有配置序列化器时,存进去的数据如下图所示:这是RedisTemplate默认序列化(JDK序列化)后的数据,并不是redis本身做的序列化,因此在客户端里看起来不可读

4.RedisTemplate使用和redis几种类型数据操作
(1)使用@Autowired注入RedisTemplate

// Typed like the RedisTemplate<String, Object> bean declared in RedisConfig, instead of a raw type.
@Autowired
RedisTemplate<String, Object> redisTemplate;

(2)String类型

// 1. Set and read a value directly through redisTemplate
redisTemplate.boundValueOps("StringKey").set("StringValue");
// Set the value and its expiry in one call
redisTemplate.boundValueOps("StringKey").set("StringValue", 30, TimeUnit.MINUTES);
// Read the value
String str1 = (String) redisTemplate.boundValueOps("StringKey").get();

// 2. Set and read a value through a BoundValueOperations handle
BoundValueOperations stringKey = redisTemplate.boundValueOps("StringKey");
stringKey.set("StringVaule");
// Set the value and its expiry in one call
stringKey.set("StringValue", 30, TimeUnit.MINUTES);
// Read the value; the handle declared above is reused (declaring it a second time does not compile)
String str2 = (String) stringKey.get();

// 3. Set and read a value through ValueOperations
ValueOperations ops = redisTemplate.opsForValue();
ops.set("StringKey", "StringVaule");
// Set the value and its expiry in one call; the first argument is the key, so it must be
// "StringKey" for the read below to see it
ops.set("StringKey", "StringVaule", 30, TimeUnit.MINUTES);
// Read the value; the ops variable declared above is reused
String str3 = (String) ops.get("StringKey");

// 4. Set the expiry on its own
redisTemplate.boundValueOps("StringKey").expire(30, TimeUnit.MINUTES); // cache for 30 minutes
redisTemplate.expire("StringKey", 30, TimeUnit.MINUTES);

// 5. Delete the key
Boolean result = redisTemplate.delete("StringKey");

// 6. Increment and decrement
redisTemplate.boundValueOps("StringKey").increment(3L);
redisTemplate.boundValueOps("StringKey").increment(-3L);

(3)List类型

// 1. Push values directly through redisTemplate
redisTemplate.boundListOps("listKey").leftPush("listLeftValue1");   // push on the left
redisTemplate.boundListOps("listKey").rightPush("listRightValue2"); // push on the right

// 2. Push values through a BoundListOperations handle
BoundListOperations listKey = redisTemplate.boundListOps("listKey");
listKey.leftPush("listLeftValue3");   // push on the left
listKey.rightPush("listRightValue4"); // push on the right

// 3. Push values through ListOperations
ListOperations opsList = redisTemplate.opsForList();
opsList.leftPush("listKey", "listLeftValue5");   // push on the left
opsList.rightPush("listKey", "listRightValue6"); // push on the right

// 4. Set the expiry (through the list-typed handle, since "listKey" holds a list)
redisTemplate.boundListOps("listKey").expire(30, TimeUnit.MINUTES); // cache for 30 minutes
redisTemplate.expire("listKey", 30, TimeUnit.MINUTES);

// 5. Read the whole list
List listKey1 = redisTemplate.boundListOps("listKey").range(0, -1);

// 6. Pop one element from either end
String listKey2 = (String) redisTemplate.boundListOps("listKey").leftPop();  // pop from the left
String listKey3 = (String) redisTemplate.boundListOps("listKey").rightPop(); // pop from the right

(4)Hash类型

// 1. Put a hash entry directly through redisTemplate
redisTemplate.boundHashOps("HashKey").put("HashKey1", "HashVaue1");
// Read it back
Set keys1 = redisTemplate.boundHashOps("HashKey").keys();      // hash field names
List values1 = redisTemplate.boundHashOps("HashKey").values(); // hash field values
Map entries = redisTemplate.boundHashOps("HashKey").entries(); // field/value pairs

// 2. Put a hash entry through a BoundHashOperations handle
BoundHashOperations hashKey = redisTemplate.boundHashOps("HashKey");
hashKey.put("HashKey2", "HashVaue2");
// Read it back
Set keys2 = hashKey.keys();      // hash field names
List values2 = hashKey.values(); // hash field values
Map entries1 = hashKey.entries(); // field/value pairs


// 3. Put and read a hash entry through HashOperations
HashOperations hashOps = redisTemplate.opsForHash();
hashOps.put("HashKey", "HashKey3", "HashVaue3");
// Read it back; HashOperations is not bound to a key, so the Redis key is passed on every call
Set keys3 = hashOps.keys("HashKey");      // hash field names
List values3 = hashOps.values("HashKey"); // hash field values
Map entries2 = hashOps.entries("HashKey"); // field/value pairs


// 4. Set the expiry (through the hash-typed handle, since "HashKey" holds a hash)
redisTemplate.boundHashOps("HashKey").expire(30, TimeUnit.MINUTES); // cache for 30 minutes
redisTemplate.expire("HashKey", 30, TimeUnit.MINUTES);

(5)set类型

// 1. Add members directly through redisTemplate
redisTemplate.boundSetOps("setKey").add("setValue1", "setValue2", "setValue3");
// Read the members
Set set1 = redisTemplate.boundSetOps("setKey").members();


// 2. Add members through a BoundSetOperations handle
BoundSetOperations setKey = redisTemplate.boundSetOps("setKey");
setKey.add("setValue1", "setValue2", "setValue3");
// Read the members
Set set2 = setKey.members();


// 3. Add members through SetOperations
SetOperations setOps = redisTemplate.opsForSet();
setOps.add("setKey", "SetValue1", "setValue2", "setValue3");
// Read the members
Set set3 = setOps.members("setKey");

// 4. Set the expiry (through the set-typed handle, since "setKey" holds a set)
redisTemplate.boundSetOps("setKey").expire(30, TimeUnit.MINUTES); // cache for 30 minutes
redisTemplate.expire("setKey", 30, TimeUnit.MINUTES);

5.把key存到“文件夹”(按层级分组)里
key中的每一个 ‘:’ 相当于一层文件夹

//在新加入数据的时候 设置Key
String key = "152::TILAKE::";
//以Srting的为例   其他类型一样 修改key就可以
redisTemplate.boundValueOps(key).set("StringValue");

单机模式 使用PipeLine

引入依赖

<!-- Build settings and the versions referenced further down through ${...} placeholders -->
<properties>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
	<spring-boot.version>2.1.0.RELEASE</spring-boot.version>
	<jedis.version>2.9.0</jedis.version>
</properties>
<!-- Imports the spring-boot-dependencies POM at the version set above -->
<dependencyManagement>
	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-dependencies -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>${spring-boot.version}</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>

<!-- Redis starter with the Lettuce client excluded; Jedis is declared explicitly right after it -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Jedis client, pinned to the jedis.version property -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${jedis.version}</version>
</dependency>
# Connection settings for the single-node (non-cluster) examples that follow.
spring.redis.host=127.0.0.1
spring.redis.port=6379
# Left empty here - presumably the local server has no password; set it when one is required.
spring.redis.password=
# Index of the Redis logical database to use.
spring.redis.database=1

RedisTemplate执行PipeLine

/**
 * Sends three commands (SET, LPOP, MSET) to Redis in a single pipeline through
 * {@code customRedisTemplate.executePipelined} and prints one result per command.
 *
 * <p>The callback only queues commands: it must not close the pipeline and it must return
 * {@code null}; the per-command results are delivered in the list returned by
 * {@code executePipelined}.
 */
@Test
public void testRedisPipeline() {
    List<Object> resultList = customRedisTemplate.executePipelined(
            (RedisCallback<Object>) connection -> {
                // The connection handed to this callback is already pipelined by
                // executePipelined, so no explicit connection.openPipeline() call is made.

                // 1. A plain "set key value".
                byte[] key = "name".getBytes();
                byte[] value = "qinyi".getBytes();
                connection.set(key, value);

                // 2. The command the article uses as its failing example: LPOP on "xyzabc".
                //    Its slot in the printed results is null.
                connection.lPop("xyzabc".getBytes());

                // 3. An MSET with two entries.
                Map<byte[], byte[]> tuple = new HashMap<>();
                tuple.put("id".getBytes(), "1".getBytes());
                tuple.put("age".getBytes(), "19".getBytes());
                connection.mSet(tuple);

                // Do not call connection.closePipeline() here, and always return null.
                return null;
            }
    );
    resultList.forEach(System.out::println);
}
true
null
true

集群模式下会报如下错:
java.lang.UnsupportedOperationException: Pipeline is currently not supported for JedisClusterConnection.

RedisTemplate批量获取值的2种方式

import org.springframework.util.StopWatch;
// Template used by all pipeline examples below. It is deliberately left as a raw type here:
// later code assigns getKeySerializer() straight to RedisSerializer<String>.
// NOTE(review): the bean this field resolves to is not shown in this article - confirm.
@Autowired
private RedisTemplate customRedisTemplate;
/**
 * Stores 100 {@code RedisUser} values, then reads them back twice - once with
 * {@code multiGet} and once through {@code executePipelined} - timing each phase with a
 * {@link StopWatch} and printing the timings and both result sizes.
 */
@Test
public void redisPipeline() {
    StopWatch timer = new StopWatch("redis测试");

    // Phase 1: build the keys and values and write them with a single multiSet.
    timer.start("初始化key");
    RedisSerializer keySerializer = customRedisTemplate.getKeySerializer();
    List<String> keys = new ArrayList<>();
    HashMap<String, RedisUser> usersByKey = new HashMap<>();
    int index = 0;
    while (index < 100) {
        String currentKey = "xxx-yyy-zzz:pipLine:" + index;
        RedisUser user = RedisUser.builder().userId(index).name(String.valueOf(index)).build();
        usersByKey.put(currentKey, user);
        keys.add(currentKey);
        index++;
    }
    customRedisTemplate.opsForValue().multiSet(usersByKey);
    timer.stop();

    // Phase 2: read everything back with multiGet.
    timer.start("multiGet");
    List multiGetResult = customRedisTemplate.opsForValue().multiGet(keys);
    timer.stop();

    // Phase 3: read everything back by queueing one GET per key in a pipeline.
    timer.start("Pipeline");
    RedisCallback<RedisUser> queueGets = connection -> {
        for (String currentKey : keys) {
            connection.get(keySerializer.serialize(currentKey));
        }
        // The callback itself returns nothing; results come from executePipelined.
        return null;
    };
    List<RedisUser> pipeLineResult = customRedisTemplate.executePipelined(queueGets);
    timer.stop();

    System.out.println(timer.prettyPrint());
    System.out.println("pipeLineResult => " + pipeLineResult.size());
    System.out.println("multiGetResult => " + multiGetResult.size());
}
StopWatch 'redis测试': running time (millis) = 2240
-----------------------------------------
ms     %     Task name
-----------------------------------------
01210  054%  初始化key
00503  022%  multiGet
00527  024%  Pipeline

pipeLineResult => 100
multiGetResult => 100

Jedis 执行PipeLine

import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisConnectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
/**
 * Writes 1000 {@code RedisUser} values as JSON strings and reads them back, using the
 * native Jedis {@link Pipeline} obtained from the template's connection.
 *
 * @throws IOException if closing the read pipeline fails
 */
@Test
public void redisPipeline() throws IOException {
    RedisConnection conn = RedisConnectionUtils.getConnection(customRedisTemplate.getConnectionFactory());
    try {
        Jedis jedis = (Jedis) conn.getNativeConnection();

        // Batch write: queue one SET per entry, then flush them together.
        HashMap<String, RedisUser> redisData = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            redisData.put("xxx-yyy-zzz:pipLine:" + i, RedisUser.builder().userId(i).build());
        }
        Pipeline writePipeline = jedis.pipelined();
        for (Map.Entry<String, RedisUser> entry : redisData.entrySet()) {
            writePipeline.set(entry.getKey(), JSON.toJSONString(entry.getValue()));
        }
        writePipeline.sync();

        // Batch read: queue one GET per key; the responses are read only after the
        // pipeline has been closed.
        List<Response<byte[]>> responses = new ArrayList<>();
        Pipeline readPipeline = jedis.pipelined();
        for (String key : redisData.keySet()) {
            responses.add(readPipeline.get(key.getBytes(StandardCharsets.UTF_8)));
        }
        readPipeline.close();
        for (Response<byte[]> response : responses) {
            byte[] data = response.get();
            // Decode with the same explicit charset used to encode the keys above,
            // instead of the platform default.
            System.out.println(new String(data, StandardCharsets.UTF_8));
        }
    } finally {
        // Hand the connection back; the original version never released it.
        RedisConnectionUtils.releaseConnection(conn, customRedisTemplate.getConnectionFactory());
    }
}

集群模式 使用PipeLine

/**
 * Value object stored in Redis by the pipeline examples. The Lombok annotations supply
 * the accessors, the builder used as {@code RedisUser.builder()} and the constructors.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RedisUser {

    /** Numeric id of the user; the examples set it to the loop index. */
    private Integer userId;

    /** Name of the user; left unset by most of the examples. */
    private String name;
}

集群模式执行PipeLine

/**
 * Stores 1000 {@code RedisUser} values with {@code multiSet}, reads them back through
 * {@code clusterPipLineGet} and prints the result as formatted JSON.
 */
@Test
public void testClusterPipLineGet() {
    HashMap<String, RedisUser> usersByKey = new HashMap<>();
    int index = 0;
    while (index < 1000) {
        RedisUser user = RedisUser.builder().userId(index).build();
        usersByKey.put("xxx-yyy-zzz:pipLine:" + index, user);
        index++;
    }
    customRedisTemplate.opsForValue().multiSet(usersByKey);

    List<String> keys = Lists.newArrayList(usersByKey.keySet());
    List<RedisUser> valueList = (List<RedisUser>) clusterPipLineGet(keys);
    String prettyJson = JSON.toJSONString(valueList, true);
    System.out.println("valueList =>\n " + prettyJson);
}
/**
 * Reads many keys from a Redis cluster by running one Jedis pipeline per cluster node.
 *
 * <p>Each key is serialised once with the template's key serializer, its cluster slot is
 * computed from those bytes, and the key is assigned to the node whose slot range contains
 * that slot. Every node then receives a single pipeline holding the GETs for its keys.
 *
 * <p>The returned list has exactly one element per input key, in the same order as
 * {@code redisKeyList}; a key with no stored value, or whose slot is not covered by any
 * node reported by the cluster, yields {@code null} at its position.
 *
 * @param redisKeyList keys to read
 * @return the deserialised values, positionally aligned with {@code redisKeyList}
 * @throws IllegalStateException if no Jedis pool exists for a node that owns keys, or if
 *         closing a node's pipeline fails
 */
public List<?> clusterPipLineGet(List<String> redisKeyList) {
    RedisSerializer<String> keySerializer = customRedisTemplate.getKeySerializer();
    RedisSerializer<?> valueSerializer = customRedisTemplate.getValueSerializer();

    // Serialise every key and compute its slot once, instead of once per cluster node.
    int keyCount = redisKeyList.size();
    List<byte[]> rawKeys = new ArrayList<>(keyCount);
    int[] slots = new int[keyCount];
    // Pre-filled with nulls so each value can be stored at its key's original position.
    List<Object> resultList = new ArrayList<>(keyCount);
    for (int i = 0; i < keyCount; i++) {
        byte[] rawKey = keySerializer.serialize(redisKeyList.get(i));
        rawKeys.add(rawKey);
        slots[i] = JedisClusterCRC16.getSlot(rawKey);
        resultList.add(null);
    }

    RedisClusterConnection redisClusterConnection = customRedisTemplate.getConnectionFactory().getClusterConnection();
    try {
        // Map<cluster node, positions of the input keys whose slot belongs to that node>
        Map<RedisClusterNode, List<Integer>> nodeKeyIndexes = new HashMap<>();
        for (RedisClusterNode redisClusterNode : redisClusterConnection.clusterGetNodes()) {
            RedisClusterNode.SlotRange slotRange = redisClusterNode.getSlotRange();
            for (int i = 0; i < keyCount; i++) {
                if (slotRange.contains(slots[i])) {
                    nodeKeyIndexes.computeIfAbsent(redisClusterNode, node -> new ArrayList<>()).add(i);
                }
            }
        }

        // Run one pipeline per node.
        JedisCluster jedisCluster = (JedisCluster) redisClusterConnection.getNativeConnection();
        for (Map.Entry<RedisClusterNode, List<Integer>> clusterNodeEntry : nodeKeyIndexes.entrySet()) {
            RedisClusterNode redisClusterNode = clusterNodeEntry.getKey();
            List<Integer> keyIndexes = clusterNodeEntry.getValue();
            String nodeAddress = new HostAndPort(redisClusterNode.getHost(), redisClusterNode.getPort()).toString();
            JedisPool jedisPool = jedisCluster.getClusterNodes().get(nodeAddress);
            if (jedisPool == null) {
                throw new IllegalStateException("No Jedis pool found for cluster node " + nodeAddress);
            }

            List<Response<byte[]>> responses = new ArrayList<>(keyIndexes.size());
            Jedis jedis = jedisPool.getResource();
            try {
                Pipeline pipeline = jedis.pipelined();
                for (Integer keyIndex : keyIndexes) {
                    responses.add(pipeline.get(rawKeys.get(keyIndex)));
                }
                pipeline.close();
            } catch (IOException e) {
                // Fail loudly: the responses cannot be read if the pipeline did not complete.
                throw new IllegalStateException("Pipeline failed for cluster node " + nodeAddress, e);
            } finally {
                jedis.close();
            }

            // Store each value at the position of the key it was read for.
            for (int i = 0; i < responses.size(); i++) {
                byte[] data = responses.get(i).get();
                resultList.set(keyIndexes.get(i), valueSerializer.deserialize(data));
            }
        }
    } finally {
        RedisConnectionUtils.releaseConnection(redisClusterConnection, customRedisTemplate.getConnectionFactory());
    }
    return resultList;
}

集群模式下性能测试

/**
 * Compares {@code multiGet} with {@code clusterPipLineGet} on a cluster: builds
 * {@code testSize} entries, stores them with {@code multiSet}, reads them back both ways,
 * and prints the {@link StopWatch} timings.
 *
 * <p>The key list also contains one key before and one key after the stored range, neither
 * of which is ever written, so both reads include keys with no value.
 *
 * @throws InterruptedException if the final sleep is interrupted
 */
@Test
public void redisPipeline() throws InterruptedException {
    StopWatch stopWatch = new StopWatch("redis测试");
    int testSize = 10000;

    // Build the key list and the data to store.
    stopWatch.start("构建key");
    List<String> redisKey = new ArrayList<>();
    redisKey.add("xxx-yyy-zzz:pipLine:0");
    HashMap<String, RedisUser> redisData = new HashMap<>();
    for (int i = 1; i <= testSize; i++) {
        redisKey.add("xxx-yyy-zzz:pipLine:" + i);
        redisData.put("xxx-yyy-zzz:pipLine:" + i, RedisUser.builder().userId(i).build());
    }
    // String concatenation runs left to right, so this appends testSize and then "1".
    redisKey.add("xxx-yyy-zzz:pipLine:20001" + testSize + 1);
    stopWatch.stop();

    // Batch write.
    stopWatch.start("multiSet");
    customRedisTemplate.opsForValue().multiSet(redisData);
    stopWatch.stop();

    // Batch read with multiGet; printing happens after the timer stops so that console
    // output is not counted in the measurement.
    stopWatch.start("multiGet");
    List<RedisUser> multiGetResult = (List<RedisUser>) customRedisTemplate.opsForValue().multiGet(redisKey);
    stopWatch.stop();
    System.out.println("multiGetResult => " + multiGetResult.size());
    System.out.println(multiGetResult.get(testSize));
    System.out.println(multiGetResult.get(testSize + 1));

    // Batch read with one pipeline per cluster node.
    stopWatch.start("clusterPipLineGet");
    List<?> pipeLineResult = clusterPipLineGet(redisKey);
    stopWatch.stop();
    System.out.println("clusterPipLineGet => " + pipeLineResult.size());
    System.out.println(pipeLineResult.get(testSize));
    System.out.println(pipeLineResult.get(testSize + 1));

    System.out.println(stopWatch.prettyPrint());
    // NOTE(review): keeps the test alive for 10 minutes - presumably for manual inspection;
    // confirm this is still wanted.
    TimeUnit.MINUTES.sleep(10);
}
multiGetResult => 10002
RedisUser(userId=10000, name=null)
null
clusterPipLineGet => 10002
RedisUser(userId=9998, name=null)
RedisUser(userId=10000, name=null)
StopWatch 'redis测试': running time (millis) = 27644
-----------------------------------------
ms     %     Task name
-----------------------------------------
00006  000%  构建key
03951  014%  multiSet
23610  085%  multiGet
00077  000%  clusterPipLineGet

multiGet

  • 获取到的值和入参的key是一一对应的,对应的key获取不到值就为null
  • multiGet在集群模式下并不是批量获取的