Redis基础详解
目录
redis监视测试 watch unwatch -----redis实现乐观锁
一、概述
Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。免费和开源!是当下最热门的 NoSQL技术之一!也被人们称之为结构化数据库!
redis作用
- 内存存储、持久化,内存中是断电即失、所以说持久化很重要( rdb、aof )
- 效率高,可以用于高速缓存
- 发布订阅系统
- 地图信息分析
- 计时器、计数器(浏览量!)
- ......….
redis特性
- 多样的数据类型
- 持久化
- 集群
- 事务
二、Redis基础知识
redis默认有16个数据库,默认使用的是第0个,使用6379为默认端口,不区分大小写命令
可以从redis.conf文件中查看
可以使用select切换数据库,使用dbsize查看数据库大小
127.0.0.1:6379>select 3 #切换数据库
OK
127.0.0.1:6379[3]>DBSIZE #查看DB大小
(integer) 0
127.0.0.1:6379[3]>set name qq #3号数据库添加一条内容
OK
127.0.0.1:6379[3]>DBSIZE #查看DB大小
(integer)1
127.0.0.1:6379[3]>select 7
0K
127.0.0.1:6379[7]>DBSIZE
( integer)0
i27.0.0.1:6379[7]>get name
(nil)
127.0.0.1:6379[7]>select 3
OK
127.0.0.1:6379[3]>get name
"qq"
keys * 查看所有的key
127.0.0.1:6379[3]> keys * #查看数据库所有的key
1)"name"
清除当前数据库flushdb
清除全部数据库的内容FLUSHALL
127.0.0.1:6379[3]> flushdb
OK
127.0.0.1:6379[3]> keys *(empty list or set)
为什么Redis是单线程的?
因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。在单线程的情况下,就不用去考虑各种锁的问题,不存在加锁、释放锁操作,没有因为可能出现死锁而导致的性能消耗。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了。
既然是单线程,那怎么监听大量的客户端连接呢?
Redis 通过IO 多路复用程序 来监听来自客户端的大量连接(或者说是监听多个 socket),它会将感兴趣的事件及类型(读、写)注册到内核中并监听每个事件是否发生。这样的好处非常明显: I/O 多路复用技术的使用让 Redis 不需要额外创建多余的线程来监听客户端的大量连接,降低了资源的消耗(和 NIO 中的 Selector
组件很像)。
Redis的高并发和快速原因?
1.redis是基于内存的,内存的读写速度非常快(纯内存); 数据存在内存中,数据结构用HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1)。
2.redis是单线程的,省去了很多上下文切换线程的时间(避免线程切换和竞态消耗)。
3.redis使用IO多路复用技术(IO multiplexing, 解决对多个I/O监听时,一个I/O阻塞影响其他I/O的问题),可以处理并发的连接(非阻塞IO)。
Redis6.0 之后为何引入了多线程?
Redis6.0 引入多线程主要是为了提高网络 IO 读写性能,因为这个算是 Redis 中的一个性能瓶颈(Redis 的瓶颈主要受限于内存和网络)。虽然,Redis6.0 引入了多线程,但是 Redis 的多线程只是在网络数据的读写这类耗时操作上使用了,在核心的消费模块执行命令仍然是单线程顺序执行。因此,不需要担心线程安全问题。Redis6.0 的多线程默认是禁用的,只使用主线程。如需开启需要修改 redis 配置文件 redis.conf
io-threads-do-reads yes
开启多线程后,还需要设置线程数,否则是不生效的。同样需要修改 redis 配置文件 redis.conf
io-threads 4 #官网建议4核的机器建议设置为2或3个线程,8核的建议设置为6个线程
三、Redis五大数据类型,三种特殊数据类型
Redis是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。它支持多种类型的数据结构,如字符串 ( strings )),散列 ( hashes ),列表( lists ),集合( sets ),有序集合( sorted sets)与范围查询bitmaps , hyperloglogs和地理空间(geospatial)索引半径查询。Redis内置了复制( replication ),LUA脚本(Luascripting),LRU驱动事件( LRU eviction ),事务 ( transactions )和不同级别的磁盘持久化 ( persistence),并通过Redis哨兵( Sentinel )和自动分区( Cluster )提供高可用性( high availability )。
启动Redis
1、启动Redis
建议通过配置文件来启动Redis。
> redis-server xx/xx/redis.conf
2、连接Redis
> redis-cli -p 6379
3、测试连通性
127.0.0.1:6379> ping PONG
确保redis运行
ps -ef|grep redis
4、停止Redis
> redis-cli shutdown > kill redis-pid
以上两条停止Redis命令效果一样。
Redis-Key
127.0.0.1:6379> set age 1 #设置值
127.0.0.1:6379> get age # 根据 key 获得对应的 value
127.0.0.1:6379> keys * #查看所有key值
127.0.0.1:6379> EXISTS age #判断某个 key 是否存在
127.0.0.1:6379> move age 1 #在当前库移除age,1代表当前库
127.0.0.1:6379> expire age 60 #设置过期时间 数据在 60s 后过期
127.0.0.1:6379> ttl age # 查看数据还有多久过期
127.0.0.1:6379> type age # 查看当前key的类型
String(字符串)
127.0.0.1:6379> set key1 v1 #设置值
127.0.0.1:6379> get key1 #获得值
127.0.0.1:6379> keys * #获得所有的key
127.0.0.1:6379> EXISTS key1 #判断某一个key是否存在
127.0.0.1:6379>APPEND key1 "hello" #追加字符串,如果当前key不存在,就相当setkey
127.0.0.1:6379> get key1"v1hello"
127.0.0.1:6379>STRLEN key1 #获取字符串的长度!#########################################################################
i++
i+=
127.0.0.1:6379> incr number # 自增1 将 key 中储存的数字值增一
127.0.0.1:6379> decr number # 自减1 将 key 中储存的数字值减一
127.0.0.1:6379>INCRBY views 10 # 可以设置步长,指定增量!
127.0.0.1:6379>DECRBY views 10 # 可以设置步长,指定减量!####################################################################
字符串范围range
127.0.0.1:6379> GETRANGE key1 0 3 #截取字符串[0,3]
127.0.0.1:6379>GETRANGE key1 0 -1 #获取全部的字符串 和 get key是一样的
127.0.0.1:6379>SETRANGE key2 1 xx #替换从指定位置开始的字符串!####################################################################### setex (set with expire)#设置过期时间
setnx (set if not exist)#不存在在设置(在分布式锁中会常常使用!)
127.0.0.1:6379> setex key3 30 "he11o" #设置key3 的值为hello,30秒后过期
127.0.0.1:6379> setnx mykey "redis" #如果mykey 不存在,创建mykey(integer) 1
127.0.0.1:6379> setnx mykey "MongoDB" #如果mykey存在,创建失败!(integer) 0
127.0.0.1:6379> get mykey"redis"
######################################################################批量设置mset mget
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 #同时设置多个值
127.0.0.1:6379> mget k1 k2 k3 #同时获取多个值
127.0.0.1:6379> msetnx k1 v k4 v4 #msetnx是一个原子性的操作,要么一起成功,要么一起失败!(integer) 0
127.0.0.1:6379> get k4(nil)
#对象
set user:1 {name:zhangsan , age:3]#设置一个user:1对象值为json字符来保存一个对象!
# 这里的key是一个巧妙的设计: user:{id}:{filed} ,如此设计在Redis中是完全oK了!
127.0.0.1:6379> mset user: 1:name zhangsan user:1:age 2
127.0.0.1:6379> mget user: 1:name user: 1:age1) "zhangsan"
2) "2"
##################################################################getset # 先get然后在set
127.0.0.1:6379> getset db redis # 如果不存在值,则返回 nil(nil)
127.0.0.1:6379> get db"redis"
127.0.0.1:6379> getset db mongodb #如果存在值,获取原来的值,并设置新的值"redis"
127.0.0.1:6379> get db"mongodb"
应用场景
- 1、缓存功能:部分数据第一次查询查询数据库,查询完后存入redis中,后续再获取可以从redis中获取
- 2、验证码:网站登录中常有验证码,我们可以用此数据类型,手机号作为key,验证码作为value存储在redis中,设置过期时间,后续如果用户输入验证码,我们从redis中取值对比,如果过期则无效
- 3、数字计数:点赞数、访问量、关注数等
- 4、存储对象
- 5、共享session 分布式服务会将用户信息的访问均衡到不同服务器上,用户刷新一次访问可能会需要重新登录,为避免这个问题可以用redis将用户session集中管理,每次获取用户更新或查询登录信息都直接从redis中集中获取
- 6、分布式锁 string类型的setnx的作用是“当key不存在时,设值并返回1,当key已经存在时,不设值并返回0”,“判断key是否存在"和"设值"两个操作是原子性地执行的,因此可以用string类型作为分布式锁,返回1表示获得锁,返回0表示没有获得锁。适用场景:在一个集群环境下,多个web应用时对同一个商品进行抢购和减库存操作时,可能出现超卖时会用到分布式锁
LIst(列表)
Redis 的 list 的实现为一个 双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销。在redis里面,我们可以把list,用作栈、队列、消息队列,所有list命令都是以l开头的
###################################################################
LPUSH 、Rpush、 LRANGE
127.0.0.1:6379>LPUSH list one # 将一个值或者多个值,插入到列表头部(左)
127.0.0.1:6379> LPUSH list two # two one
127.0.0.1:6379> LPUSH list three # three two one
127.0.0.1:6379> LRANGE list 0 1 # 查看对应下标的list列表, 0 为 start,1为 end1) "three"
2)"two"
127.0.0.1:6379> Rpush list righr #将一个值或者多个值,插入到列表位部(右)
127.0.0.1:6379>LRANGE list 0 -1 # three two one righr1) "three"
2)"two"3) "one"
4) "righr"
###################################################################
LPOP 、 RPOP
127.0.0.1:6379> Lpop list #移除list的第一个元素"three"
127.0.0.1:6379> Rpop list #移除list的最后一个元素"righr"Lindex
127.0.0.1:6379> lindex list 1 # 通过下标获得list中的某一个值
Llen
127.0.0.1:6379> llen list # 返回列表的长度Lrem
127.0.0.1:6379> Lrem list 3 one # 移除list集合中指定个数的value,精确匹配 移除三个one
Ltrim127.0.0.1:6379>ltrim mylist 1 2 #通过下标截取指定的长度,这个list已经被改变了,截断了只剩下截取的元素!
###################################################################
rpoplpush
127.0.0.1:6379> rpush mylist "hello"
127.0.0.1:6379> rpush mylist "hello1"
127.0.0.1:6379> rpush my1ist "hello2" # hello hello1 hello2127.0.0.1:6379>rpoplpush mylist myotherlist #移除列表的最后一个元素,将他移动到新的列表中!
127.0.0.1:6379> lrange mylist 0 -1 # 查看原来的列表1) "hello"
2) "hello1"
127.0.0.1:6379> lrange myotherlist 0 -1 #查看目标列表中,确实存在改值!1) "hello2"
###################################################################
Lset 将列表中指定下标的值替换为另外一个值,更新操作127.0.0.1:6379> EXISTS list #判断这个列表是否存在
127.0.0.1:6379> lset list 0 item #如果不存在列表我们去更新就会报错(error) ERR no such key
127.0.0.1:6379> lpush list value1(integer) 1
127.0.0.1:6379> LRANGE list 0 01) "value1"
127.0.0.1:6379> lset list 0 item #如果存在,更新当前下标0的值,为item
127.0.0.1:6379> LRANGE list 0 01) "item"
127.0.0.1:6379> lset list 1 other #如果列表元素不存在,也会报错!(error) ERR index out of range
###################################################################
LINSERT #将某个具体的value插入到列表中某个元素的前面或者后面!
127.0.0.1:6379> Rpush mylist "hello"
127.0.0.1:6379> Rpush mylist "world"
127.0.0.1:6379> LINSERT mylist before "world" "other"
127.0.0.1:6379> LRANGE mylist 0 -11)"hel1o"
2) "other"3)"wor1d"
127.0.0.1:6379>LINSERT mylist after world new
127.0.0.1:6379> LRANGE mylist 0 -11) "he1lo"
2) "other"3) "world"
4) "new"
应用场景
- 1. 消息队列 list类型的lpop和rpush(或者反过来,lpush和rpop)能实现队列的功能,不过我不推荐在实战中这么使用,因为现在已经有Kafka、NSQ、RabbitMQ等成熟的消息队列了,它们的功能已经很完善了。
- 2. 排行榜 list类型的lrange命令可以分页查看队列中的数据。可将每隔一段时间计算一次的排行榜存储在list类型中,如京东每日的手机销量排行、学校每次月考学生的成绩排名、斗鱼年终盛典主播排名等,每计算一次,存储在list类型中,接口访问时,通过page和size分页获取打擂金曲。只有定时计算的排行榜才适合使用list类型存储,与定时计算的排行榜相对应的是实时计算的排行榜,list类型不能支持实时计算的排行榜
- 3. 最新列表 list类型的lpush命令和lrange命令能实现最新列表的功能,每次通过lpush命令往列表里插入新的元素,然后通过lrange命令读取最新的元素列表,如朋友圈的点赞列表、评论列表。只有不需要分页(比如每次都只取列表的前5个元素)或者更新频率低(比如每天凌晨更新一次)的列表才适合用list类型实现。对于需要分页并且会频繁更新的列表,需用使用有序集合sorted set类型实现。另外,需要通过时间范围查找的最新列表,list类型也实现不了,也需要通过有序集合sorted set类型实现,如以成交时间范围作为条件来查询的订单列表。
对于排行榜和最新列表两种应用场景,list类型能做到的sorted set类型都能做到,list类型做不到的sorted set类型也能做到,那为什么还要使用list类型去实现排行榜或最新列表呢,直接用sorted set类型不是更好吗?原因是sorted set类型占用的内存容量是list类型的数倍之多
Set(集合)
set 类似于 Java 中的 HashSet
。Redis 中的 set 类型是一种无序集合,集合中的元素没有先后顺序。当你需要存储一个列表数据,又不希望出现重复数据时,set 是一个很好的选择,并且 set 提供了判断某个成员是否在一个 set 集合内的重要接口,这个也是 list 所不能提供的。可以基于 set 轻易实现交集、并集、差集的操作。比如:你可以将一个用户所有的关注人存在一个集合中,将其所有粉丝存在一个集合。Redis 可以非常方便的实现如共同关注、共同粉丝、共同喜好等功能。这个过程也就是求交集的过程。
sadd
127.0.0.1:6379> sadd mySet value1 value2 # 添加元素进去
(integer) 2
127.0.0.1:6379> sadd mySet value1 # 不允许有重复元素
(integer) 0smembers
127.0.0.1:6379> smembers mySet # 查看 set 中所有的元素
1) "value1"
2) "value2"scard
127.0.0.1:6379> scard mySet # 查看 set 的长度
(integer) 2sismember
127.0.0.1:6379> sismember mySet value1 # 检查某个元素是否存在set 中,只能接收单个元素srem
127.0.0.1:6379>srem myset value1 #移除set集合中的指定元素
SRANDMEMBER
127.0.0.1:6379>SRANDMEMBER myset #随机抽选出一个元素127.0.0.1:6379>SRANDMEMBER myset 2 #随机抽选出指定个数的元素
spop
127.0.0.1:6379>spop myset #随机删除一些set集合中的元素!
smove
127.0.0.1:6379> smove myset myset2 "asdf" # 将一个指定的值,移动到另外一个set集合!SDIFF 差集 SINTER 交集 SUNION 并集
127.0.0.1:6379> sinterstore mySet3 mySet mySet2 # 获取 mySet 和 mySet2 的交集并存放在 mySet3 中127.0.0.1:6379>SDIFF key1 key2 #差集 不同的
127.0.0.1:6379> SINTER key1 key2 #交集 共有的
127.0.0.1:6379>SUNION key1 key2 #并集
应用场景
- 1. 标签 比如我们博客网站常常使用到的兴趣标签,把一个个有着相同爱好,关注类似内容的用户利用一个标签把他们进行归并。
- 2、关注好友//粉丝/感兴趣的人集合 共同好友功能,共同喜好,或者可以引申到二度好友之类的扩展应用。
- 3、统计网站的独立IP 利用set集合当中元素不唯一性,可以快速实时统计访问网站的独立IP。
-
4、 随机展示 通常,app首页的展示区域有限,但是又不能总是展示固定的内容,一种做法是先确定一批需要展示的内容,再从中随机获取。
-
5、黑名单/白名单 经常有业务出于安全性方面的考虑,需要设置用户黑名单、ip黑名单、设备黑名单等,set类型适合存储这些黑名单数据,sismember命令可用于判断用户、ip、设备是否处于黑名单之中。
Hash(哈希)
hash 类似于 JDK1.8 前的 HashMap,内部实现也差不多(数组 + 链表)。不过,Redis 的 hash 做了更多优化。另外,hash 是一个 string 类型的 field 和 value 的映射表,特别适合用于存储对象,后续操作的时候,你可以直接仅仅修改这个对象中的某个字段的值。 比如我们可以 hash 数据结构来存储用户信息,商品信息等等。
hset、hget
127.0.0.1:6379> hset myhash field1 hello # set 一个具体的key-value
127.0.0.1:6379> hget myhash field1 # 获取存储在哈希表中指定字段的值。
hmset、 hmget
127.0.0.1:6379> hmset myhash field1 hello field2 world # set多个 key-vlaue
127.0.0.1:6379> hmget myhash field1 field2 #获取多个字段值1) "hello"
2)"world"hgetall
127.0.0.1:6379> hgetall myhash # 获取在哈希表中指定 key 的所有字段和值1)"field1"
2) "hello"
3)"field2"4)"world"
hdel
127.0.0.1:6379> hdel myhash field1 #删除hash指定key字段!对应的value值也就消失了!
hlen
127.0.0.1:6379> hlen myhash #获取hash表的字段数量!hexists
127.0.0.1:6379> hexists myhash field1 # 查看 hash中指定的字段是否存在。
hkeys、 hvals
127.0.0.1:6379> hkeys myhash # 获取 key 列表
127.0.0.1:6379> hvals myhash # 获取 value 列表
HINCRBY、HDECRBY、hsetnx
127.0.0.1:6379> HINCRBY myhash field3 1 #指定增量
127.0.0.1:6379> HDECRBY myhash field3 4
127.0.0.1:6379> hsetnx myhash field4 hello #如果不存在则可以设置
127.0.0.1:6379> hsetnx myhash field4 world #如果存在则不能设置对象
127.0.0.1:6379> hset userInfoKey name "guide" description "dev" age "24"
hash变更的数据user name age,尤其是是用户信息之类的,经常变动的信息! hash更适合于对象的存储,String更加适合字符串存储!
应用场景
hash类型是一个string类型的field和value的映射表
- 1. 购物车 以用户id为key,商品id为field,商品数量为value,恰好构成了购物车的3个要素
- 2. 存储对象 hash类型的(key, field, value)的结构与对象的(对象id, 属性, 值)的结构相似,也可以用来存储对象。
一般对象用string + json存储,对象中某些频繁变化的属性抽出来用hash存储
Sorted set(Zset有序集合)
和 set 相比,sorted set 增加了一个权重参数 score,使得集合中的元素能够按 score 进行有序排列,还可以通过 score 的范围来获取元素的列表。有点像是 Java 中 HashMap 和 TreeSet 的结合体。set k1 v1 ----- zset k1 score1 v1
zadd
127.0.0.1:6379> zadd myZset 3.0 value1 # 添加元素到 sorted set 中 3.0 为权重
127.0.0.1:6379> zadd myZset 2.0 value2 1.0 value3 # 一次添加多个元素
zrange
127.0.0.1:6379> zrange myZset 0 -1 # 顺序输出某个范围区间的元素,0 -1 表示输出所有元素
zrevrange
127.0.0.1:6379> zrevrange myZset 0 -1 # 逆序输出某个范围区间的元素,0 为 start 1 为 stop
zcard
127.0.0.1:6379> zcard myZset # 查看 sorted set 中的元素数量
zscore
127.0.0.1:6379> zscore myZset value1 # 查看某个 value 的权重
ZRANGEBYSCORE
127.0.0.1:6379>ZRANGEBYSCORE salary -inf +inf # 显示全部的用户从小到大!
127.0.0.1:6379>ZRANGEBYSCORE salary -inf +inf withscores # 显示全部的用户并且附带成绩
127.0.0.1:6379>ZRANGEBYSCORE salary -inf 2500 withscores # 显示工资小于2500员工的升序排序!zrem
127.0.0.1:6379>zrem salary xiaohong #移除有序集合中的指定元素zcount
127.0.0.1:6379> zcount myset 1 3 # 获取指定区间的成员数量!
应用场景
- 1、根据时间排序的新闻列表等,
- 2、 阅读排行榜
geospatial(地理位置)
朋友的定位,附近的人,打车距离计算?Redis的Geo在Redis3.2版本就推出了!这个功能可以推算地理位置的信息,两地之间的距离,方圆几里的人!
- 有效的经度从-180度到180度。
- 有效的纬度从-85.05112878度到85.05112878度。
当坐标位置超出上述指定范围时,该命令将会返回一个错误。
getadd 添加地理位置
#规则:两级无法直接添加,我们一般会下载城市数据,直接通过java程序一次性导入#参数key值(经度、纬度、名称)
127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
127.0.0.1:6379> geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen
127.0.0.1:6379> geoadd china:city 120.16 30.24 hangzhou 108.96 34.26 xian
getpos127.0.0.1:6379>GEOPos china:city beijing # 获取指定的城市的经度和纬度!
1)1) "116.39999896287918091"
2) "39.90000009167092543"
getdist 指定单位的参数 unit 必须是以下单位的其中一个:
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
没有显式地指定单位参数, 那么
GEODIST
默认使用米作为单位。127.0.0.1:6379>GEODIST china: city beijing shanghai #返回两个给定位置之间的直线距离
"1067378.7564"
127.0.0.1:6379> GEODIST china: city beijing shanghai km"1067.3788"
georadius以给定的经纬度为中心, 找出某一半径内的元素【以自己为中心找附近的人】127.0.0.1:6379>GEORADIUS china:city 110 30 1000 km # 以110,30这个经纬度为中心,寻找方圆1000km内的城市
127.0.0.1:6379> GEORADIus china:city 110 30 500 km withdist #在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。127.0.0.1:6379> GEORADIus china:city 110 30 500 km WITHCOORD #将位置元素的经度和维度也一并返回。
127.0.0.1:6379> GEORADIus china:city 110 30 500 km withdist WITHCOORD count 1 #指定返回结果数量
ASC
: 根据中心的位置, 按照从近到远的方式返回位置元素。DESC
: 根据中心的位置, 按照从远到近的方式返回位置元素。GEORADIUSBYMEMBER找出位于指定范围内的元素,中心点是由给定的位置元素决定
【导航定位】
127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km
1)"beijing"
2) "xian"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city shanghai 400 km1) "hangzhou"
2) "shanghai"
geohash返回一个或多个位置元素的Geohash表示# 将二维的经纬度转换为一维的字符串(11位),如果两个字符串越接近,那么则距离越近!
127.0.0.1:6379> geohash china:city beijing chongqi1) "wx4fbxxfkeo"
2) "wm5xzrybtyo"
GEO底层的实现原理其实就是Zset !我们可以使用Zset命令来操作geo !
127.0.0.1:6379>ZRANGE china:city 0 -1 #查看地图中全部的元素
127.0.0.1:6379> zrem china:city beijing #移除指定元素
Hyperloglog
基数(不重复的元素)统计的算法,占用内存固定,2^64不同的元素只需要12KB的内存。比如网页的UV (一个人访问一个网站多次,但是还是算作一个人! )传统的方式,set保存用户的id,然后就可以统计set 中的元素数量作为标准判断!这个方式如果保存大量的用户id,就会比较麻烦!我们的目的是为了计数,而不是保存用户id ;hyperloglog 会有0.81%错误率!统计UV任务,可以忽略不计的!
PFadd PFCOUNT PFMERGE
127.0.0.1:6379> PFadd mykey a b c d e f g h i j # 创建第一组元素 mykey
127.0.0.1:6379>PFCOUNT mykey # 统计 mykey元素的基数数量(integer) 10
127.0.0.1:6379> PFadd mykey2 i j z x c v b n m #创建第二组元素
127.0.0.1:6379> PFCOUNT mykey2(integer) 9
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2 #合并两组 mykey mykey2 => mykey3并集
127.0.0.1:6379> PFCOUNT mykey3 #看并集的数量!(integer) 15
如果允许容错,那么一定可以使用Hyperloglog ! 如果不允许容错,就使用set或者自己的数据类型即可!
Bitmaps
位存储,0 1 0 1 0 1......统计用户信息,活跃,不活跃!登录、未登录!打卡,365打卡! Bitmaps位图,数据结构!都是操作二进制位来进行记录,就只有0和1两个状态!
使用bitmap来记录周一到周日的打卡!
周一∶1周二:0周三:0 周四:1 周五∶1 周六:0 周日:0
setbit
127.0.0.1:6379>setbit sign 0 1
127.0.0.1:6379>setbit sign 1 0
127.0.0.1:6379>setbit sign 2 0
127.0.0.1:6379>setbit sign 3 1
127.0.0.1:6379>setbit sign 4 1
127.0.0.1:6379>setbit sign 5 0
127.0.0.1:6379>setbit sign 6 0查看某一天是否打卡
getbit
127.0.0.1:6379>getbit sign 6
统计操作,统计打卡的天数
bitcount
127.0.0.1:6379>bitcount sign
四、事务
Redis 中的事务是一组命令的集合,是 Redis 的最小执行单位。它可以保证一次执行多个命令,事务中的所有命令都会序列化、按顺序地执行。服务端在执行事务的过程中,不会被其他客户端发送来的命令请求打断。redis事务没有隔离级别的概念,不会出现脏读幻读之类的,所有的命令在事务中,并没有直接被执行!只有发起执行命令exec的时候才会执行,Redis单条命令式保存原子性的,但是事务不保证原子性!
redis的事务∶
- 开启事务(multi)
- 命令入队(.....)
- 执行事务(exec)
127.0.0.1:6379> multi #开启事务
127.0.0.1:6379> set k1 v1
127.0.0.1:6379> set k2 v2
127.0.0.1:6379>get k2
127.0.0.1:6379>set k3 v3
127.0.0.1:6379>exec #执行
放弃事务
127.0.0.1:6379> multi
127.0.0.1:6379> set k1 v1
127.0.0.1:6379> set k2 v2
127.0.0.1:6379>discard #取消事务,事务队列中的命令都不会执行
编译型异常(代码有问题!命令有错!)),事务中所有的命令都不会被执行!
127.0.0.1:6379> multi
127.0.0.1:6379>set k1 v1
127.0.0.1:6379> set k2 v2
127.0.0.1:6379> getset k3 #没有这个命令,命令错误
(error) ERR wrong number of arguments for 'getset' command127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379>set k5 v5QUEUED
127.0.0.1:6379>exec #运行报错
(error)EXECABORT Transaction discarded because of previous errors.127.0.0.1:6379> get k5 #查看,队列中命令果然都没有执行
(nil)
运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行的,错误命令抛出异常
127.0.0.1:6379>set k1 "v1"
127.0.0.1: 6379> multi
127.0.0.1:6379>incr k1 #字符串类型自增报错
127.0.0.1:6379>set k2 v2
127.0.0.1:6379> set k3 v3
127.0.0.1:6379>get k3
127.0.0.1:6379>exec #运行第一条命令抛出,会执行其他命令
1) (error)ERR value is not an integer or out of range2)OK
3) OK4) "v3"
乐观锁和悲观锁
悲观锁总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。也就是说,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。
乐观锁总是假设最好的情况,认为共享资源每次被访问的时候不会出现问题,线程可以不停地执行,无需加锁也无需等待,只是在提交修改的时候去验证对应的资源(也就是数据)是否被其它线程修改了(具体方法可以使用版本号机制或 CAS 算法)。
redis监视测试 watch unwatch -----redis实现乐观锁
五、Jedis(使用Java来操作 Redis)
使用Java来操作 Redis,什么是Jedis?是Redis官方推荐的java连接开发工具!使用Java操作Redis 中间件!如果你要使用java操作redis,那么一定要对Jedis十分的熟悉!
1、导入对应的依赖
2、编码测试∶
- 连接数据库
- 操作命令
- 断开连接
连接redis,输出pong代表连接成功
再次理解事务
六、SpringBoot集成Redis
SpringBoot操作数据:spring-data jpa jdbc mongodb redis !SpringData也是和SpringBoot齐名的项目!说明︰在SpringBoot2.x之后,原来使用的jedis被替换为了lettuce?
jedis :采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedis pool连接池!更像BIO模式
lettuce : 采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据了,更像NIO模式
整合测试
1、导入依赖
<!--操作redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、配置连接
#配置redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
3、测试
@SpringBootTest
class Redis02SpringbootApplicationTests {
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
// redisTemplate操作不同的数据类型,api和我们的指令是一样的
// opsForvalue 操作字符串 类似string
// opsForList 操作List 类似List
// opsForset
// opsForHash
// opsForzsetl
// opsForGeo
// opsForHyperLogLog
//除了进本的操作,我们常用的方法都可以直接通过redisTemplate操作,比如事务,和基本的CRUD
//获取redis的连接对象
//Redisconnection connection = redisTemplate.getconnectionFactory().getconnection ();
//connection.flushDb();
//connection.flushAll();
redisTemplate.opsForvalue().set("mykey", "asdf");
system.out.println(redisTemplate.opsForvalue().get("mykey"));
}
}
七、自定义RedisTemplate
真实的开发一般使用json来传递对象,直接传递对象会报错,所有的对象需要序列化
@Component
@AllArgsConstructor
@NoArgsConstructorData
//在企业中,我们的所有pojo都会序列化
SpringBootpublic class User implements Serializable {
private String name;
private int age;
}
自定义redisTemplate
@Configuration
public class RedisConfig{
//自己定义一个RedisTemplate
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate (RedisconnectionFactory factory){
//我们为了自己开发方便,一般直接使用<String, object>
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
// Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
//String 的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// Value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的Value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
RedisUtil封装工具类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
测试类
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
@SpringBootTest
class SpringbootRedisApplicationTests {
@Autowired
@Qualifier("redisTemplate")
private RedisTemplate redisTemplate;
@Autowired
private RedisUtil redisUtil;
@Test
public void test1() {
redisUtil.set("myKey","myValue");
System.out.println(redisUtil.get("myKey"));
}
}
八、Redis.conf详解
启动时通过配置文件来启动的
> redis-server xx/xx/redis.conf
单位
包含
网络
bind 127.0.0.1 #绑定的ip
protected-mode yes #保护模式,默认开启的
port 6379 #端口设置
通用GENERAL
daemonize yes #以守护进程的方式运行(后台运行),默认是no,需要我们自己开启为yes
logfile "" #日志的文件位置名,为空表示标准的输出
databases 16 #数据库的数量,默认是16个数据库
always-show-logo yes # 是否总是显示LoGo
快照
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb. aof
redis是内存数据库,如果没有持久化,那么数据断电及失!
#如果900s内,如果至少有一个1 key进行了修改,我们及进行持久化操作save 900 1
#如果300s内,如果至少10 key进行了修改,我们及进行持久化操作save 300 10
# 如果60s内,如果至少10000 key进行了修改,我们及进行持久化操作save 60 10000
#我们之后学习持久化,会自己定义这个测试!stop-writes-on-bgsave-error yes #持久化如果出错,是否还需要继续工作!
rdbcompression yes # 是否压缩rdb文件,需要消耗一些cpu资源!
rdbchecksum yes #保存rdb文件的时候,进行错误的检查校验!
dir./ #rdb文件保存的目录!
REPLICATION复制
我们后面讲解主从复制的,时候再进行讲解
SECURITY安全
#requirepas foobared 可以在这里设置redis的密码,默认是没有密码!
CLIENTS限制
maxclients 10000 #设置能连接上redis的最大客户端的数量
maxmemory <bytes> #redis 配置最大的内存容量
maxmemory-policy noeviction #内存到达上限之后的处理策略
- 1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
- 2、allkeys-lru :删除lru算法的key
- 3、volatile-random:随机删除即将过期key
- 4、allkeys-random:随机删除
- 5、volatile-ttl :删除即将过期的
- 6、noeviction :永不过期,返回错误
APPEND ONLY 模式aof配置
appendonly no #默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分所有的情况下,rdb完全够用!
appendfilename "appendonly.aof" #持久化的文件的名字
#appendFsync always #每次修改都会sync。消耗性能
appendfsync everysec #每秒执行一次 sync,可能会丢失这1s的数据!
# appendfsync no #不执行sync,这个时候操作系统自己同步数据,速度最快!
具体的配置,在 Redis持久化中去给大家详细详解!