Nosql
为什么使用Nosql
我们处于大数据时代!!普通的数据库无法进行数据分析
数据库历史发展
单机MySQL时代
单机MySQL时代(90年代),一个网站的访问量一般不会太大,单个数据库完全够用
出现的问题:
- 数据量增加到一定程度,单机数据库就放不下了
- 数据的索引(B+ Tree),一个机器内存也存放不下
- 访问量变大后(读写混合),一台服务器承受不住。
MySQL+MemCache(缓存)+垂直拆分(读写分离)
将单个数据库扩展成为多个“垂直”的数据库,同时满足读写分离和数据一致性。但是当访问量过大时频繁去数据库查询速度和效率低下。这个优化过程经历了以下几个过程:
- 优化数据库的数据结构和索引(难度大)
- 文件缓存,通过IO流获取比每次都访问数据库效率略高,但是流量爆炸式增长时候,IO流也承受不了
- MemCache,当时最热门的技术,通过在数据库和数据库访问层之间加上一层缓存,第一次访问时查询数据库,将结果保存到缓存,后续的查询先检查缓存,若有直接拿去使用,效率显著提升。
分库分表+主从复制(水平拆分)+Mysql集群
数据库发展中:本质是对数据库的读和写
数据库引擎
- MylSAM:在早些年使用,表锁,十分影响效率。高并发下会出现严重的问题
- Innodb:行锁
慢慢的就开始使用分库分表来解决写的压力,Mysql在那个年代推出了表分区,这个并没有多少公司使用
Mysql集群,很好的满足了那个年代的所有请求
如今最近的年代
如今信息量井喷式增长,各种各样的数据出现(用户定位数据,图片数据等),大数据的背景下关系型数据(RDBMS)无法满足大量数据要求。Nosql数据库就能轻松解决这些问题。
什么是Nosql
Nosql = Not Only SQL 不仅仅是数据库,泛指非关系型数据库。
关系型数据库:列+行,同一个表下数据的结构是一样的。
非关系型数据库:数据存储没有固定的格式,并且可以进行横向扩展。(用户个人信息、社交网络、地理位置,这些数据不需要固定格式,使用map<key,value>键值对来控制)
Nosql特点
解耦!
- 数据之间没有关系,方便扩展
- 大数据量下,性能高。(一秒写入8W次 读取11W次)
- 数据类型是多样性(不需要事先设计数据库,感受一下Mysql设计表和库的痛苦)
传统 RDBMS(关系型数据库) 和 NoSQL
传统的 RDBMS
- 结构化组织 - SQL
- 数据和关系都存在单独的表中 row col
- 数据操作,数据定义语言
- 严格的一致性
- 基础的事务
- …
Nosql
- 不仅仅是数据
- 没有固定的查询语言
- 键值对存储,列存储,文档存储,图形数据库(社交关系)
- 最终一致性,
- CAP定理和BASE (异地多活)
- 高性能,高可用,高可扩
- …
了解:3V+3高
大数据时代的3V:主要是描述问题的
- 海量Volume
- 多样Variety
- 实时Velocity
大数据时代的3高:主要是对程序的要求
- 高并发
- 高可扩
- 高性能
真正在公司中的实践:NoSQL + RDBMS 一起使用才是最强的,阿里巴巴的架构演进!
阿里巴巴演进分析
思考问题:淘宝网站数据,这么多东西难道都是在一个数据库中的吗?
如果你未来相当一个架构师: 没有什么是加一层解决不了的!
1、商品的基本信息
- 名称、价格、商家信息;
- 关系型数据库就可以解决了! MySQL / Oracle
- 淘宝内部的 MySQL 不是大家用的 MySQL
2、商品的描述、评论(文字比较多
- 文档型数据库中,MongoDB
3、图片
分布式文件系统 FastDFS
- 淘宝自己的 TFS
- Gooale的 GFS
- Hadoop HDFS
- 阿里云的 oss
4、商品的关键字 (搜索)
- 搜索引擎 solr elasticsearch
- ISerach:多隆(多去了解一下这些技术大佬!)
5、商品热门的波段信息
- 存数据库
- Redis Tair、Memache…
6、商品的交易,外部的支付接口
- 三方应用
大型互联网应用问题:
- 数据类型太多了!
- 数据源繁多,经常重构!
- 数据要改造,大面积改造?
解决问题:
NoSQL的四大分类
KV键值对
- sina:Redis
- 美团:Redis+Tair
- alibaba、baidu:Redis+Memcache
文档型数据库(bson格式和json一样)
- MongoDB(掌握)
- MongoDB 是一个基于分布式文件存储的数据库,C++ 编写,主要用来处理大量的文档!
MongoDB 是一个介于关系型数据库和非关系型数据中中间的产品!MongoDB 是非关系型数
据库中功能最丰富,最像关系型数据库的!
- MongoDB 是一个基于分布式文件存储的数据库,C++ 编写,主要用来处理大量的文档!
- ConthDB
列存储数据库
- HBase
- 分布式文件系统
图关系数据库
- Neo4j
- InfoGrid
他不是存图形,放的是关系,比如:朋友圈社交网络,广告推荐!
四者对比
Redis入门
概述
Redis(Remote Dictionary Server ),即远程字典服务 !
是一个免费、开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
是当下最热门的NoSQL技术之一,也被人们称之为结构化数据库
Redis能干什么?
- 在内存存储,能够持久化,内存是断电即失、所以持久化很重要(RDB、AOF)
- 高效率、用于高速缓冲
- 发布订阅系统(能实现小的消息队列)
- 地图信息分析
- 计时器、计数器(eg:浏览量)
- .......
特性
- 多样的数据类型
- 持久化
- 集群
- 事务
学习中需要用到的东西
-
狂神的公众号:狂神说,笔记在里面
-
下载地址:通过官网下载即可
Redis推荐都是在Linux服务器上搭建的,我们是基于Linux学习!
windows下载在github上,停更很久了
环境搭建
windows安装
下载安装包
下载地址:https://github.com/tporadowski/redis/releases
和视频同步的版本是3.2.0。下载压缩包,解压即安装
解压
解压到自己想要安装的位置,redis十分的小,只有5M
运行服务
运行成功
也可以指定配置文件启动
redis的windows版本有两个配置文件
使用redis.windows.conf配置文件
打开redis.windows.conf配置文件,修改端口为6380,用来查看启动时配置文件是否生效
通过命令行指定配置文件启动
redis-server.exe redis.windows.conf
配置文件生效
客户端连接测试
使用redis客户端连接redis服务
运行客户端
测试连接
测试设置基本值(键值对)和获取值
官网推荐还是在linux下使用redis
linux安装
下载安装包
在官网上,点击下面链接下载
视频使用的是5.0.8版本
解压
解压Redis的安装包,
程序文件一般放在:/opt 目录下
进入解压后的文件,可以看到我们redis的配置文件
Redis的重要配置文件:redis.conf
基本的环境安装
yum install gcc-c++ (基本命令的环境安装,c++写的)
make (把所有命令安装上,安装环境,第一次安装比较慢)
make install (确认命令的安装)
redis的默认安装路径 /usr/local/bin
将redis配置文件。复制到我们当前目录下
redis默认不是后台启动的,修改配置文件(改为yes,变成后台启动)
启动Redis服务!
[root@MyCentOS8 bin]# redis-server rconfig/redis.conf
使用redis-cli 进行连接测试!
查看redis的进程是否开启!
如何关闭Redis服务呢? shutdown
再次查看进程是否存在
后面我们会使用单机多Redis启动集群测试!
测试性能
redis-benchmark
是一个压力测试工具!
官方自带的性能测试工具!redis-benchmark 命令参数!
测试如下
# 测试:100个并发连接 100000请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
截取了以下一部分:
如何查看这些分析呢
基础的知识(基本命令)
数据库
redis默认有16个数据库(默认使用的是第0个),可以查看配置文件
切换数据库:select index(数据库索引)
127.0.0.1:6379> select 3 # 切换数据库
OK
127.0.0.1:6379[3]> DBSIZE # 查看数据库的大小
(integer) 0
查看当前数据库所有key:keys *
127.0.0.1:6379[3]> keys *
1) "name"
清除当前数据库:flushdb
127.0.0.1:6379[3]> flushdb
OK
清除全部数据库的内容:flushall
127.0.0.1:6379[3]> flushall
OK
思考:为什么redis是 6379!粉丝效应!(了解一下即可!)
Redis 是单线程的!
明白Redis是很快的,官方表示,Redis是基于内存操作,CPU不是Redis性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程来实现,就使用单线程了!所有就使用了单线程了!
Redis 是C 语言写的,官方提供的数据为 100000+ 的QPS,完全不比同样是使用 key-vale的Memecache差!
Redis 为什么单线程还这么快?
- 误区1:高性能的服务器一定是多线程的?
- 多线程(CPU上下文会切换!)一定比单线程效率高!
速度:CPU>内存>硬盘
核心:redis 是将所有的数据全部放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!!!),对于内存系统来说,如果没有上下文切换效率就是最高的!多次读写都是在一个CPU上的,在内存情况下,这个就是最佳的方案!
五大基本数据类型
官网介绍:
翻译:
Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件MQ。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的磁盘持久化(persistence), 并通过Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)
Redis-Key
基本命令(常用)
# 查看所有的key
127.0.0.1:6379> KEYS *
(empty array)
# set key
127.0.0.1:6379> set name xiaor999
OK
127.0.0.1:6379> set age 30
OK
# KEYS *
127.0.0.1:6379> KEYS *
1) "name"
2) "age"
# 判断当前的key是否存在
127.0.0.1:6379> EXISTS name
(integer) 1
127.0.0.1:6379> EXISTS name123
(integer) 0
# 设置key的过期时间,单位是秒
127.0.0.1:6379> EXPIRE name 10
(integer) 1
# 查看当前key的剩余时间
127.0.0.1:6379> ttl name
(integer) 6
127.0.0.1:6379> ttl name
(integer) 3
127.0.0.1:6379> ttl name
(integer) -2
# 获取key值
127.0.0.1:6379> get name
(nil)
# 查看当前key的一个类型!
127.0.0.1:6379> type age
string
# 移除当前的key 1代表第一个数据库(库的索引是0)
127.0.0.1:6379> MOVE age 1
(integer) 1
后面如果遇到不会的命令,可以在官网查看帮助文档!
中文网的命令:http://www.redis.cn/
Redis不区分大小命令
String(字符串)
90%的程序员使用redis只会使用string类型。==api调用工程师
追加、获取长度
# 设置值
127.0.0.1:6379> SET key1 val1
OK
# 获得值
127.0.0.1:6379> get key1
"val1"
# 判断某一个key是否存在
127.0.0.1:6379> EXISTS key1
(integer) 1
# 获得所有的key
127.0.0.1:6379> KEYS *
1) "key1"
=========================================================================================
# 追加字符串,如果当前key不存在,就相当于setkey
127.0.0.1:6379> APPEND key1 hello
(integer) 9
127.0.0.1:6379> get key1
"val1hello"
# 获取字符串的长度!
127.0.0.1:6379> STRLEN key1
(integer) 9
自增 自减
自增1:INCR KEY
自减1:DECR KEY
按步长增加:INCRBY KEY INCREMENT
按步长减少:DECRBY KEY INCREMENT
举例:
# 设置初始浏览量为0
127.0.0.1:6379> set num 0
OK
127.0.0.1:6379> get num
"0"
=========================================================================================
# 自增1 浏览量加1
127.0.0.1:6379> INCR num
(integer) 1
127.0.0.1:6379> INCR num
(integer) 2
127.0.0.1:6379> get num
"2"
# 自减1 浏览量减1
127.0.0.1:6379> DECR num
(integer) 1
127.0.0.1:6379> DECR num
(integer) 0
127.0.0.1:6379> DECR num
(integer) -1
127.0.0.1:6379> GET num
"-1"
# 设置自增步长,指定增量!
127.0.0.1:6379> INCRBY num 10
(integer) 9
127.0.0.1:6379> INCRBY num 11
(integer) 20
# 设置自减步长,指定减量!
127.0.0.1:6379> DECRBY num 5
(integer) 15
127.0.0.1:6379> GET num
"15"
字符串范围
获取指定范围的字符串:GETRANGE KEY START END
举例:
127.0.0.1:6379> set mykey "xiaor,hellojava"
OK
127.0.0.1:6379> get mykey
"xiaor,hellojava"
=========================================================================================
# 截取字符串[0,3],闭区间
127.0.0.1:6379> GETRANGE mykey 0 3
"xiao"
# 获取全部的字符串 和get key是一样的
127.0.0.1:6379> GETRANGE mykey 0 -1
"xiaor,hellojava"
替换
语法:
替换指定位置开始的字符串:SETRANGE KEY OFFSET VALUE
举例:
127.0.0.1:6379> set xiaokey asdfghjkl
OK
127.0.0.1:6379> get xiaokey
"asdfghjkl"
=========================================================================================
# 替换指定位置开始的字符串!
127.0.0.1:6379> SETRANGE xiaokey 2 hello
(integer) 9
127.0.0.1:6379> get xiaokey
"ashellokl"
设置过期时间
# 设置key的值,30秒后过期。如果当前key存在,则会覆盖当前key的值,如果不存在则创建key
127.0.0.1:6379> SETEX rrkey 30 "redis,java"
OK
# 如果key不存在,创建key(1代表创建成功)
127.0.0.1:6379> SETNX bbkey "php,cc"
(integer) 1
# 如果key存在,创建失败(0代表创建失败)
127.0.0.1:6379> SETNX bbkey "html,vue"
(integer) 0
127.0.0.1:6379> get bbkey
"php,cc"
设置、获取多个值
语法:
同时设置多个值:MSET KEY VALUE [KEY VLAUE......]
同时获取多个值:MGET KEY [KEY......]
举例:
# mset 同时设置多个值
127.0.0.1:6379> MSET key1 val1 key2 val2 key3 val3
OK
127.0.0.1:6379> keys *
1) "key2"
2) "key3"
3) "key1"
# mget 同时获取多个值
127.0.0.1:6379> MGET key1 key2 key3
1) "val1"
2) "val2"
3) "val3"
# msetnx 不存在则设置,存在则设置失败,是一个原子性的操作,要么一起成功,要么一起失败
# key1存在,key4不存在。因为key1存在,所以返回0(设置失败)
127.0.0.1:6379> MSETNX key1 val123 key4 val4
(integer) 0
127.0.0.1:6379> get key4
(nil)
设置对象
方式一
语法:
设置对象:SET KEY1:KEY2:KEY3 VALUE
举例:
# 设置一个“user:1”的对象 值是一个json字符来保存对象!
127.0.0.1:6379> set user:1 {name:zhangsan,age:27}
OK
127.0.0.1:6379> get user:1
"{name:zhangsan,age:27}"
方式二(推荐)
语法:
mset user:{id}:filed value [user:{id}:filed value]
举例:
# 批量设置 mset user:{id}:{filed} value
127.0.0.1:6379> mset user:1:name xiaohua user:1:age 28
OK
127.0.0.1:6379> keys *
1) "user:1:name"
2) "user:1:age"
127.0.0.1:6379> mget user:1:name user:1:age
1) "xiaohua"
2) "28"
组合命令
语法:
先取值再赋值:GETSET KEY VALUE
举例:
# 如果不存在值,则返回 nil
127.0.0.1:6379> GETSET db redis
(nil)
127.0.0.1:6379> get db
"redis"
# 如果存在值,获取原来的值,并设置新的值
127.0.0.1:6379> GETSET db mysql
"redis"
127.0.0.1:6379> keys *
1) "db"
127.0.0.1:6379> get db
"mysql"
应用总结
数据结构是相同的!
String类似的使用场景:value除了是我们的字符串还可以是我们的数字!
- 计数器
- 统计多单位的数量
- 粉丝数
- 对象缓存存储!
List(列表)
基本的数据类型,列表
在redis里面,我们可以把list玩成 ,栈、队列、阻塞队列!
所有的list命令都是用l
开头的
设置、获取值
语法:
将一个值(多个值)从头部(左边)插入:LPUSH KEY ELEMENT [ELEMENT.......]
将一个值(多个值)从尾部(右边)插入:RPUSH KEY ELEMENT [ELEMENT.......]
获取LIST的值:LRANGE KEY START END (0 -1 代表全部)
举例:
127.0.0.1:6379> keys *
(empty array)
# 将一个值或者多个值,插入到列表头部(左边)
127.0.0.1:6379> LPUSH list one
(integer) 1
127.0.0.1:6379> LPUSH list two
(integer) 2
127.0.0.1:6379> LPUSH list three
(integer) 3
# 获取list中所有的值!
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
3) "one"
# 通过区间获取具体的值!
127.0.0.1:6379> LRANGE list 0 1
1) "three"
2) "two"
# 将一个值或者多个值,插入到列表位的右边
127.0.0.1:6379> RPUSH list rightval
(integer) 4
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
3) "one"
4) "rightval"
# 从左边插入两个值
127.0.0.1:6379> LPUSH newlist val1 val2
(integer) 2
127.0.0.1:6379> LRANGE newlist 0 -1
1) "val2"
2) "val1"
# 从右边插入两个值
127.0.0.1:6379> RPUSH newlist val3 val4
(integer) 4
127.0.0.1:6379> LRANGE newlist 0 -1
1) "val2"
2) "val1"
3) "val3"
4) "val4"
移除
语法:
从左边移除值:LPOP KEY [COUNT]
从右边移除值:RPOP KEY [COUNT]
举例:
127.0.0.1:6379> keys *
1) "list"
127.0.0.1:6379> LRANGE list 0 -1
1) "ten"
2) "nine"
3) "eight"
4) "seveen"
5) "six"
6) "five"
7) "flour"
8) "threee"
9) "two"
10) "one"
# 移除list的第一个元素
127.0.0.1:6379> LPOP list
"ten"
127.0.0.1:6379> LRANGE list 0 -1
1) "nine"
2) "eight"
3) "seveen"
4) "six"
5) "five"
6) "flour"
7) "threee"
8) "two"
9) "one"
# 移除list的最后一个元素
127.0.0.1:6379> RPOP list
"one"
127.0.0.1:6379> LRANGE list 0 -1
1) "nine"
2) "eight"
3) "seveen"
4) "six"
5) "five"
6) "flour"
7) "threee"
8) "two"
# 移除list的开头的两个元素
127.0.0.1:6379> LPOP list 2
1) "nine"
2) "eight"
127.0.0.1:6379> LRANGE list 0 -1
1) "seveen"
2) "six"
3) "five"
4) "flour"
5) "threee"
6) "two"
# 移除list的末尾的3个元素
127.0.0.1:6379> RPOP list 3
1) "two"
2) "threee"
3) "flour"
127.0.0.1:6379> LRANGE list 0 -1
1) "seveen"
2) "six"
3) "five"
通过下标获取值
语法:
通过下标获取list的值:LINDEX KEY INDEX
举例:
127.0.0.1:6379> LPUSH list val1 val2 val3 val4 val5
(integer) 5
127.0.0.1:6379> LRANGE list 0 -1
1) "val5"
2) "val4"
3) "val3"
4) "val2"
5) "val1"
# 通过下标获得 list 中的某一个值!
127.0.0.1:6379> LINDEX list 2
"val3"
127.0.0.1:6379> LINDEX list 1
"val4"
127.0.0.1:6379> LINDEX list 0
"val5"
127.0.0.1:6379> LINDEX list -1
"val1"
获取列表长度
语法:
获取list列表的长度:LLEN KEY
举例:
127.0.0.1:6379> LPUSH list lval1 lval2 lval3
(integer) 3
127.0.0.1:6379> LRANGE list 0 -1
1) "lval3"
2) "lval2"
3) "lval1"
# 返回列表的长度
127.0.0.1:6379> LLEN list
(integer) 3
移除指定的值
语法:
移除指定的值:LREM KEY COUNT ELEMENT
举例:
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "three"
3) "two"
4) "one"
# 移除list集合中指定个数的value
127.0.0.1:6379> lrem list 1 one
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "three"
3) "two"
127.0.0.1:6379> lrem list 1 three
(integer) 1
127.0.0.1:6379> LRANGE list 0 -1
1) "three"
2) "two"
127.0.0.1:6379> Lpush list three
(integer) 3
127.0.0.1:6379> lrem list 2 three
(integer) 2
127.0.0.1:6379> LRANGE list 0 -1
1) "two"
截取list
语法:
截取list的值:LTRIM KEY START STOP
举例:
127.0.0.1:6379> LPUSH list one two three flour five
(integer) 5
127.0.0.1:6379> LRANGE list 0 -1
1) "five"
2) "flour"
3) "three"
4) "two"
5) "one"
# 通过下标截取指定的长度,这个list已经被改变了,截断了,只剩下截取的元素!
127.0.0.1:6379> LTRIM list 1 2
OK
127.0.0.1:6379> LRANGE list 0 -1
1) "flour"
2) "three"
移除列表元素,到新列表中
语法:
移除列表元素,到新列表中:RPOPLPUSH SOURCE DESTINATION
举例:
127.0.0.1:6379> keys *
1) "list"
127.0.0.1:6379> LRANGE list 0 -1
1) "one"
2) "two"
3) "three"
4) "flour"
5) "five"
# 移除列表的最后一个元素,将他移动到新的列表中!
127.0.0.1:6379> RPOPLPUSH list newList
"five"
127.0.0.1:6379> KEYS *
1) "newList"
2) "list"
# 查看原来的列表
127.0.0.1:6379> LRANGE list 0 -1
1) "one"
2) "two"
3) "three"
4) "flour"
# 查看目标列表中,确实存在改值!
127.0.0.1:6379> LRANGE newList 0 -1
1) "five"
指定值的替换(更新)
语法:
替换指定的值,更新操作:LSET KEY INDEX ELEMENT
举例:
# lset 将列表中指定下标的值替换为另外一个值,更新操作
# 判断这个列表是否存在
127.0.0.1:6379> EXISTS list
(integer) 0
# 如果不存在列表我们去更新就会报错
127.0.0.1:6379> lset list 0 item
(error) ERR no such key
127.0.0.1:6379> lpush list value1
(integer) 1
127.0.0.1:6379> LRANGE list 0 0
1) "value1"
# 如果存在,更新当前下标的值
127.0.0.1:6379> lset list 0 item
OK
127.0.0.1:6379> LRANGE list 0 0
1) "item"
# 如果不存在,则会报错!
127.0.0.1:6379> lset list 1 other
(error) ERR index out of range
值插入具体位置
语法:
具体位置插入值:LINSERT KEY BEFORE/AFTER PIVOT ELEMENT
举例:
127.0.0.1:6379> Rpush mylist "hello"
(integer) 1
127.0.0.1:6379> Rpush mylist "world"
(integer) 2
# linsert # 将某个具体的value插入到列把你中某个元素的前面或者后面!
127.0.0.1:6379> LINSERT mylist before "world" "other"
(integer) 3
127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello"
2) "other"
3) "world"
127.0.0.1:6379> LINSERT mylist after world new
(integer) 4
127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello"
2) "other"
3) "world"
4) "new"
应用总结
- 他实际上是一个链表,before Node after , left,right 都可以插入值
- 如果key 不存在,创建新的链表
- 如果key存在,新增内容
- 如果移除了所有值,空链表,也代表不存在!
- 在两边插入或者改动值,效率最高! 中间元素,相对来说效率会低一点~
可以实现场景:消息排队!消息队列 (Lpush Rpop), 栈( Lpush Lpop)!
Set(集合)
是无序不重复集合
设置、获取、查看值
语法:
在Set集合中添加值:SADD KEY MEMBER [MEMBER ......]
查看Set集合中的所有值:SMEMBERS KEY
查看值是否在Set集合中:SISMEMBER KEY MEMBER
举例:
# set集合中添加值
127.0.0.1:6379> SADD myset java
(integer) 1
127.0.0.1:6379> SADD myset phthon
(integer) 1
127.0.0.1:6379> SADD myset php
(integer) 1
127.0.0.1:6379> KEYS *
1) "myset"
# 查看指定set的所有值
127.0.0.1:6379> SMEMBERS myset
1) "php"
2) "phthon"
3) "java"
# 判断某一个值是不是在set集合中!
127.0.0.1:6379> SISMEMBER myset java
(integer) 1
127.0.0.1:6379> SISMEMBER myset C++
(integer) 0
获取元素个数
语法:
获取Set的元素个数:SCARD KEY
举例:
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> sadd myset member1
(integer) 1
127.0.0.1:6379> sadd myset member2 member3 member4 member5
(integer) 4
# 获取set集合中的内容元素个数!
127.0.0.1:6379> SCARD myset
(integer) 5
移除指定元素
语法:
移除指定元素:SREM KEY MEMBER [MEMBER ......]
举例:
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> sadd myset member1
(integer) 1
127.0.0.1:6379> sadd myset member2 member3 member4 member5
(integer) 4
# 移除set集合中的指定元素
127.0.0.1:6379> SREM myset member123
(integer) 0
127.0.0.1:6379> SREM myset member1
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "member2"
2) "member5"
3) "member4"
4) "member3"
随机抽选元素
语法:
随机抽选元素:SRANDMEMBER KEY [COUNT]
举例:
127.0.0.1:6379> SMEMBERS myset
1) "mem2"
2) "mem4"
3) "mem1"
4) "mem3"
5) "mem5"
# 随机抽选出一个元素
127.0.0.1:6379> SRANDMEMBER myset
"mem2"
127.0.0.1:6379> SRANDMEMBER myset
"mem1"
127.0.0.1:6379> SRANDMEMBER myset
"mem3"
127.0.0.1:6379> SRANDMEMBER myset
"mem4"
127.0.0.1:6379> SRANDMEMBER myset
"mem5"
127.0.0.1:6379> SRANDMEMBER myset
"mem3"
# 随机抽选出指定个数的元素
127.0.0.1:6379> SRANDMEMBER myset 2
1) "mem5"
2) "mem3"
127.0.0.1:6379> SRANDMEMBER myset 3
1) "mem2"
2) "mem4"
3) "mem1"
127.0.0.1:6379> SRANDMEMBER myset 2
1) "mem5"
2) "mem2"
随机删除元素
语法:
随机删除元素:SPOP KEY [COUNT]
举例:
127.0.0.1:6379> SMEMBERS myset
1) "mem2"
2) "mem5"
3) "mem4"
4) "mem1"
5) "mem3"
# 随机删除一些set集合中的元素!
127.0.0.1:6379> SPOP myset
"mem4"
127.0.0.1:6379> SPOP myset 2
1) "mem5"
2) "mem3"
127.0.0.1:6379> SMEMBERS myset
1) "mem2"
2) "mem1"
移动元素到新的集合
语法:
移动元素到新的set集合中:SMOVE oldset newset MEMBER
举例:
127.0.0.1:6379> SMEMBERS myset
1) "m1"
2) "m4"
3) "m3"
4) "m6"
5) "m2"
6) "m5"
# 将一个指定的值,移动到另外一个set集合!
127.0.0.1:6379> SMOVE myset newSet m2
(integer) 1
127.0.0.1:6379> SMOVE myset newSet m2
(integer) 0
127.0.0.1:6379> SMEMBERS myset
1) "m3"
2) "m1"
3) "m6"
4) "m4"
5) "m5"
127.0.0.1:6379> SMEMBERS newSet
1) "m2"
集合
微博,B站,共同关注!(并集)
数字集合类:
- 差集
SDIFF
- 交集
SINTER
- 并集
SUNION
127.0.0.1:6379> SMEMBERS set1
1) "m3"
2) "m2"
3) "m1"
4) "m4"
5) "m5"
127.0.0.1:6379> SMEMBERS set2
1) "m2"
2) "m7"
3) "m8"
# 差集
127.0.0.1:6379> SDIFF set1 set2
1) "m3"
2) "m1"
3) "m5"
4) "m4"
# 差集
127.0.0.1:6379> SDIFF set2 set1
1) "m7"
2) "m8"
# 交集
127.0.0.1:6379> SINTER set1 set2
1) "m2"
# 交集
127.0.0.1:6379> SINTER set2 set1
1) "m2"
# 并集
127.0.0.1:6379> SUNION set1 set2
1) "m3"
2) "m7"
3) "m1"
4) "m2"
5) "m4"
6) "m5"
7) "m8"
# 并集
127.0.0.1:6379> SUNION set2 set1
1) "m3"
2) "m7"
3) "m1"
4) "m2"
5) "m5"
6) "m4"
7) "m8"
应用总结
微博,A用户将所有关注的人放在一个set集合中!将它的粉丝也放在一个集合中!
共同关注,共同爱好,二度好友,推荐好友!(六度分割理论)
Hash(哈希)
可以看做Map集合,key-map! 时候这个值是一个map集合!
本质和String类型没有太大区别,还是一个简单的 key-vlaue!
设置、获取、查看值
语法:
Hash插入值:HSET KEY FIELD VALUE [FIELD VALUE......]
Hash获取值:HGET KEY FIELD
Hash批量插入值:HMSET KEY FIELD VALUE [FIELD VALUE......]
Hash批量获取值:HMGET KEY FIELD [FIELD......]
Hash获取所有的值:HGETALL KEY
举例:
# set一个具体 key-vlaue
127.0.0.1:6379> hset myhash field1 kuangshen
(integer) 1
# 获取一个字段值
127.0.0.1:6379> hget myhash field1
"kuangshen"
# set多个 key-vlaue
127.0.0.1:6379> hmset myhash field1 hello field2 world
OK
# 获取多个字段值
127.0.0.1:6379> hmget myhash field1 field2
1) "hello"
2) "world"
# 获取全部的数据,
127.0.0.1:6379> hgetall myhash
1) "field1"
2) "hello"
3) "field2"
4) "world"
删除元素
语法:
Hash删除元素:HDEL KEY FIELD [FIELD......]
**举例: **
127.0.0.1:6379> HGETALL myHash
1) "f1"
2) "v1"
3) "f2"
4) "v2"
5) "f3"
6) "v3"
# 删除hash指定key字段!对应的value值也就消失了!
127.0.0.1:6379> HDEL myHash f2
(integer) 1
127.0.0.1:6379> HGETALL myHash
1) "f1"
2) "v1"
3) "f3"
4) "v3"
获取元素数量
语法:
Hash元素的数量获取:HLEN KEY
举例:
127.0.0.1:6379> HGETALL myHash
1) "f1"
2) "v1"
3) "f3"
4) "v3"
# 获取hash表的字段数量!
127.0.0.1:6379> HLEN myHash
(integer) 2
判断元素存在
语法:
判断Hash中元素是否存在:HEXISTS KEY FIELD
**举例: **
127.0.0.1:6379> HGETALL myHash
1) "f1"
2) "v1"
3) "f3"
4) "v3"
# 判断hash中指定字段是否存在!
127.0.0.1:6379> HEXISTS myHash f1
(integer) 1
127.0.0.1:6379> HEXISTS myHash f6
(integer) 0
获取所有键,获取所有值
语法:
获取Hash中所有元素:HKEYS KEY
获取Hash中所有元素的值:HVALS KEY
**举例: **
# 只获得所有的键值对
127.0.0.1:6379> HGETALL myHash
1) "f1"
2) "v1"
3) "f3"
4) "v3"
# 只获得所有field
127.0.0.1:6379> HKEYS myHash
1) "f1"
2) "f3"
# 只获得所有value
127.0.0.1:6379> HVALS myHash
1) "v1"
2) "v3"
自增 自减
语法:
Hash中元素自增:HINCRBY KEY FIELD INCREMENT(正数)
Hash中元素自减:HINCRBY KEY FIELD INCREMENT(负数)
举例:
127.0.0.1:6379> hset myhash field3 5
(integer) 1
# 自增
127.0.0.1:6379> HINCRBY myhash field3 1
(integer) 6
#自减
127.0.0.1:6379> HINCRBY myhash field3 -1
(integer) 5
是否存在
语法:
Hash中元素是否存在:HSETNX KEY FIELD VALUE
举例:
127.0.0.1:6379> HGETALL myHash
1) "f1"
2) "v1"
3) "f2"
4) "v2"
5) "f3"
6) "v3"
# 如果不存在则可以设置
127.0.0.1:6379> HSETNX myHash f4 v4
(integer) 1
127.0.0.1:6379> HGETALL myHash
1) "f1"
2) "v1"
3) "f2"
4) "v2"
5) "f3"
6) "v3"
7) "f4"
8) "v4"
# 如果存在则不能设置
127.0.0.1:6379> HSETNX myHash f1 v99
(integer) 0
127.0.0.1:6379> HSETNX myHash f1 v1
(integer) 0
应用总结
- hash变更的数据 user name age,尤其是是用户信息之类的,经常变动的信息!
- hash 更适合于对象的存储
- String更加适合字符串存储
Zset(有序集合)
在set的基础上,增加了一个值 zset k1 score1 v1
设置、获取值
语法:
设置:ZADD KEY SCORE VALUE
获取:ZRANGE KEY MIN MAX
举例:
# 添加一个值
127.0.0.1:6379> zadd myset 1 one
(integer) 1
# 添加多个值
127.0.0.1:6379> zadd myset 2 two 3 three
(integer) 2
# 获取所有值
127.0.0.1:6379> ZRANGE myset 0 -1
1) "one"
2) "two"
3) "three"
排序
语法:
从小到大排列值:ZRANGEBYSCORE KEY -inf +inf
从大到小排列值:ZREVRANGEBYSCORE KEY +inf -inf
按条件显示值的所有信息:ZRANGEBYSCORE KEY -inf +inf WiTHSCORES
**举例: **
# 获取所有数据
127.0.0.1:6379> zrange salary 0 -1 withscores
1) "wangwu"
2) "1000"
3) "lisi"
4) "3000"
5) "zhangsan"
6) "5000"
# 正序排序,获取所有数据
127.0.0.1:6379> zrangebyscore salary -inf +inf
1) "wangwu"
2) "lisi"
3) "zhangsan"
# 正序排序,获取指定区间(闭区间)的数据
127.0.0.1:6379> zrangebyscore salary 1000 3000
1) "wangwu"
2) "lisi"
# 正序排序,获取指定区间(闭区间)的数据,并携带score参数
127.0.0.1:6379> zrangebyscore salary 1000 3000 withscores
1) "wangwu"
2) "1000"
3) "lisi"
4) "3000"
# 获取所有数据
127.0.0.1:6379> zrange salary 0 -1 withscores
1) "wangwu"
2) "1000"
3) "lisi"
4) "3000"
5) "zhangsan"
6) "5000"
# 降序排序,获取所有数据
127.0.0.1:6379> zrevrangebyscore salary +inf -inf
1) "zhangsan"
2) "lisi"
3) "wangwu"
# 降序排序,获取指定区间(闭区间)的数据
127.0.0.1:6379> zrevrangebyscore salary 3000 1000
1) "lisi"
2) "wangwu"
# 降序排序,获取指定区间(闭区间)的数据,并携带score参数
127.0.0.1:6379> zrevrangebyscore salary 3000 1000 withscores
1) "lisi"
2) "3000"
3) "wangwu"
4) "1000"
移除
语法:
移除元素:ZREM KEY MEMBER [MEMBER .......]
获取zset的元素的个数:ZCARD KEY
**举例: **
127.0.0.1:6379> ZRANGE salary 0 -1
1) "xiaoR"
2) "xiaohong"
3) "zhangsan"
# 移除有序集合中的指定元素
127.0.0.1:6379> ZREM salary xiaohong
(integer) 1
127.0.0.1:6379> ZRANGE salary 0 -1
1) "xiaoR"
2) "zhangsan"
# 获取有序集合中元素的个数
127.0.0.1:6379> ZCARD salary
(integer) 2
获取指定区间元素个数
语法:
获取指定区间的成员数量的个数:ZCOUNT KEY MIN MAX
**举例: **
127.0.0.1:6379> zadd myset 1 hello
(integer) 1
127.0.0.1:6379> zadd myset 2 world 3 xiaoR
(integer) 2
# 获取指定区间的成员数量!
127.0.0.1:6379> ZCOUNT myset 1 3
(integer) 3
127.0.0.1:6379> ZCOUNT myset 1 2
(integer) 2
应用总结
其与的一些API,通过我们的学习吗,你们剩下的如果工作中有需要,这个时候你可以去查查看官方文 档!
案例思路:set 排序 存储班级成绩表,工资表排序!
普通消息,1, 重要消息 ,2,带权重进行判断!
排行榜应用实现,取Top N 测试
三种特殊数据类型
Geospatial地理位置
朋友的定位,附近的人,打车距离计算?
Redis 的 Geo 在Redis3.2 版本就推出了! 这个功能可以推算地理位置的信息,两地之间的距离,方圆几里的人
该数据类型的操作只有 六个命令:
官方文档:https://www.redis.net.cn/order/3685.html
GEOADD
添加地理位置坐标
# geoadd 添加地理位置
# 规则:两级无法直接添加,我们一般会下载城市数据,直接通过java程序一次性导入!
# 有效的经度从-180度到180度。
# 有效的纬度从-85.05112878度到85.05112878度。
# 当坐标位置超出上述指定范围时,该命令将会返回一个错误。
# 127.0.0.1:6379> geoadd china:city 39.90 116.40 beijin (error)
# ERR invalid longitude,latitude pair 39.900000,116.400000
# 一次添加一个位置
127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
#一次添加多个位置
127.0.0.1:6379> geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shengzhen
(integer) 2
127.0.0.1:6379> geoadd china:city 120.16 30.24 hangzhou 108.96 34.26 xian
(integer) 2
GEOPOS
获取地理位置坐标,一定是一个坐标值
# 获取指定的城市的经度和纬度!
127.0.0.1:6379> GEOPOS china:city beijing
1) 1) "116.39999896287918091"
2) "39.90000009167092543"
127.0.0.1:6379> GEOPOS china:city beijing chongqing
1) 1) "116.39999896287918091"
2) "39.90000009167092543"
2) 1) "106.49999767541885376"
2) "29.52999957900659211"
GEODIST
两人之间的距离!
单位:
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
# 查看上海到北京的直线距离
127.0.0.1:6379> GEODIST china:city beijing shanghai km
"1067.3788"
# 查看重庆到北京的直线距离
127.0.0.1:6379> GEODIST china:city beijing chongqing km
"1464.0708"
GEORADIUS
以给定的经纬度为中心, 找出某一半径内的元素
我附近的人? (获得所有附近的人的地址,定位!)通过半径来查询!
获得指定数量的人,200
所有数据应该都录入:china:city ,才会让结果更加准确!
# 以110,30 这个经纬度为中心,寻找方圆1000km内的城市
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km
1) "chongqi"
2) "xian"
3) "shengzhen"
4) "hangzhou"
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km
1) "chongqi"
2) "xian"
# 显示到中心点的距离
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withdist
1) 1) "chongqi"
2) "341.9374"
2) 1) "xian"
2) "483.8340"
# 显示经纬度信息
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withcoord
1) 1) "chongqi"
2) 1) "106.49999767541885376"
2) "29.52999957900659211"
2) 1) "xian"
2) 1) "108.96000176668167114"
2) "34.25999964418929977"
# 筛选出指定的结果
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withdist withcoord count 1
1) 1) "chongqi"
2) "341.9374"
3) 1) "106.49999767541885376"
2) "29.52999957900659211"
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withdist withcoord count 2
1) 1) "chongqi"
2) "341.9374"
3) 1) "106.49999767541885376"
2) "29.52999957900659211"
2) 1) "xian"
2) "483.8340"
3) 1) "108.96000176668167114"
2) "34.25999964418929977"
GEORADIUSBYMEMBER
# 找出以北京为中心半径1000km范围内的城市
127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km
1) "beijing"
2) "xian"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city shanghai 400 km
1) "hangzhou"
2) "shanghai"
GEOHASH
返回一个或多个位置元素的Geohash 表示
该命令将返回11个字符的Geohash字符串!
# 将二维的经纬度转换为一维的字符串,如果两个字符串越像,那么则距离越近!
127.0.0.1:6379> geohash china:city beijing chongqi
1) "wx4fbxxfke0"
2) "wm5xzrybty0"
GEO 底层的实现原理其实就是 Zset(有序集合)!我们可以使用Zset命令来操作geo!
查看 移除
# 查看地图中全部的元素
127.0.0.1:6379> ZRANGE china:city 0 -1
1) "chongqi"
2) "xian"
3) "shengzhen"
4) "hangzhou"
5) "shanghai"
6) "beijing"
# 移除指定元素!
127.0.0.1:6379> zrem china:city beijing
(integer) 1
127.0.0.1:6379> ZRANGE china:city 0 -1
1) "chongqi"
2) "xian"
3) "shengzhen"
4) "hangzhou"
5) "shanghai"
Hyperloglog
什么是基数?
基数(不重复的元素),可以接受误差!
A {1,3,5,7,8,7} 的基数是5,因为7重复
B{1,3,5,7,8}的基数也是5,没有重复的数
简介
Redis 2.8.9 版本就更新了 Hyperloglog 数据结构!
Redis Hyperloglog 基数统计的算法!
优点:占用的内存是固定,2^64 不同的元素的基数,只需要废 12KB内存!如果要从内存角度来比较的话 Hyperloglog 首选
网页的 UV (一个人访问一个网站多次,但是还是算作一个人!)
传统的方式, set 保存用户的id,然后就可以统计 set 中的元素数量作为标准判断 !这个方式如果保存大量的用户id,就会比较麻烦!我们的目的是为了计数,而不是保存用户id;
Hyperloglog有0.81% 错误率! 统计UV任务,可以忽略不计的!
# 创建第一组元素 mykey
127.0.0.1:6379> PFadd mykey a b c d e f g h i j
(integer) 1
# 统计 mykey 元素的基数数量
127.0.0.1:6379> PFCOUNT mykey
(integer) 10
# 创建第二组元素 mykey2
127.0.0.1:6379> PFadd mykey2 i j z x c v b n m
(integer) 1
127.0.0.1:6379> PFCOUNT mykey2
(integer) 9
# 合并两组 mykey mykey2 => mykey3 并集
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2
OK
# 看并集的数量!
127.0.0.1:6379> PFCOUNT mykey3
(integer) 15
如果允许容错,那么一定可以使用 Hyperloglog !
如果不允许容错,就使用 set 或者自己的数据类型即可!
Bitmap
为什么其他教程都不喜欢讲这些?这些在生活中或者开发中,都有十分多的应用场景,学习了,就是多一个思路!
位存储
统计用户信息,活跃,不活跃! 登录 、 未登录! 打卡,未打卡! 两个状态的,都可以使用 Bitmaps!
Bitmap 位图,数据结构! 都是操作二进制位来进行记录,就只有0 和 1 两个状态!
365 天 = 365bit 1字节 = 8bit 46 个字节左右!
测试
使用bitmap 来记录 周一到周日的打卡!
记录数据
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> setbit sign 2 1
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 1
(integer) 0
127.0.0.1:6379> setbit sign 7 1
(integer) 0
查看某一天是否有打卡!
127.0.0.1:6379> getbit sign 1
(integer) 0
127.0.0.1:6379> getbit sign 3
(integer) 1
统计操作,统计一周打卡的次数!
bitcount :统计值是1的数量
# 统计这周的打卡记录,就可以看到是否有全勤!
127.0.0.1:6379> bitcount sign
(integer) 4
事务
Redis事务本质:一组命令的集合,一个事务中的所有命令都会被序列化,在事务执行过程的中,会按照顺序执行!
一次性、顺序性、排他性!执行一系列的命令!
Redis事务没有隔离级别的概念!
所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行!
Exec Redis单条命令式保存原子性的,但是事务不保证原子性!
redis的事务:
- 开启事务:MULTI
- 命令入队(…)
- 取消事务:DISCARD
- 执行事务:EXEC
正常执行事务
# 开启事务
127.0.0.1:6379> multi
OK
# 命令入队
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
# 执行事务 可以发现所有命令是执行exec后同一执行的
127.0.0.1:6379> exec
1) OK
2) OK
3) "v2"
4) OK
放弃事务
# 开启事务
127.0.0.1:6379> multi
OK
# 命令入队
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k4 v4
QUEUED
# 取消事务
127.0.0.1:6379> DISCARD
OK
# 事务队列中命令都不会被执行!
127.0.0.1:6379> get k4
(nil)
编译型异常
编译型异常(代码有问题! 命令有错!) ,事务中所有的命令都不会被执行!
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
# 执行错误的命令
127.0.0.1:6379> getset k3
(error) ERR wrong number of arguments for 'getset' command 127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> set k5 v5
QUEUED
# 执行事务报错!所有的命令都不会被执行!
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k5
(nil)
运行时异常
运行时异常(1/0), 如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行 的,错误命令抛出异常!
127.0.0.1:6379> set k1 "v1"
OK
127.0.0.1:6379> multi
OK
# 会执行的时候失败!
127.0.0.1:6379> incr k1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> get k3
QUEUED
# 虽然第一条命令报错了,但是依旧正常执行成功了!
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range
2) OK
3) OK
4) "v3"
127.0.0.1:6379> get k2
"v2"
127.0.0.1:6379> get k3
"v3"
监控(面试常问)
悲观、乐观锁
悲观锁:
- 很悲观,认为什么时候都会出问题,无论做什么都会加锁!
乐观锁:
- 很乐观,认为什么时候都不会出问题,所以不会上锁! 更新数据的时候去判断一下,在此期间是否有人修改过这个数据
- 获取version
- 更新的时候比较 version
监控测试
正常执行成功!
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
# 监视 money 对象
127.0.0.1:6379> watch money
OK
# 开启事务
127.0.0.1:6379> multi
127.0.0.1:6379> DECRBY money 20
QUEUED
127.0.0.1:6379> INCRBY out 20
QUEUED
# 事务正常结束,数据期间没有发生变动,这个时候就正常执行成功! OK
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20
测试多线程修改值 , 使用watch 可以当做redis的乐观锁操作!
# 监视 money
127.0.0.1:6379> watch money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY money 10
QUEUED
127.0.0.1:6379> INCRBY out 10
QUEUED
# 执行之前,另外一个线程,修改了money的值,这个时候,就会导致事务执行失败!
127.0.0.1:6379> exec
(nil)
如果修改失败,获取最新的值就好!
Jedis
什么是Jedis 是 Redis 官方推荐的 java连接开发工具!
使用Java 操作Redis 中间件!如果你要使用 java操作redis,那么一定要对Jedis 十分的熟悉!
jedis使用
创建工程
创建一个名为Redis-Study的空工程
创建模块
在父工程下创建一个名为redis-01-jedis的普通maven模块
导入依赖
打开maven官网,找到jedis依赖
pom.xml
<dependencies>
<!--jedis依赖-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
<!--fastjson依赖,因为我们要解析json-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
编码测试
1、连接数据库
2、操作命令
3、断开连接
常用的API:String、List、Set、Hash、Zset
基本命令测试
TestPing
public class TestPing {
public static void main(String[] args) {
// 连接redis
Jedis jedis = new Jedis("127.0.0.1", 6379);
// Jedis中的API就是之前学习的命令
System.out.println("ping命令:"+jedis.ping());
System.out.println("set命令:"+jedis.set("name", "zhangsan"));
// 关闭连接
jedis.close();
}
}
事务测试
public class TestTX {
public static void main(String[] args) {
// 连接redis
Jedis jedis = new Jedis("localhost", 6379);
JSONObject jsonObject = new JSONObject();
jsonObject.put("name","jinjiahuan");
jsonObject.put("age","27");
String result = jsonObject.toJSONString();
// 清空数据
jedis.flushDB();
// 开启事务
Transaction multi = jedis.multi();
try {
// 添加队列
multi.set("user1",result);
multi.set("user2",result);
// 设置异常
int s=1/0;
// 执行事务
multi.exec();
} catch (Exception e) {
e.printStackTrace();
// 出现异常,结束事务
multi.discard();
} finally {
System.out.println(jedis.get("user1"));
System.out.println(jedis.get("user2"));
// 关闭连接
jedis.close();
}
}
}
整合SpringBoot
SpringBoot 操作数据:spring-data、jpa、jdbc、mongodb、redis!
SpringData 也是和 SpringBoot 齐名的项目!
说明: 在 SpringBoot2.x 之后,原来使用的jedis 被替换为了 lettuce
使用步骤
新建moudle
在父工程Redis-Study下新建一个名为redis-02-springboot普通maven模块
springboot的版本使用2.2.1.RELEASE
pom分析
查看pom.xml文件发现,操作redis的不是jedis,而是spring-boot-starter-data-redis
进入spring-boot-starter-data-redis底层,发现也没有jedis。但是存在lettuce-core依赖
因为在 SpringBoot2.x 之后,原来使用的jedis 被替换为了 lettuce
对比:
- jedis : 采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用 jedis pool 连接池! 更像 BIO 模式
- lettuce : 采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据了,更像 NIO 模式。lettuce 底层导入的是netty包,性能更快一点
源码分析
springboot有大量的自动配置类,自然就会有关于redis的自动配置类,搜索发现RedisAutoConfiguration
RedisProperties和我们的配置文件进行了绑定,点击可以看到我们能够配置的东西
查看RedisAutoConfiguration
@Bean
// 我们可以自定义一个redisTemplate替换默认的redisTemplate
@ConditionalOnMissingBean(
name = {"redisTemplate"}
)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
// 默认的RedisTemplate没有进行过的的设置,redis对象都是需要序列化的
// 两个泛型都是Object,我们后面需要强制转换为<String,Object>
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean // 由于String类型是redis中最长使用的类型,所以说单独提出来了一个bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
模块配置
application.properties
# 配置redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
连接池选择源码分析
点进去发现这是个接口,有两个实现类JedisConnectionFactory,LettuceConnectionFactory
查看JedisConnectionFactory实现类,发现有些类是没有的
查看LettuceConnectionFactory,发现使用的类都存在
所以我们如果要在配置文件中是redis连接池的话,要选择Lettuce连接池
application.properties
# 连接池配置
spring.redis.lettuce.pool...
测试
Redis02SpringbootApplicationTests
@SpringBootTest
class Redis02SpringbootApplicationTests {
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
// redisTemplate 操作不同的数据类型,api和我们的指令是一样的
// opsForValue 操作字符串 类似String
// opsForList 操作List 类似List
// opsForSet
// opsForHash
// opsForZSet
// opsForGeo
// opsForHyperLogLog
// 除了基本的操作,我们常用的方法都可以直接通过redisTemplate操作,比如事务,和基本的 CRUD
// 获取redis的连接对象
// RedisConnection connection =
//redisTemplate.getConnectionFactory().getConnection();
// connection.flushDb();
// connection.flushAll();
redisTemplate.opsForValue().set("高启强","想吃?了");
System.out.println(redisTemplate.opsForValue().get("高启强"));
}
}
运行测试
自定义RedisTemplate
进入RedisTemplate类中
分析
新建pojo包,包下创建User类,先不进行序列化操作
@Component
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {
private String name;
private int age;
}
测试Redis02SpringbootApplicationTests
@Test
public void test() throws JsonProcessingException {
User user = new User("张三", 23);
// 通过ObjectMapper实现user对象序列化
String userStr = new ObjectMapper().writeValueAsString(user);
redisTemplate.opsForValue().set("user",userStr);
System.out.println(redisTemplate.opsForValue().get("user"));
}
控制台打印正常
查看redis客户端数据,发现存入的key乱码
现在我们不进行ObjectMapper序列化
@Test
public void test() throws JsonProcessingException {
User user = new User("张三", 23);
// 通过ObjectMapper实现user对象序列化
// String userStr = new ObjectMapper().writeValueAsString(user);
redisTemplate.opsForValue().set("user",user);
System.out.println(redisTemplate.opsForValue().get("user"));
}
测试发现报错,因为没有进行序列化,所以所有的对象需要序列化
实体类
我们将user进行序列化
config.User
@Component
@AllArgsConstructor
@NoArgsConstructor
@Data
// 在企业中,我们的所有的pojo都会进行序列化
public class User implements Serializable {
private String name;
private int age;
}
测试
@Test
public void test() throws JsonProcessingException {
User user = new User("张三", 23);
// 通过ObjectMapper实现user对象序列化
// String userStr = new ObjectMapper().writeValueAsString(user);
redisTemplate.opsForValue().set("user",user);
System.out.println(redisTemplate.opsForValue().get("user"));
}
redis客户端,还是会有乱码,因为redisTemplate默认使用的是jdk序列化。因此我们要定制自己的序列化方式
配置类
新建config包,包下创建RedisConfig配置类
RedisConfig,固定模板,拿来即用
@Configuration
public class RedisConfig {
// 这是我给大家写好的一个固定模板,大家在企业中,拿去就可以直接使用!
// 自己定义了一个 RedisTemplate
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
// 我们为了自己开发方便,一般直接使用 <String, Object>
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
// Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// String 的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
测试类
Redis02SpringbootApplicationTests
而是出现了两个类
现在我们需要指向我们自定义的对象,这是spring的东西,我们现在来回顾
方式一
@Autowired:根据类型去注入对象,如果容器中一个类型有多个对象,就无法唯一指向了
@Qualifier("beanName"):可以通过指定对象名,指向唯一的对象
在RedisConfig中,方法名就是我们自定义的对象名
所以在Redis02SpringbootApplicationTests中,使用@Autowired和@Qualifier结合指向我们的对象
@SpringBootTest
class Redis02SpringbootApplicationTests {
// 通过类型注入
@Autowired
// 通过对象名注入
@Qualifier("redisTemplate")
// 两者结合指向我们唯一的自定义对象
private RedisTemplate redisTemplate;
@Test
public void test() throws JsonProcessingException {
User user = new User("张三", 23);
redisTemplate.opsForValue().set("user",user);
System.out.println(redisTemplate.opsForValue().get("user"));
}
方式二
修改RedisConfig中的方法名
@Configuration
public class RedisConfig {
// 这是我给大家写好的一个固定模板,大家在企业中,拿去就可以直接使用!
// 自己定义了一个 RedisTemplate
@Bean
@SuppressWarnings("all")
// 修改方法名(也就是修改对象名)
public RedisTemplate<String, Object> redisTemplateMy(RedisConnectionFactory factory) {
Redis02SpringbootApplicationTests
@Autowired
// 使用自定义的对象名
private RedisTemplate redisTemplateMy;
方式三
通过@Bean自定义对象名
RedisConfig
// 通过@Bean自定义对象名
@Bean("redisTemplateMy")
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
Redis02SpringbootApplicationTests
@Autowired
// 使用自定义的对象名
private RedisTemplate redisTemplateMy;
测试
启动Redis02SpringbootApplicationTests
redis客户端,成功,没有产生乱码
但是在企业开发中,我们在80%的情况下,都不会使用这个原生的方式去编写代码
redisTemplate会被封装到一个工具类使用起来比较方便
自定义Redis工具类
使用RedisTemplate需要频繁调用.opForxxx
然后才能进行对应的操作,这样使用起来代码效率低下,工作中一
般不会这样使用,而是将这些常用的公共API抽取出来封装成为一个工具类,然后使用工具类来间接操作Redis,不
但效率高并且易用。
工具类
新建utils包,包下创建RedisUtil工具类
RedisUtil
package com.jjh.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
//在我们真实的开发中, 或者你们在公司, 一般都可以看到一个公司自己封装redis的工具
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
// @SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
测试
Redis02SpringbootApplicationTests
@SpringBootTest
class Redis02SpringbootApplicationTests {
@Autowired
private RedisUtil redisUtil;
@Test
public void testRRedisUtil(){
redisUtil.set("name","jinjiahuan");
System.out.println(redisUtil.get("name"));
}
运行测试类testRRedisUtil方法
redis客户端
测试成功
Redis.conf详解
redis在启动的时候,就通过redis的配置文件启动的。
windows版有两个配置文件,使用redis.windows.conf配置文件
行家有没有,出手就知道
单位(unit)
redis的配置文件unit单位对大小写不敏感
包含(includes)
可以包含多个配置文件,就是好比我们学习Spring时候的Improt, JSP中的include
网络(network)
绑定ip
现在绑定的是本地ip,如果需要远程访问redis,要绑定远程服务器的ip地址
保护模式
保护模式,默认就是开启的
端口
端口配置
通用(general)
守护进程(win不支持)
# 以守护进程的方式运行(即后台运行),默认是no,我们需要自己开启为yes!
daemonize yes
管理守护进程(win不支持)
# 管理守护进程,默认是no,不用改
supervised no
进程文件(win不支持)
# 如果以后台的方式运行(daemonize yes),我们就需要指定一个pid文件!
pidfile /var/run/redis_6379.pid
日志级别
# Specify the server verbosity level.
# This can be one of:
# 主要用于测试和开发阶段
# debug (a lot of information, useful for development/testing)
# 记录较多的日志,像debug
# verbose (many rarely useful info, but not a mess like the debug level)
# 默认级别,生产环境使用
# notice (moderately verbose, what you want in production probably)
# 非常重要的信息或者是关键信息
# warning (only very important / critical messages are logged)
# 不一般不用修改,就使用默认的级别notice
loglevel notice
日志文件
# 用来记录日志的文件
logfile "server_log.txt"
数据库的数量
# 默认有16个数数据库
databases 16
logo显示(win没有这个配置)
# 是否总是显示LOGO,默认是开启的
always-show-logo yes
快照(snapshotting)
持久化规则
持久化, 在规定的时间内,执行了多少次操作,则会持久化到文件
- .rdb文件
- . aof文件
redis 是内存数据库,如果没有持久化,那么数据断电及失!
# 我们之后学习持久化,会自己定义这个测试
# 如果900s内,如果至少有一个1 key进行了修改,我们及进行持久化操作
save 900 1
# 如果300s内,如果至少10 key进行了修改,我们及进行持久化操作
save 300 10
# 如果60s内,如果至少10000 key进行了修改,我们及进行持久化操作
save 60 10000
持久化出错是否继续工作
# 持久化如果出错,是否还需要继续工作,默认是工作
stop-writes-on-bgsave-error yes
是否压缩rdb文件
rdb文件是持久化文件
# 是否压缩 rdb 文件,需要消耗一些cpu资源!
rdbcompression yes
rdb文件错误校验
# 保存rdb文件的时候,是否进行错误的检查校验
rdbchecksum yes
rdb文件目录
# rdb 文件保存的目录!默认是当前文件夹下
dir ./
主从复制(replication)
见后面内容:主从复制部分
安全(security)
设置密码
方式一
在配置文件中设置
requirepass 123456
但是我们一般不这样进行密码设置
方式二(推荐)
通过命令进行设置
127.0.0.1:6379> ping
PONG
# 获取redis的密码,默认是没有密码的
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
# 设置redis的密码OK
127.0.0.1:6379> config set requirepass "123456"
# 发现所有的命令都没有权限了
127.0.0.1:6379> config get requirepass
(error) NOAUTH Authentication required.
127.0.0.1:6379> ping
(error) NOAUTH Authentication required.
# 使用密码进行登录!
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "123456"
限制(clients)
# 设置能连接上redis的最大客户端的数量
maxclients 10000
# redis 配置最大的内存容量
maxmemory <bytes>
# 内存到达上限之后的处理策略
maxmemory-policy noeviction
# 1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
# 2、allkeys-lru : 删除lru算法的key
# 3、volatile-random:随机删除即将过期key
# 4、allkeys-random:随机删除
# 5、volatile-ttl : 删除即将过期的
# 6、noeviction : 永不过期,返回错误
AOF配置(append only mode)
# 默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分所有的情况下,rdb完全够用!
appendonly no
# 持久化的文件的名字
appendfilename "appendonly.aof"
# 每秒执行一次sync(同步),可能会丢失这1s的数据!
appendfsync everysec
# 每次修改都会sync。消耗性能
# appendfsync always
# 不执行sync,这个时候操作系统自己同步数据,速度最快!
# appendfsync no
Redis持久化
面试和工作的时候,持久化都是重点
Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能!
RDB(Redis DataBase)持久化
什么是RDB
在主从复制中,rdb就是备用的!从机上面!
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话的Snapshot快照,它恢复时是将快照文件直接读取到内存里。
Redis会单独创建(fork )一个子进程来进行持久化,会先将数据写入一个临时文件中,待持久化过程都结束了,再用这个临时文件替代上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能,如果需要进行大规模的数据的恢复,且对于数据恢复的完整性不是非常敏感,那么RDB方式要比AOF方式更加高效。RDB的缺点就是最后一次持久化后的数据可能丢失。我们默认的就是RDB,一般情况下不需要修改这个配置!
rdb保存的文件是dump.rdb ,都是我们的配置文件中的快照中进行配置的
触发规则
测试规则
1、定义自己的配置
自定义自己的规则,在60s内如果key修改了5个,那就进行持久化操作
指定配置文件,重启redis服务
客户端测试
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> set k2 v2
OK
127.0.0.1:6379> set k3 v3
OK
127.0.0.1:6379> set k4 v4
OK
127.0.0.1:6379> set k5 v5
OK
可以在目录下看到,已经产生rdb文件,测试成功
2、经测试,使用flushdb命令,也会产生rdp文件
3、经测试,退出redis程序,也会rdp文件
总结
-
如果满足save规则,就会触发rdb规则,例如save 60 5(意思为在60s内修改了5次key那么就会触发rdb操作)
-
执行flushall命令的时候也会触发rdb规则!
-
退出Redis,也会产生rdb文件!
在生产环境,有时候我们需要对dump.rdb进行备份,备份就会自动生成一个dump.rdb文件
恢复rdb
只要将rdb文件放在我们redis的启动目录下就可以,redis启动的时候会自动检测dump.rdb恢复其中的数据!
查看我们需要存放的位置直接用config get dir命令
如果config get dir文件下存在dump.rdb他就会自动恢复其中的数据
因此redis它默认的配置就够用,但还是要学习的
优缺点
优点:
- 适合大规模的数据恢复
- 如果对数据完整性不高
缺点:
- 需要一定的时间间隔进行操作!如果redis意外宕机,这个最后一次在内存中修改的数据就没有了
- fork进程的时候,会占用一定的内存空间
AOF(Append Only File)
将我们的所有写操作的命令都记录下来,history,恢复的时候就把这个文件全部在执行一遍
什么是AOF
以日志的形式来记录每个写的操作,将Redis执行过的所有指令记录下来(读操作不记录),只追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次完成数据的恢复工作
Aof保存的是appendonly.aof文件
测试AOF
1、在配置文件中开启AOF功能,默认是关闭的
2、重启redis服务,输入一些指令,检查appendonly.aof文件是否产生
客户端连接,输入指令
测试发现aof文件产生
3、关闭redis服务,删除rdb持久化文件(因为只测试aof文件,消除rdb进行数据恢复的可能性),重启redis服务
4、查看aof文件,其实就是以日志的形式纪录的所有我们运行过的写操作指令
5、关闭redis服务,破坏aof文件,重启redis服务,检查redis服务是否能够启动成功
重新启动redis,测试发现服务启动失败
6、这种情况下,我们可以通过redis-check-aof.exe命令检查并修复aof文件
# 检查并修复aof文件
D:\Redis-x64-3.2.100>redis-check-aof.exe --fix appendonly.aof
查看aof文件
重启redis,查看数据。因为aof文件修复,redis服务启动成功
查看数据,确实是执行了aof里面记录的一些指令,测试成功
优缺点
aof配置
# 每秒执行一次sync(同步),可能会丢失这1s的数据!
appendfsync everysec
# 每次修改都会sync。消耗性能
# appendfsync always
# 不执行sync,这个时候操作系统自己同步数据,速度最快!
# appendfsync no
优点:
-
每一次修改都同步,文件的完整性会更加好
-
每秒同步一次,可能会丢失这一秒的数据
-
从不同步,效率是最高的
缺点:
- 相对于数据文件来说,aof远远大于rdb,修复的速度也比rdb慢
- aof的运行效率也会比rdb慢,所以我们redis默认配置为rdb持久化
AOF重写规则
aof默认就是文件无限追加,文件会越来越大
如果aof文件大于64m,太大了!就会fork一个新的进程来将我们的文件进行重写!
扩展
- RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储
- AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
- 只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
- 同时开启两种持久化方式
- 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF 文件保存的数据集要比RDB文件保存的数据集要完整。
- RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有 AOF可能潜在的Bug,留着作为一个万一的手段。
- 性能建议
- 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够 了,只保留 save 900 1 这条规则。
- 如果开启AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite 的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
- 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉(断电),会丢失十几分钟的数据, 启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。
Redis发布订阅
概述
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接受消息。
比如:微信、微博、关注系统
Redis客户端可以订阅任意数量的频道
三个角色:第一个消息发布者、第二个频道、第三个消息订阅者
订阅/发布消息图:
下图展示频道channel1,以及订阅这个频道的三个客户端——client2、client5和client1之间的关系
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端:
有关命令
这些命令被广泛用于构建即时通信应用,比如网络聊天室和实时广播,实时提醒等等。
实现
订阅者
# 订阅者订阅一个频道testchannel,在redis服务器中就会创建这个频道
127.0.0.1:6379> subscribe testchannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testchannel"
3) (integer) 1
# 只要订阅频道之后,订阅者就会等待推送的消息
发布者
# 发布者在testchannel频道中发布了一条消息
127.0.0.1:6379> publish testchannel hello,jinjiahuan
(integer) 1
测试
订阅者成功接收到消息
原理
Redis是使用C实现的,通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。
Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能
微信:通过 SUBSCRIBE 命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 频道!,而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定 channel 的订阅链表中。
通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的 channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
使用场景:
1、实时消息系统!
2、事实聊天!(频道当做聊天室,将信息回显给所有人即可!)
3、订阅,关注系统都是可以的!
稍微复杂的场景我们就会使用 消息中间件 MQ
Redis主从复制
概述
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower)
数据的复制是单向的,只能由主节点到从节点。
Master以写为主,Slave 以读为主。
默认情况下,每台Redis服务器都是主节点。
且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括:
- 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
- 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
- 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
- 高可用(集群)基石:除上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的(宕机),原因如下:
- 从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;
- 从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。
电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。对于这种场景,我们可以使如下这种架构:
主从复制,读写分离! 80% 的情况下都是在进行读操作!减缓服务器的压力!架构中经常使用! 一主二从!
只要在公司中,主从复制就是必须要使用的,因为在真实的项目中不可能单机使用Redis!
环境搭建
理论上,需要多个服务器。每个服务器都安装redis,才能搭建环境。但在我们没有多台服务器,所以通过redis服务启动时通过指定不同的配置文件,搭建一个有多个redis服务的伪环境。这里的教程,以windows为例。linux也是一样的搭建方式。
查看库信息
默认情况下,每台redis服务器都是主节点
只配置从库,不用配置主库!
# 查看库信息
127.0.0.1:6379> info replication
# 角色 master 代表主机
role:master
# 0代表没有从机
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
复制配置文件
将redis.windows.conf文件复制三份,分别命名(可以自己自定义名称)为
- redis6379.conf
- redis6380.conf
- redis6381.conf
修改配置文件
分别修改复制的三个配置文件中的某些配置,以redis6379.conf为例,其他的两个文件如法炮制
修改端口号
# Accept connections on the specified port, default is 6379 (IANA #815344).
# If port 0 is specified Redis will not listen on a TCP socket.
port 6379
修改pid名称
修改pid名称(指定以后台的方式运行),windows在这里不支持,linux支持
修改log文件名字
# Specify the log file name. Also 'stdout' can be used to force
# Redis to log on the standard output.
logfile "6379.log"
修改dump.rdb名字
# The filename where to dump the DB
dbfilename dump6379.rdb
启动redis集群
修改完配置文件之后,分别指定不同的redis配置文件启动redis服务,这样就会有三个redis服务
启动redis6379服务和客户端
查看日志文件,服务启动成功
客户端启动成功
启动redis6380服务和客户端
启动redis6381服务和客户端
一主二从
介绍
现在查看,每一个都是主机
默认情况下,每台Redis服务器都是主节点。
我们一般情况下只用配置从机就好了!
规则就是认老大! 一主 (79)二从(80,81)
配置
方式一:使用命令配置(暂时的)
配置从机80:
# slaveof 127.0.0.1 6379 找谁当自己的老大!
127.0.0.1:6380> slaveof 127.0.0.1 6379
OK
127.0.0.1:6380> info replication
# Replication
# 当前角色是从机
role:slave
# 可以的看到主机的信息:IP
master_host:127.0.0.1
# 可以的看到主机的信息:端口
master_port:6379
master_link_status:up
master_last_io_seconds_ago:9
master_sync_in_progress:0
slave_repl_offset:15
slave_priority:100
slave_read_only:1
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6380>
配置从机81:
127.0.0.1:6381> slaveof 127.0.0.1 6379
OK
127.0.0.1:6381> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:5
master_sync_in_progress:0
slave_repl_offset:239
slave_priority:100
slave_read_only:1
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6381>
查看主机79
127.0.0.1:6379> info replication
# Replication
# 当前角色 主机
role:master
# 有两个从机(80、81)的配置
connected_slaves:2
# 多了从机80的配置
slave0:ip=127.0.0.1,port=6380,state=online,offset=309,lag=0
# 多了从机81的配置
slave1:ip=127.0.0.1,port=6381,state=online,offset=309,lag=0
master_repl_offset:309
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:308
127.0.0.1:6379>
真实的从主配置应该在配置文件中配置,这样的话是永久的,我们这里使用的是命令,暂时的!
方式二:配置文件配置(永久的)
80配置文件
81配置文件
细节
主机可以写,从机不能写,只能读!主机中的所有信息和数据,都会自动被从机保存
79主机写数据
80,81从机查看数据
80,81从机写数据,会失败。因为是从机,不能进行写操作
测试
主机宕机
测试
主机进行断开
# 主机客户端输入命令
shutdown
exit
查看从机80,发现他的主机还是原来的主机79,从机80仍然是从机的身份
现在从机80只有一个k1
现在主机79又连接上了
主机79设定值k2:
从机80依旧可以获取到
测试总结
测试总结:主机断开连接,从机依旧连接到主机的,但是没有写操作,这个时候,主机如果回来了,从机依旧可以直接获取到主机写的信息!
从机宕机
测试
从机80断开:
查看主机79中的从机,只有一个了
现在给主机79设置一个值k3
再次连接断开的从机80
从机80中查看k3和keys*
发现从机80又变成了主机
再次把从机80变成主机79的从机:
再次在从机80查看key,有了key3
测试总结
这就说明,如果是使用命令行来配置的主从关系,那么从机如果宕机,那么当从机重新启动连接后。身份就又变成了主机。如果在宕机期间,主机进行了写操作,重新启动的从机没有办法获取主机写入的数据。只有重新配置主从关系后(身份变回从机),才能获取在宕机时,主机写入的数据。
复制原理
Slave 启动成功连接到 master 后会发送一个sync同步命令
Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。
全量复制:slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行! 我们的数据一定可以在从机中看到!
层层链路模式
上一个主节点链接下一个 从节点!
就是6380认6379为老大,6381认6380为老大
所以6380就会变成既是主节点,又是从节点。
master—slaver master—slaver
数据复制测试
设置从机81的主机是从机80:
127.0.0.1:6381> slaveof 127.0.0.1 6380
OK
127.0.0.1:6381> info replication
# Replication
role:slave
master_host:127.0.0.1
# 从机81的主机已经变成了80
master_port:6380
master_link_status:up
master_last_io_seconds_ago:7
master_sync_in_progress:0
slave_repl_offset:15
slave_priority:100
slave_read_only:1
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
此时看上去,80既是主节点,又是从节点。但是他的身份仍然是从节点
127.0.0.1:6380> info replication
# Replication
# 80依然是从机,身份没有变化
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:4
master_sync_in_progress:0
slave_repl_offset:5192
slave_priority:100
slave_read_only:1
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=197,lag=0
master_repl_offset:197
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:196
查看主机79的从机只有一个:
127.0.0.1:6379> info replication
# Replication
role:master
# 从机只有一个80
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=5276,lag=1
master_repl_offset:5276
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:5275
给主机79设置一个值k4,80、81从机都可以获取到值,依然可以实现主从复制
80从机获取值
81从机获取值
主机宕机测试
如果没有主机(主机宕机了),从机想自己变成主机使用命令SLAVEOF no one
(谋朝篡位)
现在我们将主机79断开
此时80、81还是从机
手动将从机80设置为主机,因为从机81的主机是80,80成为主机后,会看到从机81的信息
# 手动将从机设置为主机
127.0.0.1:6380> slaveof no one
OK
127.0.0.1:6380> info replication
# Replication
role:master
# 有一个81从机
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=879,lag=1
master_repl_offset:879
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:878
将主机79重新连接,发现没有。需要手动设置才能恢复宕机之前的层层链路模式
所以在层层链路模式中,如果主机宕机,需要手动配置一个主机
哨兵模式
自动选举老大的模式
概念
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。
谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用:
- 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
- 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题(如果这个哨兵挂了,没办法选取主机),为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover(主节点选举)过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。
当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
环境搭建
我们目前的状态是 一主二从!
配置哨兵配置文件
在redis的安装目录下新建一个名为sentinel.conf的配置文件作为哨兵配置文件
编辑哨兵配置文件
# sentinel monitor 被监视的服务名称(自定义) ip地址 端口号 1
# 后面的这个数字1,代表主机挂了,slave投票看让谁接替成为主机,票数最多的,就会成为主机!
sentinel monitor myredis 127.0.0.1 6379 1
启动哨兵
linux系统会有一个redis-sentinel的程序
通过命令就可以启动
redis-sentinel sentinel.conf
[root@hecs-362578 rconfig]# redis-sentinel sentinel.conf
434503:X 07 Dec 2021 14:46:23.877 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
434503:X 07 Dec 2021 14:46:23.877 # Redis version=6.2.1, bits=64, commit=00000000, modified=0, pid=434503, just started
434503:X 07 Dec 2021 14:46:23.877 # Configuration loaded
434503:X 07 Dec 2021 14:46:23.878 * monotonic clock: POSIX clock_gettime
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 6.2.1 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in sentinel mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 26379
| `-._ `._ / _.-' | PID: 434503
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
434503:X 07 Dec 2021 14:46:23.879 # Sentinel ID is 645eb5ca9edcacfee342122023fe146a0d5e09e3
434503:X 07 Dec 2021 14:46:23.879 # +monitor master myredis 127.0.0.1 6379 quorum 1
434503:X 07 Dec 2021 14:46:23.880 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
434503:X 07 Dec 2021 14:46:23.887 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
windows系统下,没有redis-sentinel程序,通过下面的命令启动
redis-server.exe sentinel.conf --sentinel
主机宕机测试
我们让79主节点宕机,哨兵就会监视到情况,并从其他从机中选举一个从机作为主机
79宕机
稍等一会,可以看到进行了选举
从机81成为了主机:
127.0.0.1:6381> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=3672,lag=0
master_repl_offset:3804
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:3803
从机80的主机变成了81:
# Replication
role:slave
master_host:127.0.0.1
master_port:6381
master_link_status:up
master_last_io_seconds_ago:0
master_sync_in_progress:0
slave_repl_offset:2324
slave_priority:100
slave_read_only:1
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:4900
哨兵日志:
[1604] 19 Nov 15:13:53.009 # Sentinel ID is f0a2b3b9a58a56bf0af2e33f6ddd37b2faac847a
[1604] 19 Nov 15:13:53.009 # +monitor master myredis 127.0.0.1 6379 quorum 1
[1604] 19 Nov 15:13:53.009 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:13:53.009 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:57.460 # +sdown master myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:57.460 # +odown master myredis 127.0.0.1 6379 #quorum 1/1
[1604] 19 Nov 15:14:57.460 # +new-epoch 1
[1604] 19 Nov 15:14:57.460 # +try-failover master myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:57.460 # +vote-for-leader f0a2b3b9a58a56bf0af2e33f6ddd37b2faac847a 1
[1604] 19 Nov 15:14:57.460 # +elected-leader master myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:57.460 # +failover-state-select-slave master myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:57.529 # +selected-slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:57.529 * +failover-state-send-slaveof-noone slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:57.599 * +failover-state-wait-promotion slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:58.564 # +promoted-slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:58.564 # +failover-state-reconf-slaves master myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:58.618 * +slave-reconf-sent slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:59.646 * +slave-reconf-inprog slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:59.661 * +slave-reconf-done slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:59.708 # +failover-end master myredis 127.0.0.1 6379
[1604] 19 Nov 15:14:59.708 # +switch-master myredis 127.0.0.1 6379 127.0.0.1 6381
[1604] 19 Nov 15:14:59.708 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6381
[1604] 19 Nov 15:14:59.708 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6381
[1604] 19 Nov 15:15:29.739 # +sdown slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6381
通过日志的最后一句也可以看出主机是81。
如果主机79此时回来了,只能归并到新的主机81下,被当做从机,这就是哨兵模式的规则!
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
设置79成为81的从机
127.0.0.1:6379> slaveof 127.0.0.1 6381
OK Already connected to specified master
127.0.0.1:6379> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6381
master_link_status:up
master_last_io_seconds_ago:1
master_sync_in_progress:0
slave_repl_offset:18377
slave_priority:100
slave_read_only:1
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6379>
优缺点
优点:
- 哨兵集群,基于主从复制模式,所有的主从配置优点,它全有
- 主从可以切换,故障可以转移,系统的可用性就会更好
- 哨兵模式就是主从模式的升级,手动到自动,更加健壮!
缺点:
- Redis 不好在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!
- 实现哨兵模式的配置其实是很麻烦的,里面有很多选择!
哨兵模式的全部配置
以上只有一个哨兵,如果是哨兵集群,需要写多个sentinel.conf配置文件
完整的哨兵模式配置文件 sentinel.conf
# Example sentinel.conf
# 哨兵sentinel实例运行的端口 默认26379,如果是哨兵集群,需要添加上其他的哨兵端口
port 26379
# 哨兵sentinel的工作目录
dir /tmp
# 哨兵sentinel监控的redis主节点的ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 1
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000
# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行同步,
# 这个数字越小,完成failover所需的时间就越长,
# 但是如果这个数字越大,就意味着越多的slave因为replication而不可用。
# 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1
# 故障转移的超时时间failover-timeout可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,
#这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,
#一个是事件的类型,
#一个是事件的描述。
#如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
# 这些配置一般都是由运维人员来进行配置
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
社会目前程序员饱和(初级和中极饱和)、高级程序员重金难求(提升自己)
Redis缓存穿透、击穿和雪崩
服务的高可用问题!
在这里我们不会详细的区分析解决方案的底层!
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。
缓存穿透(查不到)
概念
缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中(秒杀!),于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
解决方案
布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;
缓存空对象
当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;
但是这种方法会存在两个问题:
- 如果空值能够被缓存起来,这意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键
- 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。
缓存击穿(量太大,缓存过期)
概念
这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。
解决方案
设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题。
加互斥锁
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩
概念
缓存雪崩
- 情况1:是指在某一个时间段,缓存集中过期失效。
- 情况2:Redis 宕机!
产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。(异地多活!)
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。