Redis高级篇 Redis单线程 VS 多线程 入门 面试题 1、redis到底是单线程还是多线程
2、IO多路复用
3、redis为什么快?
Redis为什么选择单线程? redis3以及之前是单线程的。
redis4之后才慢慢支持多线程 支持异步删除,部分多线程,直到redis6/7之后才稳定,完全支持多线程。
那以前我们说redis是单线程是什么意义
主要是指Redis的网络IO和键值对读写是由一个线程来完成的,Redis在处理客户端的请求时包括获取(socket读),解析,执行,内容返回(socket写)等都由一个顺序串行的主线程处理,这就是所谓的单线程,这也是redis对外提供剪枝存储服务的主要流程。
但是Redis的其他功能,比如持久化RDB、AOF、异步删除、集群数据同步等等,其实是由额外的线程执行的。Redis命令工作线程是单线程的,但是,整个Redis来说,是多线程的。
也就是说一个命令set但是这时候需要RDB,AOF,这时候就会新开一个线程,处理这些。
但是操作十大类型的原子性操作都是单线程的。
Redis3单线程时代但性能依旧很快的原因
从硬件的角度,Redis是基于内存的操作,所有数据都在内存中,因此所有的运算都是内存级别的,所以性能高。
数据结构简单,Redis的数据结构是专门设计的,这些简单的数据结构的查找和操作大部分是O(1)
多路复用和非阻塞 I/O,Redis使用 I/O多路复用功能来箭筒多个socket连接客户端,这样就可以使用一个线程连接来处理多个请求,减少线程切换带来
IO 多路复用 = 用一个线程,通过操作系统提供的机制(如 epoll),同时监听多个网络连接的 I/O 事件,并在事件发生时及时处理,从而实现高并发、低资源消耗的网络服务。
避免上下文竞争,因为是单线程所以不会有锁竞争等。
Redis是基于内存操作的, 因此他的瓶颈可能是机器的内存或者网络带宽而非CPU ,所以后面会支持多线程。
在Redis4之前,一直使用单线程的主要原因
1、开发、维护更简单
2、即时使用单线程模型也并发的处理多客户端的请求,主要使用的是IO多路复用和非阻塞IO
3、对Redis系统来说,主要的性能瓶颈是内存或网络带宽而并非CPU
既然单线程那么好,为什么逐渐加入多线程特性?
1、硬件的发展,CPU都是多核时代了,如果还是单线程对性能的使用效率不够充分
2、单线程的痛点是什么?不得不修改?
对于redis del k1,这些命令都是原子的。如果del 的是一个hash结构非常复杂的大key,可能会删半天。这就会造成Redis主线程的卡顿。
解决的方法就是使用 惰性删除 (不立即删除,而是下次有人访问的时候删除并告诉访问的人没有了),于是在Redis4版本后,新增了多线程模块,当然4版本中的多线程主要是为了解决删除数据效率比较低的问题。
1 2 3 unlink k1 异步删除k1 flushdb async 异步删除db flushall async 异步删除所有redis
redis6/7的多线程特性和IO多路复用的入门篇
首先影响Redis性能的主要因素:CPU、内存、网络IO
最后Redis的瓶颈可以初步定为网络IO
Redis6/7中,第一个新特性就是多线程。随着网络硬件性能的提升,Redis的性能瓶颈有时会出现在网络IO的处理上,也就是说,单个主线程处理网络请求的速度跟不上底层网络硬件的速度。
为了应对这个问题:采用多个IO线程来处理网络请求,提高网络请求处理的并行速度。
但是,Redis的多IO献策会给你只是用来处理网络请求的(也就是客户端socket连接),对于读写操作命令Redis仍然使用单线程来处理。而继续使用单线程执行命令操作,就不用为了保证Lua脚本,事务的原子性,额外开发多线程互斥加锁机制了,这样一来,Redis线程模型实现就简单了
总结就是网络IO请求是多线程,真正执行命令还是单线程
只要连同一个redis服务器,就命令都是一样的,如果每次连都连一个新的会大费周章
就像多人共用一个 Excel 表格,频繁创建和销毁 Redis 连接代价很高
1 2 3 4 5 6 7 8 9 10 # 每次都新建连接 redis-cli -a 11111 SET k1 v1 redis-cli -a 11111 SET k2 v2 redis-cli -a 11111 GET k1 # 启动一个 redis-cli 会话,复用连接 redis-cli -a 11111 <<EOF SET k1 v1 SET k2 v2 GET k1 EOF
主线程和IO线程是怎么协作完成请求处理的?
四个阶段
首先主线程,接受建立连接请求,获取Socket(也就是redic-cli -a 密码 -c -p 端口号是标配)。然后将Socket放入全局等待队列。然后以论询方式将Socket连接分配给IO线程。然后主线程阻塞,等待IO县城完成请求读取和解析。然后IO线程开始执行,IO线程将Socket和线程绑定。然后IO线程读取Socket中的请求并解析。请求解析完成。然后回到主线程开始执行。然后主线程执行请求的命令操作。然后主线程请求的命令操作执行完成。然后主线程将结果数据写入缓冲区。然后主线程阻塞,等待IO线程完成数据回写Socket。然后IO线程开始执行。IO线程将结果数据回写Socket。然后Socket回写完成(相当于返回OK)。然后主线程开始执行,清空等待队列,等待后续请求。
阶段一:服务端和客户端建立Socket连接,并分配处理线程
首先,主线程负责接受建立连接请求。当有客户端请求和实例建立Socket连接时,主线程会创建和客户端的连接,并把Socket放入全局等待队列中。紧接着,主线程通过轮询方法把Socket连接分配给IO线程。
阶段二: IO线程读取并解析请求
主线程一旦把Socket分配给IO线程,就会进入阻塞状态,等待IO县城完成客户端请求读取和解析。因为有多个IO线程在并行处理,所以这个过程很快就完成。
阶段三:主线程执行请求操作
等到IO线程解析完请求,主线程还是会以单线程的方式执行这些命令操作。
阶段四:IO线程回写Socket和主线程清空全局队列
当主线程执行完请求操作后,会把需要返回的结果写入缓冲区,然后,主线程会阻塞等待IO线程,把这些结果回写到Socket中,并返回给客户端。和IO线程读取和解析请求一样,IO线程回写Socket时,也是有多个线程在并发执行,所以回写Socket的速度也很快。等到IO线程回写Socket完毕,主线程会清空全局队列,等待客户端的后续请求。
读到这里会有个问题:首先这个发送Socket请求然后有一个连接然后用IO线程解析命令这个是多线程的,那么多线程的IO线程谁先解析完谁给主线程执行命令,那么不就没办法保证解析的顺序,并且返回给主线程的顺序也没法保证吗?
Redis 的多线程 I/O 是“并行读取 + 串行提交”
IO 线程可以并行读取多个 socket 的数据,但必须按“socket 在事件循环中的就绪顺序”将解析结果提交给主线程。
后面会更加详细解释。
现在还有个问题,那这个IO线程是什么呢?
Unix网络编程中的五种IO模型 Blocking IO 阻塞IO
NoneBlocking IO 非阻塞IO NIO
IO multiplexing IO多路复用
1、Linxu世界一切皆文件
文件描述符,简称FD,句柄。FileDescriptor
文件描述符是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数。实际上,他是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
这个在JAVA1.0就有了,上面看不懂,通俗来说就是:我连上了我的redis,然后通过用redis-cli -a 密码连接,现在连上了,也就是java发起了一个连接请求,操作系统分配的 听到了,然后会返回一个fd
文件描述符(FD) = 操作系统给“打开的资源”发的一个“编号门票” 你连 Redis、读文件、开网络连接……只要用了系统资源,OS 就给你一张“票”(FD),以后你干啥都靠这张票找 OS 要服务。
2、首次浅谈IO多路复用,IO多路复用是什么
一种同步的IO模型,实现一个线程监视 多个文件句柄 ,一旦某个文件句柄就绪 ,就能够通知到对应应用程序进行相应的读写操作,没有文件句柄就绪时 就会阻塞应用程序,从而释放CPU资源。
Redis没法保证发送顺序,因为这是分布式系统的正常,因为可能因为网络波动或者别的因素影响发送到达的顺序,但 Redis 100% 保证:同一个客户端连接内的命令,严格按照发送顺序执行。
详细概念解释
I/O:网络I/O,尤其在操作系统层面指数据在内核态和用户态之间的读写操作。
网络 I/O 操作(比如读写 socket)一定会在“用户态”和“内核态”之间切换,因为只有操作系统内核才能直接操作网卡、管理 TCP 连接、收发数据包。
多路:多个客户端连接(连接就是套接字描述符,即socket或者channel)
复用:复用一个或几个线程。
IO多路复用:也就是说一个或者一组线程处理多个TCP连接,使用单进程就能够实现同事处理多个客户端的连接,无需创建或者维护过多的进程/线程
一句话:一个服务端进程可以同时处理多个套接字描述符。实现IO多路复用的模型有3中:可以分select ->poll -> epoll三个阶段来描述。
3、场景体验,说人话引出epoll
模拟一个tcp服务器处理30个客户socket。
假设你是一个监考老师,让30个学生解答一道竞赛考题,然后负责验收学生答卷,你有下面几个选择:
1、轮询:按顺序逐个验收,这中间如有一个学生卡住,全班都耽误,用循环挨个处理socket,根本不具有并发能力。
2、来一个new一个,1对1服务:每个分身线程检查一个学生的答案是否正确。这种类似于为每一个用户创建一个进程或者线程处理连接
3、响应式处理,1对多服务:你站在讲台上等,谁解答完谁拒收。这时C、D举手,表示他们解答问题完毕,你下去依次检查C、D的答案,然后继续回到讲台上等。这就是IO复用模型,Linux下的select、poll、epoll就是干这个的。
将用户socket对应的文件描述符(FileDescriptor)注册进epoll,然后epoll帮你监听那些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用 非阻塞模式 。这样,整个过程,只有在调用select、poll、epoll这些调用的时候才会阻塞,收发客户消息是不会阻塞的,整个进程或者线程就被充分利用起来,这就是事件驱动,所谓的reactor反应模式。
在单个线程 通过记录跟踪每一个Socket的状态来同事管理多个IO流 ,一个服务端进程可以同时处理多个套接字付描述符。目的是尽量多的提高服务器的吞吐能力。
大家都用过nginx,nginx用epoll接受请求,nginx会有很多连接进来,epoll会把他们都监视起来,然后像拨开关一样,谁有数据就拨向谁,然后调用相应的代码处理。redis类似同理,这就是IO多路复用原理,有请求就相应,没请求不打扰。
4、小总结
只使用一个服务端进程可以同时处理多个套接字描述符连接。
redis-cli -a 密码这个命令让客户端连接服务器了,内部的处理是复杂的。
客户端请求服务端时,实际就是在服务端你的Socket文件中写入客户端对应的文件描述符,如果有多个客户端同时请求服务端,为每次请求分配一个线程类似每次来都New一个就比较耗费服务端资源。因此我们只使用一个线程来监听多个文件描述符,这就是IO多路复用。
采用多路IO复用技术可以让单个线程高效的处理多个连接请求一个服务端进程可以同时处理多个套接字描述符。
5、面试题:redis为什么这么快
IO多路复用+epoll函数使用,才是redis为什么这么快的直接原因,而不仅仅是单线程命令+redis安装在内存中。
signal driven IO 信号驱动IO
asynchronous IO 异步IO
总结 Redis工作线程是单线程的,但是,整个Redis来说是多线程的。
主线程和IO线程是怎么协作完成请求处理的?
IO的读和写本身是堵塞的,比如当socket中有数据时,Redis会通过调用先将数据从内核态空间拷贝到用户态空间,再交给Redis调用,而这个拷贝的过程就是阻塞的,当数据量越大的时候拷贝所需要的时间就越多,而这些操作都是基于单线程完成的。
从Redis6开始,就新增了多线程的功能来提高IO的读写性能,他的主要实现思路是将主线程的IO读写任务拆分给一组独立的线程去执行,这样就可以使用多个socket的读写可以并行化了,采用多路IO复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),将最耗时的Socket的读取、请求及诶、写入单独外包出去,剩下命令执行仍然由主线程串行执行并和内存的数据交互。
这样网络IO操作就变成多线程化了,其他核心部分仍然是线程安全的,是个不错的折中方法。
redis7默认是否开启了多线程? 如果你在实际应用中,发现Redis实例的CPU开销不大但吞吐量却没有提升,可以考虑使用Redis7的多线程机制,加速网络处理,进而提升实例的吞吐量。
Redis将所有数据放在内存中,内存的响应时长大约为100纳秒,对于小数据包,Redis服务器可以处理8W到10W的QPS(“每秒请求数” ),这是Redis处理极限了,对于80%的公司来说,单线程的Redis已经足够使用了。
在Redis6 / 7 之后,多线程机制默认是关闭的,如果需要使用多线程功能,需要在redis.conf中完成两个设置
1 2 3 设置io-thread-do-reads配置项为yes,表示启用多线程。 io-threads 4 设置线程个数,官方的建议是如果为4核的CPU,建议线程数设置为2或3,如果8核就设置6,线程数一定小于机器核数 unlink key / flushall async命令
BigKey 面试题 海量数据里查询某一固定前缀的key
如何生产上限制key * /flushdb / flushall 等危险命令以防止误删误用
memory usage命令用过吗?
bigkey问题,多大算big?如何发现?如何删除?如何处理?
BigKey你做过调用吗?惰性释放lazyfree了解过吗?
Morekey问题,生产上redis数据库有1000W条记录,如何遍历?key *可以吗?
…
MoreKey案例 如何写个小脚本,写500W条key,操作多个key问题。
1、大批量插入redis
Linux Bash下执行,插入100w
通过redis提供的管道 --pipe命令插入100w大批量数据
1 2 3 4 for((i = 1; i <=100*10000 ; i++));do echo "set k$i v&i" >> /tmp/redisTest.txt;done; more /tmp/redisText.txt 那么怎么让这些命令灌进入redis里面 cat /tmp/redisTest.txt | /opt/redis-7.0.0/src/redis-cli -h 127.0.0.1 -p 6379 -a 111111 --pipe
现在测试数据准备好了,然后这时候在生产环境中,还能用keys *吗
1 flushdb这些命令没有offset或者limit这些,所以执行的时候会锁住,导致后续的请求整个链路卡住,十几秒结束后,数据库产生了雪崩效应
如何限制keys * 这些危险命令防止误删误用
通过配置设置禁用这些命令,在redis.conf里面的security里面
1 2 3 4 5 security大概在1042行附近 rename-command keys "" rename-command flushdb "" rename-command flushall "" 然后重启就生效了
不用keys 避免卡顿,那用什么
用 scan命令 类似Limit但是不完全相同
scan命令用于迭代数据库中的键,里面有sscan(set),hscan(hash),zscan(zset)
1 2 3 4 5 6 7 8 9 scan 游标 [匹配元素] [count 个数]默认10个 基于游标的迭代器,每次被调用后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为scan命令的游标参数,一次来延续之前的迭代过程。 scan返回两个元素数组,第一个元素是游标,第二个元素是匹配的元素的数组 如果返回的游标是0,表示迭代结束。 支持模糊查询,一次返回的数量不可控,只能是大概率符合count参数 scan的顺序不是从第0位一直到末尾的,而是采用了高位进位加法来遍历。这样特殊的方式进行遍历,是考虑到字典的扩容和缩容,避免槽位的遍历重复和遗漏 scan 0 match k* count 15
BigKey案例 多大算BigKey
大的内容不是key本身,而是它对应的value
参考《阿里云开发规范》
string类型控制在10KB以内,hash、list、set、zset元素个数不要超过5000
非字符串的bitkey,不要使用del删除,(除了string可以用del删除其他的),使用hscan、sscan、zscan方式渐进式删除,同时要注意防止Bigkey过期时间自动删除问题,比如200w个元素的zset设置1小时过期,然后删除的时候就会造成阻塞
string和二级结构
string是value,最大512MB,但是大于等于10KB的就是bigkey
list、hash、set和zset,个数超过5000就是Bigkey
哪些危害
1、内存不均,集群迁移困难
2、超时删除,大key删除不方便
3、网络流量阻塞
如何产生
1、社交类,王心凌粉丝列表,粉丝逐步递增
2、汇总统计,某个报表,积累多了
如何发现
redis-cli --bigkeys
好处:会给一个总结,给出每种数据结构Top1 bigkey,同时给出每种数据类型的键值个数+平均大小
不足:想查询大于10kb的所有Key,--bigkeys参数就无能为例了,需要用到memory usage来计算每个键值的字节数。
1 2 redis-cli -h 127.0.0.1 -p 6379 -a 密码 -bigkeys 如果加参数 -i 0.1 也就是每隔100条 scan指令就会休眠0.1s,ops(每秒操作数)就不会剧烈抬升,但是扫描的时间会变长。
MEMORY USAGE键
给出一个key和他的值,在RAM内存中所占用的字节数,返回的结果是key的值以及为管理该key分配的内存总字节数。
如何删除
1 2 3 4 5 6 heset customer:001 id 11 cname li4 age 23 score 80 hdel customer:001 score age 这样就先把field一个个删除了 对于string类型 直接del删除 如果过于庞大unlink异步删除 对于hash类型,使用hscan每次获取少量field-value,再使用hdel删除每个field hscan + hdel
1 2 3 4 5 6 7 8 list 使用ltrim渐进式逐步删除,直到全部删除完成 ltrim是对一个列表进行修剪,让列表只保留指定区间的元素,不在指定区间的元素被删除。 举例 rpush list 1 2 3 4 5 ltrim list 0 2 这样List里面就剩下1 2 3
1 2 3 4 5 set 使用sscan每次获取部分元素,再使用srem命令删除每个元素 SSCAN key cursor [MATCH pattern] [COUNT count] sscan set 0 遍历所有元素 srem set a b 删除元素a b
1 2 3 4 5 6 7 zset 使用zscan每次获取部分元素,再使用zremrangebyrank命令删除每个元素 zadd salary 3000 z3 4000 li4 5000 w5 7000 s7 zrange salary 0 -1 withscores zscan salary 0 渐进式遍历 salary 这个有序集合 zremrangebyrank salary 0 1 按排名范围删除元素 —— 删除 salary 中 排名从 0 到 1 的元素(包含两端) zrange salary 0 -1 withscores
BigKey生产调优 redis.conf 配置文件LAZY FREEING相关说明
vim redis.conf
redis删除有两种方法,一个是阻塞的del,一个是非阻塞的unlink,
默认删除对象使用阻塞的方法。如果需要非阻塞型的需要修改一些配置,能提高性能。
1 2 3 lazyfree-lazy-server-del UNLINK key 是 Redis 4.0+ 提供的异步删除命令 replica-lazy-flush 当 Redis 内部自动触发删除操作(非用户直接调用 DEL/UNLINK)时,是否使用惰性删除? lazyfree-lazy-user-del 当从节点(Replica)执行 FLUSHDB 或 FLUSHALL(通常由主节点同步过来的命令)时,是否使用惰性删除?
缓存双写一致性之更新策略讨论
默认MySQL有数据。
上面的业务逻辑用java代码如何写?
面试题 1、你只要用缓存,就可能涉及到redis缓存和数据库双存储双写,如何解决一致性问题?
2、双写一致性,你先动缓存redis还是数据库mysql哪一个?why?
3、延迟双删你做过吗?会有那些问题?
4、有一种情况,微服务查询redis没有,mysql有,为保证数据双写一致性回写redis你需要注意什么?双检加锁 策略你了解过吗?如何尽量避免缓存击穿?
5、redis和mysql双写100%会出现纰漏,做不到强一致性,如何保证 最终一致性?
缓存双写一致性 如果redis中有数据 ,需要和数据库中的值相同
如果redis中无数据, 数据库中的值要是最新值,且准备回写redis
缓存按照操作来分,细分2种:只读缓存(只做查询缓存没有第三步,不回写)。
大部分是 读写缓存 ,那么读写缓存又有两种策略
1、同步直写策略
只要完成了第二步,Mysql尽量的同步写redis缓存,缓存和数据库中的数据一致,要想保证数据一致,就采用同步直写策略。(热点敏感数据,VIP数据)
2、异步缓写策略
业务不用这么急,正常业务运行中,mysql数据变动了,但是可以在业务上容许出现一定时间后才作用于redis,比如仓库、物流系统,积分变更等。
还有种情况是异常情况出现了,不得不将失败的动作重新修补,有可能需要借助kafka或者RabbitMQ等消息中间件,实现重试重写。
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Service @Slf4j public class UserService { public static final String CACHE_KEY_USER = "user:" ; @Resource private UserMapper userMapper; @Resource private RedisTemplate redisTemplate; public User findUserById (Integer id) { User user = null ; String key = CACHE_KEY_USER + id; user = (User) redisTemplate.opsForValue().get(key); if (user == null ){ user = userMapper.selectByPrimaryKey(id); if (user == null ){ return user; }else { redisTemplate.opsForValue().set(key,user); } } return user; } }
上面的代码中,如果在大厂里面是不行的,如果redis操作和Mysql操作间隔非常小,还没回写就重复查询到Mysql里面导致多次回写。
采用双检加锁策略
解决一堆线程查询到Mysql,一堆线程回写问题。
多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上 使用一个互斥锁来锁住它。 其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。
后面的线程进来发现已经有缓存了,就直接走缓存。
为什么要双检,这类似单例原则。
假设有两个线程 A 和 B 同时进入:
1 2 3 4 5 6 7 线程 A: if (instance == null) → true 线程 B: if (instance == null) → true (A 还没创建完) 线程 A: 获得锁 → 创建 instance 线程 A: 释放锁 线程 B: 获得锁 → 再次创建 instance!❌
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public String get (String key) { String value = redis.get(key); if (value != null ){ return value; }else { synchronized (TestFuture.class){ value = redis.get(key); if (value != null ){ return value; }else { value = dao.get(key); redis.setnx(key,value,time); return value; } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Service @Slf4j public class UserService { public static final String CACHE_KEY_USER = "user:" ; @Resource private UserMapper userMapper; @Resource private RedisTemplate redisTemplate; public User findUserById2 (Integer id) { User user = null ; String key = CACHE_KEY_USER + id; user = (User) redisTemplate.opsForValue().get(key); if (user == null ){ synchronized (UserService.class){ user = (User) redisTemplate.opsForValue().get(key); if (user == null ){ user = userMapper.selectByPrimaryKey(id); if (user == null ){ return user; }else { redisTemplate.opsForValue().setIfAbsent(key,user,7L ,TimeUnit.DAYS); } } } } return user; } }
数据库和缓存一致性的几种更新策略 目的:我们要达到最终一致性!
给缓存设置过期时间,定期清理缓存并回写
所有的 写操作以数据库为准 ,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存,达到一致性,切记,要以mysql的数据库写入库为准
上述的是一种方案,下面的也是。
一致性模型
特点
适用场景
强一致性
读一定拿到最新写入的值
银行转账、库存扣减
最终一致性
允许短暂不一致(上面的案例),但最终会一致
用户资料、文章阅读量、非核心业务
可以停机的情况
今天晚上更新数据,不用边跑边更新数据。首先挂牌报错,凌晨升级,温馨提示,服务降级。单线程,这样重量级的数据操作最好不要多线程。
不能停机的情况,4种更新策略
第一种:先更新数据库,再更新缓存(❌️)
但是如果停机了,百分之百只有一个线程在更新是可以的
异常问题1
1 先更新mysql的商品库存,当前商品的库存是100,更新为99
2 更新Mysql修改为99成功,然后更新redis
3 此时异常出现,更新redis失败了,这导致mysql数据正确,redis错误
4、这时候访问redis,读到了脏数据
异常问题2
A,B两个线程发起调用
正常逻辑
1、A update mysql 100
2、A update redis 100
3、B update mysql 80
4、B update redis 80
但是在多线程环境下,A、B有先有后并行
这时候
1 -> 3 -> 4 -> 2这样就出错了
最终mysql和redis数据不一致了
第二种:先更新缓存,在更新数据库(❌️)
这个不太推荐,一般把mysql作为底单数据库,保证最后解释。
异常问题2
A,B两个线程发起调用
正常逻辑
1、A update redis 100
2、A update mysql 100
3、B update redis 80
4、B update mysql 80
但是在多线程环境下,A、B有先有后并行
这时候
1 -> 3 -> 4 -> 2这样就出错了
最终mysql和redis数据不一致了。redis 80 mysql 100
第三种:先删除缓存,再更新数据库(❌️)
异常问题:
1、A现成先删除redis,然后更新mysql,此时mysql正在更新中,还没有结束(比如网络延时)
B突然出现要来读取缓存数据。
2、此时redis里面的数据是空的,B线程来读取,先去读取redis里面的数据,这时候已经被A线程删掉了
那么B就会从Mysql里面获得旧值,B线程发现redis里没有(缓存缺失)马上去mysql里面读取, 从数据库里读取来的是旧值
并且B会把获得的旧值写回redis,那么刚被A删除的旧数据又被写回了
3、这时候A更新完mysql,发现redis里面的缓存是脏数据。
解决方案:
采用延时双删策略:
延时双删面试题
这个删除休眠该多久呢?
线程A sleep的时间,需要大于线程B读取数据再写入缓存的时间。
这个时间怎么确定呢?
第一种方法:在业务程序运行的时候,统计下程序读数据和写缓存的操作时间,自行评估自己的项目读数据业务逻辑耗时,以此为基础进行估算,然后写数据的休眠时间则在读数据业务逻辑的耗时基础上加 百毫秒 即可。
这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。
第二种方法:新启动一个后台监控程序,比如后面要讲的WatchDog监控程序,会加时
这种同步淘汰策略,吞吐量降低怎么办?
第一次删除是为了防止其他线程读到脏数据
延时双删的缺点既有读取到脏数据又有污染缓存
后续看门狗WatchDog源码分析
第四种:先更新数据库,再删除缓存(⚠️)
异常问题
线程A更新数据库,还没更新完,B缓存立刻命中旧值,A更新缓存数据。
唯一的问题,读到的缓存旧值问题。
微软云是用的是先更新数据库,再删除缓存。阿里巴巴的canel也是类似的思想。
不可能保证强一致性,那如何保证最终一致性?
解决方案:消息中间件。
1、更新数据库数据
2、数据库会将操作信息写入binlog日志当中
3、订阅程序(监控程序)提取出所需要的数据以及key
4、另起一段非业务代码,新开一个线程,异步啥的,获得该binlog信息,跟主业务分离
5、尝试删除缓存操作,发现缓存失败。如果成功就皆大欢喜
6、将这些信息发送到消息队列
7、重新从消息队列中获得该数据,重试操作
1 2 3 4 可以吧要删除的缓存值或者是要更新的数据库值暂存到消息队列中(kafka或者rabbitmq) 当程序没有能够成功的删除缓存值或者更新数据库值的时候,可以从消息队列中重新读取这些值,然后再次进行删除或者更新。 如果能成功的删除或者更新,我们就把这些值从消息队列中去除,以免重复操作,此时我们也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试。 如果重试超过一定次数后还是没有成功,我们就需要向业务层发送报错信息了,通知运维人员
实际场景 流量充值,线下发短信实际充值可能滞后5分钟,可以接受。电商发货,短信下发,但是物流明天见。
小总结 优先使用先更新数据库,再删除缓存方案。
理由如下:
1、先删除缓存值再更新数据库,有可能导致请求因缓存缺失而访问数据库,给数据库带来压力导致打满mysql。
2、如果业务应用中读取数据库和写缓存的时间不好估算,那么延迟双删中的等待时间就不好设置。
如果业务层要求必须读取一致性的数据,那么我们就需要在更新数据库的时候,先向Redis缓存客户端暂停并发读请求,等数据更新完,缓存删除后,再读取数据,从而保证数据一致性。但真实环境上,不推荐,分布式下很难做到实时一致性,一般都是最终一致性。
Redis和MySQL数据双写一致性工程落地案例 复习+面试题 1、首先我们得到的比较好的方案是先更新mysql,再动redis,避免redis业务key突然消失,多线程请求集火打满mysql
2、懂,写操作
先更新数据库,再删除缓存。尝试使用双检加锁机制lock住mysql,只让一个请求线程回写redis,完成数据一致性。
3、面试提问:我想mysql有记录改动了(有增删改写操作),立刻同步反应到redis??如何做?你怎么知道mysql有改动?(Mysql监听)
首先Mysql有改动就有Binlog日志,动过就能知道Mysql有变更。那么现在就是需要能够监听到mysql变动并且能够通知给redis。
那么就用到了阿里巴巴的中间件canal
canal 是什么
阿里巴巴 Mysql binlog增量订阅组件、消费和解析。
能干嘛
数据库镜像
数据库实时备份
索引构建和实施维护(拆分异构索引、倒排索引等)
业务cache刷新
带业务逻辑的增量数据处理
去哪下
Releases · alibaba/canal · GitHub
工作原理,面试回答
传统Mysql主从复制工作原理
MySQL的主从复制将经过如下步骤:
1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,(offset偏移量是否改变)
如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;
canal工作原理
有点类似消息中间件。
mysql-canal-redis双写一致性Coding
先处理Mysql (windows)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 1、查看mysql版本 select version(); 5.7.28 2、当前的主机二进制日志 show master status; mysql-bin.000085 13117(偏移量position) 3、show variables like 'log_bin' 默认是off,是否开放日志 4、开启mysql的binlog写入功能 5、打开mysql目录,最好提前备份,my.ini 在[mysqld]下面添加三行 log-bin=mysql-bin #开启 binlog binlog-format=ROW #选择 ROW 模式 server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复 (ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。 STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况; MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;) 6、重启mysql 7、再次查看是否开放日志 8、授权canal连接mysql账号 mysql默认的用户在mysql库的user表里 select * from mysql.user; 默认没有canal账户,此处新建+授权 DROP USER IF EXISTS 'canal'@'%'; CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal'; FLUSH PRIVILEGES; SELECT * FROM mysql.user;
处理canal服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 1、下载 在github上下载canal.deployerxx.tar.gz 2、解压 解压后整体放入/mycananl路径下 在mycanal路径下tar -zxvf canal.deployerxx.tar.gz 3、配置 修改/mycanal/conf/example路径下instance.properties文件 换成自己的mysql主机master的IP地址 canal.instance.master.address=192.168.111.1:3306 换成自己在mysql新建的canal账户 canal.instance.dbUsername=canal canal.instance.dbPassword=canal 4、启动 进入到/mycanal/bin目录 然后./startup.sh 安装好java8 5、查看 判断canal是否启动成功 1、查看server日志 进入/mycanal/logs/canal然后运行cat canal log 2、查看样例example的日志 进入/mycanal/logs/example然后cat example.log
canal客户端(java编写业务程序)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 1、SQL脚本 这里举个样本案例,建个数据表库 CREATE TABLE `t_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `userName` varchar(100) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4 2、建module canal_demo02 3、改POM 引入canal的jar包 4、写YML 5、主启动 @SpringBootApplication public class CanalDemo02App{ public static void main(String[] args){ //SpringApplication.run(CanalDemo02App.class,args); } } 这个 Maven 模块(module)并不是一个独立运行的服务,而是一个“被其他程序依赖的监听逻辑库”。 这个类被 另一个真正的 Spring Boot 应用 所引用和启动 当前这个 CanalDemo02App 只是一个代码模块(library),不是可执行服务 只是提供 @Component、@Service 等 Bean 可以理解为一个Spring 组件库,里面提供各种Bean工具给其他程序调用 6、业务类
pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > com.atguigu.canal</groupId > <artifactId > canal_demo02</artifactId > <version > 1.0-SNAPSHOT</version > <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.5.14</version > <relativePath /> </parent > <properties > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <junit.version > 4.12</junit.version > <log4j.version > 1.2.17</log4j.version > <lombok.version > 1.16.18</lombok.version > <mysql.version > 5.1.47</mysql.version > <druid.version > 1.1.16</druid.version > <mapper.version > 4.1.5</mapper.version > <mybatis.spring.boot.version > 1.3.0</mybatis.spring.boot.version > </properties > <dependencies > <dependency > <groupId > com.alibaba.otter</groupId > <artifactId > canal.client</artifactId > <version > 1.1.0</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-pool2</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-aop</artifactId > </dependency > <dependency > <groupId > org.aspectj</groupId > <artifactId > aspectjweaver</artifactId > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.47</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > druid-spring-boot-starter</artifactId > <version > 1.1.10</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > druid</artifactId > <version > ${druid.version}</version > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > <version > ${mybatis.spring.boot.version}</version > </dependency > <dependency > <groupId > cn.hutool</groupId > <artifactId > hutool-all</artifactId > <version > 5.2.3</version > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > ${junit.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > log4j</groupId > <artifactId > log4j</artifactId > <version > ${log4j.version}</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > <optional > true</optional > </dependency > <dependency > <groupId > javax.persistence</groupId > <artifactId > persistence-api</artifactId > <version > 1.0.2</version > </dependency > <dependency > <groupId > tk.mybatis</groupId > <artifactId > mapper</artifactId > <version > ${mapper.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-autoconfigure</artifactId > </dependency > <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 3.8.0</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > </plugin > </plugins > </build > </project >
YML
1 2 3 4 5 6 7 8 9 server.port =5555 spring.datasource.type =com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name =com.mysql.jdbc.Driver spring.datasource.url =jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false spring.datasource.username =root spring.datasource.password =123456 spring.datasource.druid.test-while-idle =false
业务类
1、RedisUtils 包装jedis的util
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class RedisUtils { public static final String REDIS_IP_ADDR = "192.168.111.185" ; public static final String REDIS_pwd = "111111" ; public static JedisPool jedisPool; static { JedisPoolConfig jedisPoolConfig=new JedisPoolConfig (); jedisPoolConfig.setMaxTotal(20 ); jedisPoolConfig.setMaxIdle(10 ); jedisPool=new JedisPool (jedisPoolConfig,REDIS_IP_ADDR,6379 ,10000 ,REDIS_pwd); } public static Jedis getJedis () throws Exception { if (null !=jedisPool){ return jedisPool.getResource(); } throw new Exception ("Jedispool is not ok" ); } }
2、RedisCanalClientExample
简略版
1 2 3 4 5 6 7 8 9 10 11 12 public class RedisCanalClientExample { public static final Integer _60SECONDS = 60 ; public static final String REDIS_IP_ADDR = "192.168.111.185" ; private static void redisInsert (List<Column> columns) {} private static void redisDelete (List<Column> columns) {} private static void redisUpdate (List<Column> columns) {} public static void printEntry (List<Entry> entrys) {} public static void main (String[] args) {} }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 public class RedisCanalClientExample { public static final Integer _60SECONDS = 60 ; public static final String REDIS_IP_ADDR = "192.168.111.185" ; private static void redisInsert (List<Column> columns) { JSONObject jsonObject = new JSONObject (); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if (columns.size() > 0 ) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0 ).getValue(),jsonObject.toJSONString()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisDelete (List<Column> columns) { JSONObject jsonObject = new JSONObject (); for (Column column : columns) { jsonObject.put(column.getName(),column.getValue()); } if (columns.size() > 0 ) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.del(columns.get(0 ).getValue()); }catch (Exception e){ e.printStackTrace(); } } } private static void redisUpdate (List<Column> columns) { JSONObject jsonObject = new JSONObject (); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } if (columns.size() > 0 ) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0 ).getValue(),jsonObject.toJSONString()); System.out.println("---------update after: " +jedis.get(columns.get(0 ).getValue())); }catch (Exception e){ e.printStackTrace(); } } } public static void printEntry (List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue ; } RowChange rowChage = null ; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException ("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s" , entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else { redisUpdate(rowData.getAfterColumnsList()); } } } } public static void main (String[] args) { System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------" ); CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress (REDIS_IP_ADDR, 11111 ), "example" , "" , "" ); int batchSize = 1000 ; int emptyCount = 0 ; System.out.println("---------------------canal init OK,开始监听mysql变化------" ); try { connector.connect(); connector.subscribe("bigdata.t_user" ); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString()); Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0 ) { emptyCount++; try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0 ; printEntry(message.getEntries()); } connector.ack(batchId); } System.out.println("已经监听了" +totalEmptyCount+"秒,无任何消息,请重启重试......" ); } finally { connector.disconnect(); } } }
题外话
如何监控多个表?java程序下配置过滤正则
要么程序指定然后覆盖配置,要么在配置里面写白名单或者黑名单
忘记关闭jedis了?关闭资源代码简写try-with-resources释放资源
java在7后增加了try-with-resources,他是一个生命或多个资源的try语句。一个资源作为一个一个对象,必须在程序结束后关闭。try-with-resources语句确保在语句的最后每个资源都被关闭。 也就是在try括号里面的资源,只要实现了AutoCloseable和Closeable的对象都可以用这个关闭资源。
案例落地实战bitmap/hyperloglog/GEO 面试题 面试题1 1、抖音电商直播,主播介绍的商品有评论,1个商品对应了1系列的评论,排序+展现+取前10条记录
2、用户在手机APP上的签到打卡信息:1天对应1系列用户的签到记录,新浪微博,钉钉打卡签到,来没来如何统计
3、应用网站上的网页访问信息:1个网页对应1系列的访问点击,淘宝网首页,每天有多少人浏览首页
4、你们公司系统上线后,说一下UV(Unique Visitor)—— 独立访客数 、PV(Page View)—— 页面浏览量 、DAU(Daily Active Users)—— 日活跃用户数 分别是多少
面试题2 面试问记录对集合中的数据进行统计
1、在移动应用中,需要统计每天的新增用户数和第二天的留存用户数
2、在电商网站的商品评论中,需要统计评论列表中的最新评论
3、在签到打卡中,需要统计一个月内连续打卡的用户数
4、在网页访问记录中,需要统计独立访客量。
痛点:类似今日头条、抖音这些用户访问级别都是亿级别的,请问如何处理?
需求痛点 亿级数据的收集+清洗+统计+展现
一句话 :存的进+取的快+多维度
真正有价值的是统计
统计的类型有哪些? 亿级系统中常见的四种统计
1、聚合统计
统计多个集合元素的居合结果,就是前面讲过的交差并等集合统计
1 2 3 4 复习命令 A-B 属于A不属于B的 SDIFF key [key ...] A并B 属于A或者B的元素合并后 SUNION key [key ...] A交B 属于A也属于B的共同拥有的 SINTER key [key...] SINTERCARD numkeys key [key ...] [LIMIT limit]
2、排序统计
抖音短视频最新评论留言的场景,请你设计一个展现列表。能够排序+分页的redis数据结构用什么合适?
zset 在面对需要展示最新列表,排行榜等场景时,如果数据更新频繁或者需要分页显示,建议使用Zset
3、二值统计
集合元素的取值就只有0和1两种。
在钉钉上班签到打卡的场景中,我们只用记录有或没。用 bitmap
4、基数统计
统一一个集合中 不重复的元素个数
用hyperloglog
hyperloglog UV Unique Visitor,独立访客,一般理解为客户端IP 需要去重考虑
PV Page View , 页面浏览量,不用去重
DAU Daily Active User 日活跃用户量,登录或者使用了某个产品的用户数(去重复登录的用户)
常用语反应网站,互联网应用或者网络游戏的运营情况
MAU Monthly Active User 月活跃用户量
它只能用于统计巨量数量,不太设计具体的统计对象的内容和精准性
统计单日一个页面的访问量(PV),单次访问就算一次。
统计单日一个页面的用户访问量(UV),即按照用户为维度计算,单个用户一天内多次访问也只算一次。
多个key的合并统计,某个门户网站的所有模块的PV聚合统计就是整个网站的总PV。
1 2 3 4 5 6 7 去重复功能的基数估计法就是Hyperloglog 复习命令 pfadd hll01 2 3 4 5 pfadd hll02 2 4 4 4 6 pfcount hll02 pfmerge distResult hll01 hll02 pfcount distResult
去重复统计你会先想到哪些方式?(演化)
最先想到hashset
1 2 3 4 5 @Test public void test () { List<String> list = Arrays.asList("22.1.01.1" ,"192.168.12.1" ); HashSet<String> sets = new HashSet <>(list); }
然后用bitmap
如果数据量大假设list里面有4个亿,hashset明显会性能下降。
如果数据显较大亿级统计,使用bitmaps同样会有这个问题。但是百万级别是可以的,方法是精确计算,并且百万级别可以应用。
bitmap是通过用位bit数组来表示各元素是否出现,每个元素对应一位,所需的总内存为N个bit。
基数计数则将每一个元素对应到bit数组中的其中一位,比如bit数组010010101(按照从零开始下标,有的就是1、4、6、8)。
新进入的元素只需要将已经有的bit数组和新加入的元素进行按位或计算就行。这个方式能大大减少内存占用且位操作迅速。
But,假设一个样本案例就是一亿个基数位值数据,一个样本就是一亿
如果要统计1亿个数据的基数位值,大约需要内存100000000/8/1024/1024约等于12M,内存减少占用的效果显著。
这样得到统计一个对象样本的基数值需要12M。
如果统计10000个对象样本(1w个亿级),就需要117.1875G将近120G,可见使用bitmaps还是不适用大数据量下(亿级)的基数计数场景,
但是bitmaps方法是精确计算的。
解决办法:概率算法
通过牺牲准确率来换取空间,对于不要求绝对准确率的场景下可以使用,因为概率算法不直接存储数据本身,通过一定的概率统计方法预估基数值,同时保证误差在一定范围内,由于又不储存数据 故此可以大大节约内存。
HyperLogLog就是一种概率算法的实现。
原理说明
首先只是进行不重复的基数统计,不是集合页不保存数据,只记录数量而不是具体内容
有误差。误差仅仅只是0.81%左右
这个误差哪里来的?Redis之父安特雷兹说的
淘宝网站首页亿级UV的Redis统计方案
需求: UV的统计需要去重,平均是1~1.5亿,如果每天存1.5亿的IP,访问者来了后先查是否存在,不存在就加入。
方案讨论:
1、用Mysql ,傻X,不解释
2、用redis的hash结构,
1 2 3 redis——hash = <keyDay,<ip,1>> 按照ipv4的结构来说明,每个ipv4的地址最多是15个字节(ip = "192.168.111.1",最多xxx.xxx.xxx.xxx) 某一天的1.5亿 * 15个字节= 2G,一个月60G,redis死定了。o(╥﹏╥)o
3、hyperloglog
为什么是只需要花费12Kb?
HyperLogLogService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service @Slf4j public class HyperLogLogService { @Resource private RedisTemplate redisTemplate; @PostConstruct public void initIP () { new Thread (() -> { String ip = null ; for (int i = 0 ; i < 200 ; i++){ Random random = new Random (); ip = random.nextInt(256 )+"." + random.nextInt(256 )+"." + random.nextInt(256 )+"." + random.nextInt(256 ); Long hll = redisTemplate.opsForHyperLogLog().add("hll" ,ip); log.info("ip={},该IP地址访问首页的次数{}" ,ip,hll); } },"t1" ).start(); } public long uv () { return redisTemplate.opsForHyperLogLog().size("hll" ); } }
HyperLogLogController
1 2 3 4 5 6 7 8 9 10 11 12 @Api(tags = "淘宝亿级UV的Redis统计方案") @RestController @Slf4j public class HyperLogLogController { @Resource HyperLogLogService hyperLogLogService; @RequestMapping("/uv",method = RequestMethod.GET) public long uv () { return hyperLogLogService.uv(); } }
GEO Redis之GEO 面试题
交友软件中 附近的人,外卖软件中附近的美食店,打车软件附近的车辆等等。这种功能如何实现?
会有什么问题?
1、查询性能问题。如果并发高,数据量大这种查询是要搞垮mysql数据库的
2、一般mysql查询的是一个平面矩形访问,而轿车服务是要以我为中心N公里为半径的圆形覆盖。
3、精准度问题,地球不是平面坐标系,而是一个圆球,这种矩形计算在长距离计算时会有很大误差,Mysql不合适。
那么这时候要用经纬度
如何获得某个地址的经纬度
百度地图-坐标拾取器
命令复习
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 GEOADD 添加经纬度坐标 GEO city [经度] [维度] "天安门" [经度] [维度] "故宫" type city 返回是zset zrange city 0 -1 返回是中文乱码需要再启动客户端加上--raw GEOPOS 返回经纬度 GEOPOS city 天安门 故宫 GEOHASH 返回坐标的geohash表示 GEOHASH city 天安门 故宫 GEODIST 两个位置之间距离 GEODIST city 天安门 故宫 [m/km/ft/mi] GEORADIUS 以半径为中心,查找附近的XXX GEORADIUS city [自己的经度] [自己的纬度] 10 km withdist withcoord count 10 withhash desc GEORADIUSBYMEMBER 找出位于制定范围内的元素,中心点是给定的位置元素决定的 GEORADIUS city 天安门 10 km withdist withcoord count 10 withhash
美团地图位置附近的酒店推送 关键点 GEORADIUS
GeoController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Api(tags = "美团地图位置附近的酒店推送GEO") @RestController @Slf4j public class GeoController { @Resource private GeoService geoService; @ApiOperation("添加坐标geoadd") @RequestMapping(value = "/geoadd",method = RequestMethod.GET) public String geoAdd () { return geoService.geoAdd(); } @ApiOperation("获取经纬度坐标geopos") @RequestMapping(value = "/geopos",method = RequestMethod.GET) public Point position (String member) { return geoService.position(member); } @ApiOperation("获取经纬度生成的base32编码值geohash") @RequestMapping(value = "/geohash",method = RequestMethod.GET) public String hash (String member) { return geoService.hash(member); } @ApiOperation("获取两个给定位置之间的距离") @RequestMapping(value = "/geodist",method = RequestMethod.GET) public Distance distance (String member1, String member2) { return geoService.distance(member1,member2); } @ApiOperation("通过经度纬度查找北京王府井附近的") @RequestMapping(value = "/georadius",method = RequestMethod.GET) public GeoResults radiusByxy () { return geoService.radiusByxy(); } @ApiOperation("通过地方查找附近,本例写死天安门作为地址") @RequestMapping(value = "/georadiusByMember",method = RequestMethod.GET) public GeoResults radiusByMember () { return geoService.radiusByMember(); } }
GeoService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 @Service @Slf4j public class GeoService { public static final String CITY = "city" ; @Autowired private RedisTemplate redisTemplate; public String geoAdd () { Map<String, Point> map= new HashMap <>(); map.put("天安门" ,new Point (116.403963 ,39.915119 )); map.put("故宫" ,new Point (116.403414 ,39.924091 )); map.put("长城" ,new Point (116.024067 ,40.362639 )); redisTemplate.opsForGeo().add(CITY,map); return map.toString(); } public Point position (String member) { List<Point> list= this .redisTemplate.opsForGeo().position(CITY,member); return list.get(0 ); } public String hash (String member) { List<String> list= this .redisTemplate.opsForGeo().hash(CITY,member); return list.get(0 ); } public Distance distance (String member1, String member2) { Distance distance= this .redisTemplate.opsForGeo().distance(CITY,member1,member2, RedisGeoCommands.DistanceUnit.KILOMETERS); return distance; } public GeoResults radiusByxy () { Circle circle = new Circle (116.418017 , 39.914402 , Metrics.KILOMETERS.getMultiplier()); RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50 ); GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults= this .redisTemplate.opsForGeo().radius(CITY,circle, args); return geoResults; } public GeoResults radiusByMember () { String member="天安门" ; RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50 ); Distance distance=new Distance (10 , Metrics.KILOMETERS); GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults= this .redisTemplate.opsForGeo().radius(CITY,member, distance,args); return geoResults; } }
bitmap 面试题 日活统计。连续签到打卡。最近一周的活跃用户。统计指定用户一年之中的登录天数。某用户按照一年365天,哪几天登陆过?那几天没登录?全年中登录的天数。
是什么 由 0 和 1 状态表现的二进制位的 bit 数组
京东签到领取京豆 签到日历仅展示当月签到数据
签到日历需要展示最近连续签到天数
小厂利用传统的Mysql方式
就是建立一张mysql签到表,里面有用户的uuid和签到次数,签到一次+1即可。
困难和解决思路:签到用户量小可以,但是大体量的话就会很恐怖了。
1 2 3 一条签到记录对应一条记录,会占据越来越大的空间 一个月最多31天,刚好我们的int类型是32位,那这样一个Int类型就可以搞定一个月,32位大于31天,当天来了位是1没来就是0 一条数据直接存储一个月的签到记录,不再是存储一天的签到记录
大厂方法,基于Redis的bitmaps实现签到日历
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 命令复习 setbit k1 1 1 setbit k1 7 1 strlen k1 计算字节不满一个算一个 bitcount k1 记录1的个数 hset uid:map 0 uid1 用户表 hset uid:map 1 uid2 然后映射表 setbit 20230201 0 1 登录表 setbit 20230201 1 1 setbit 20230202 0 1 setbit 20230202 1 1 表示20230202和01这两天用户uid1和uid2都登录了 bitop and k3 20230202 02230201 bitcount k3 连续两天登录的人数记录
布隆过滤器BloomFilter 面试题 有50亿个电话号码,现在有10万个电话号码,如何快速准确判断这些电话号码是否已经存在?
判断是否存在,布隆过滤器了解过吗?
安全链接网址,全球数10亿网址判断?
黑名单校验,识别垃圾邮件?
白名单校验,识别出合法用户进行后续处理。
是什么 由一个初值都为 0 的Bit数组和多个哈希函数构成,用来快速判断集合中是否存在某个元素。
设计思想:本质就是判断具体数据是否存在于一个大的集合中
布隆过滤器是一种类似set的数据结构,只是统计结果在巨量数据下有点小瑕疵,不够完美。
它实际上是一个很长的二进制数组 + 一系列随机hash算法映射函数,主要用于判断一个元素是否在集合中。
能干嘛 高效的插入和查询,占用空间少,返回的结果是不确定性的,不够完美。
一个元素判断为存在时,不一定存在(hash冲突导致),但是不存在肯定不存在。
布隆过滤器可以添加元素,但是不能删除元素,容易增加误判率,由于hash冲突可能全删。
布隆过滤器原理 实际上是一个大型位数组和几个不同的hash函数。初始值都是0.
添加key时,使用多个hash函数对key进行hash运算得到一个整数索引值,对位数组长度进行取模运算得到一个位置,每个hash函数都会得到一个不同的位置,将这几个位置都置为 1 就完成了add操作。
查询key时,只要有其中一位是零就表示这个key不存在,但如果都是1,则不一定存在对应的key
1 正是基于布隆过滤器的快速检测特性,我们可以在把数据写入数据库时,使用布隆过滤器做个标记。当缓存缺失后,应用查询数据库时,可以通过查询不同过滤器快速判断数据是否存在,如果不存在,就不用再去数据库中查询了。这样一来,及时发生缓存穿透了,大量请求只会查询Redis和布隆过滤器,而不会挤压到数据库,也就不会影响数据库的正常运行。布隆过滤器可以使用Redis实现,本身就能承担较大的并发访问压力。
使用布隆过滤器的三个步骤
1、初始化bit数组
2、添加占坑位(多个hash然后取模占坑位)
3、判断是否存在
小总结
有可能有,无肯定无。使用时最好不要让实际元素数量远大于初始化数量,一次给够避免扩容。
当实际元素数量超过初始化数量时,应该对布隆过滤器进行重建,重新分配一个size更大的过滤器,再将所有的历史元素批量add进行。
布隆过滤器使用场景 解决缓存穿透的问题,和redis结合bitmap使用
缓存穿透是什么\
一般情况下,先查询缓存redis是否有该条数据,缓存中没有时,再查询数据库。
当数据库也不存在该条数据时,每次查询都要访问数据库,这就是缓存穿透。
缓存透带来的问题是,当有大量请求查询数据库不存在的数据时,就会给数据库带来压力,甚至会拖垮数据库。
可以使用布隆过滤器解决缓存穿透的问题
把已存在数据的key存在布隆过滤器中,相当于redis前面挡着一个布隆过滤器。
当有新的请求时,先到布隆过滤器中查询是否存在:
如果布隆过滤器中不存在该条数据则直接返回;
如果布隆过滤器中已存在,才去查询缓存redis,如果redis里没查询到则再查询Mysql数据库
黑名单校验,识别垃圾邮件
发现存在黑名单中的,就执行特定操作。比如:识别垃圾邮件,只要是邮箱在黑名单中的邮件,就识别为垃圾邮件。
假设黑名单的数量是数以亿计的,存放起来就是非常耗费存储空间的,布隆过滤器则是一个较好的解决方案。把所有黑名单都放在布隆过滤器中,在收到邮件时,判断邮件地址是否在布隆过滤器中即可。
尝试手写布隆过滤器,结合bitmap自研一下体会思想 架构
步骤
实现 redis的setbit / getbit
setBit的构建过程
1 2 3 4 @PostConstruct初始化白名单数据 计算元素的hash值 通过上一步hash值算出对应的二进制数组的坑位 将对应坑位的值的修改为数字1,表示存在
编码
Springboot + redis + mybatis案例基础与一键编码环境整合
1 2 3 MyBatis通用Mapper4 mybatis-generator Mybatis通用Mapper4官网
一键生成 t_customer用户表SQL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 CREATE TABLE `t_customer` ( `id` int(20) NOT NULL AUTO_INCREMENT, `cname` varchar(50) NOT NULL, `age` int(10) NOT NULL, `phone` varchar(20) NOT NULL, `sex` tinyint(4) NOT NULL, `birth` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `idx_cname` (`cname`) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
建springboot的Module mybatis_generator
改POM 这里复习mybatis使用,后面再结合上redis缓存实战,在后面再新增布隆过滤器
依赖
作用
是否必需?
org.mybatis:mybatis
MyBatis 核心库 (SQL 映射、Session 管理等)
✅ 必需
org.mybatis.spring.boot:mybatis-spring-boot-starter
Spring Boot 对 MyBatis 的自动装配支持
✅ 必需(在 Spring Boot 项目中)
tk.mybatis:mapper
通用 Mapper 插件 (提供 Mapper<T> 接口)
✅ 必需(为了免写 SQL)
org.mybatis.generator:mybatis-generator-core
代码生成器核心 (用于自动生成 Entity/Mapper)
⚠️ 仅生成时需要
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > com.atguigu.redis7</groupId > <artifactId > mybatis_generator</artifactId > <version > 1.0-SNAPSHOT</version > <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.6.10</version > <relativePath /> </parent > <properties > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <java.version > 1.8</java.version > <hutool.version > 5.5.8</hutool.version > <druid.version > 1.1.18</druid.version > <mapper.version > 4.1.5</mapper.version > <pagehelper.version > 5.1.4</pagehelper.version > <mysql.version > 5.1.39</mysql.version > <swagger2.version > 2.9.2</swagger2.version > <swagger-ui.version > 2.9.2</swagger-ui.version > <mybatis.spring.version > 2.1.3</mybatis.spring.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.mybatis</groupId > <artifactId > mybatis</artifactId > <version > 3.4.6</version > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > <version > ${mybatis.spring.version}</version > </dependency > <dependency > <groupId > org.mybatis.generator</groupId > <artifactId > mybatis-generator-core</artifactId > <version > 1.4.0</version > <scope > compile</scope > <optional > true</optional > </dependency > <dependency > <groupId > tk.mybatis</groupId > <artifactId > mapper</artifactId > <version > ${mapper.version}</version > </dependency > <dependency > <groupId > javax.persistence</groupId > <artifactId > persistence-api</artifactId > <version > 1.0.2</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > <exclusions > <exclusion > <groupId > org.junit.vintage</groupId > <artifactId > junit-vintage-engine</artifactId > </exclusion > </exclusions > </dependency > </dependencies > <build > <resources > <resource > <directory > ${basedir}/src/main/java</directory > <includes > <include > **/*.xml</include > </includes > </resource > <resource > <directory > ${basedir}/src/main/resources</directory > </resource > </resources > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <configuration > <excludes > <exclude > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </exclude > </excludes > </configuration > </plugin > <plugin > <groupId > org.mybatis.generator</groupId > <artifactId > mybatis-generator-maven-plugin</artifactId > <version > 1.3.6</version > <configuration > <configurationFile > ${basedir}/src/main/resources/generatorConfig.xml</configurationFile > <overwrite > true</overwrite > <verbose > true</verbose > </configuration > <dependencies > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > ${mysql.version}</version > </dependency > <dependency > <groupId > tk.mybatis</groupId > <artifactId > mapper</artifactId > <version > ${mapper.version}</version > </dependency > </dependencies > </plugin > </plugins > </build > </project >
写YML 这里不需要,因为这不需要自己独立运行而是提供给其他Module来生成代码的。
自动化生成需要在resources路径下新建两个文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 config.properties 配置属性 #t_customer表包名 package.name=com.atguigu.redis7 这里写的包名是后面要生成代码的包名 jdbc.driverClass = com.mysql.jdbc.Driver jdbc.url = jdbc:mysql://localhost:3306/bigdata jdbc.user = root jdbc.password =123456 generatorConfig.xml 生成的配置 <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE generatorConfiguration PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN" "http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd" > <generatorConfiguration > <properties resource ="config.properties" /> <context id ="Mysql" targetRuntime ="MyBatis3Simple" defaultModelType ="flat" > <property name ="beginningDelimiter" value ="`" /> <property name ="endingDelimiter" value ="`" /> <plugin type ="tk.mybatis.mapper.generator.MapperPlugin" > <property name ="mappers" value ="tk.mybatis.mapper.common.Mapper" /> <property name ="caseSensitive" value ="true" /> </plugin > <jdbcConnection driverClass ="${jdbc.driverClass}" connectionURL ="${jdbc.url}" userId ="${jdbc.user}" password ="${jdbc.password}" > </jdbcConnection > <javaModelGenerator targetPackage ="${package.name}.entities" targetProject ="src/main/java" /> <sqlMapGenerator targetPackage ="${package.name}.mapper" targetProject ="src/main/java" /> <javaClientGenerator targetPackage ="${package.name}.mapper" targetProject ="src/main/java" type ="XMLMAPPER" /> <table tableName ="t_customer" domainObjectName ="Customer" > <generatedKey column ="id" sqlStatement ="JDBC" /> </table > </context > </generatorConfiguration >
然后双击插件Mybatis-generator.generate,一键生成entity+mapper接口+xml实现SQL
SpringBoot + Mybatis + Redis缓存实战编码 建Module 改造我们的redis_study工程 POM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > com.atguigu.redis7</groupId > <artifactId > redis7_study</artifactId > <version > 1.0-SNAPSHOT</version > <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.6.10</version > <relativePath /> </parent > <properties > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <junit.version > 4.12</junit.version > <log4j.version > 1.2.17</log4j.version > <lombok.version > 1.16.18</lombok.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 4.3.1</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-pool2</artifactId > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.47</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > druid-spring-boot-starter</artifactId > <version > 1.1.10</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > druid</artifactId > <version > 1.1.16</version > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > <version > 1.3.0</version > </dependency > <dependency > <groupId > cn.hutool</groupId > <artifactId > hutool-all</artifactId > <version > 5.2.3</version > </dependency > <dependency > <groupId > javax.persistence</groupId > <artifactId > persistence-api</artifactId > <version > 1.0.2</version > </dependency > <dependency > <groupId > tk.mybatis</groupId > <artifactId > mapper</artifactId > <version > 4.1.5</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-autoconfigure</artifactId > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > ${junit.version}</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > log4j</groupId > <artifactId > log4j</artifactId > <version > ${log4j.version}</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > <optional > true</optional > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > </plugin > </plugins > </build > </project >
YML \src\main\resources\目录下新建mapper文件夹并拷贝CustomerMapper.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 server.port =7777 spring.application.name =redis7_study logging.level.root =info logging.level.com.atguigu.redis7 =info logging.pattern.console =%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger- %msg%n logging.file.name =D:/mylogs2023/redis7_study.log logging.pattern.file =%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger- %msg%n spring.swagger2.enabled =true spring.mvc.pathmatch.matching-strategy =ant_path_matcher spring.redis.database =0 spring.redis.host =192.168.111.185 spring.redis.port =6379 spring.redis.password =111111 spring.redis.lettuce.pool.max-active =8 spring.redis.lettuce.pool.max-wait =-1ms spring.redis.lettuce.pool.max-idle =8 spring.redis.lettuce.pool.min-idle =0 spring.datasource.type =com.alibaba.druid.pool.DruidDataSource spring.datasource.driver-class-name =com.mysql.jdbc.Driver spring.datasource.url =jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false spring.datasource.username =root spring.datasource.password =123456 spring.datasource.druid.test-while-idle =false mybatis.mapper-locations =classpath:mapper/*.xml mybatis.type-aliases-package =com.atguigu.redis7.entities
主启动
1 2 3 4 5 6 7 @SpringBootApplication @MapperScan("com.atguigu.redis7.mapper") public class Redis7Study7777 { public static void main (String[] args) { SpringApplication.run(Redis7Study7777.class,args); } }
业务类
1 2 3 4 5 数据库表t_customer是否ok entity 上一步自动生成的拷贝过来的Customer mapper接口生成了 mapperSQL文件也生成了 sevice文件和controller要自己写
service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Service @Slf4j public class CustomerSerivce { public static final String CACHE_KEY_CUSTOMER = "customer:" ; @Resource private CustomerMapper customerMapper; @Resource private RedisTemplate redisTemplate; public void addCustomer (Customer customer) { int i = customerMapper.insertSelective(customer); if (i > 0 ) { customer=customerMapper.selectByPrimaryKey(customer.getId()); String key=CACHE_KEY_CUSTOMER+customer.getId(); redisTemplate.opsForValue().set(key,customer); } } public Customer findCustomerById (Integer customerId) { Customer customer = null ; String key=CACHE_KEY_CUSTOMER+customerId; customer = (Customer) redisTemplate.opsForValue().get(key); if (customer==null ){ customer=customerMapper.selectByPrimaryKey(customerId); if (customer != null ) { redisTemplate.opsForValue().set(key,customer); } } return customer; } }
controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Api(tags = "客户Customer接口+布隆过滤器讲解") @RestController @Slf4j public class CustomerController { @Resource private CustomerSerivce customerSerivce; @ApiOperation("数据库初始化2条Customer数据") @RequestMapping(value = "/customer/add", method = RequestMethod.POST) public void addCustomer () { for (int i = 0 ; i < 2 ; i++) { Customer customer = new Customer (); customer.setCname("customer" +i); customer.setAge(new Random ().nextInt(30 )+1 ); customer.setPhone("1381111xxxx" ); customer.setSex((byte ) new Random ().nextInt(2 )); customer.setBirth(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant())); customerSerivce.addCustomer(customer); } } @ApiOperation("单个用户查询,按customerid查用户信息") @RequestMapping(value = "/customer/{id}", method = RequestMethod.GET) public Customer findCustomerById (@PathVariable int id) { return customerSerivce.findCustomerById(id); } }
新增布隆过滤器案例
code
BloomFilterInit(白名单) 利用@PostConstruct初始化白名单数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public class BloomFilterInit { @Resource private RedisTemplate redisTemplate; @PostConstruct public void init () { String uid = "customer:12" ; int hashValue = Math.abs(uid.hashCode()); long index = (long ) (hashValue % Math.pow(2 , 32 )); log.info(uid+" 对应------坑位index:{}" ,index); redisTemplate.opsForValue().setBit("whitelistCustomer" ,index,true ); } }
CheckUtils 检查型工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component @Slf4j public class CheckUtils { @Resource private RedisTemplate redisTemplate; public boolean checkWithBloomFilter (String checkItem,String key) { int hashValue = Math.abs(key.hashCode()); long index = (long ) (hashValue % Math.pow(2 , 32 )); boolean existOK = redisTemplate.opsForValue().getBit(checkItem, index); log.info("----->key:" +key+"\t对应坑位index:" +index+"\t是否存在:" +existOK); return existOK; } }
CustomerService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 @Service @Slf4j public class CustomerSerivce { public static final String CACHE_KEY_CUSTOMER = "customer:" ; @Resource private CustomerMapper customerMapper; @Resource private RedisTemplate redisTemplate; @Resource private CheckUtils checkUtils; public void addCustomer (Customer customer) { int i = customerMapper.insertSelective(customer); if (i > 0 ) { customer=customerMapper.selectByPrimaryKey(customer.getId()); String key=CACHE_KEY_CUSTOMER+customer.getId(); redisTemplate.opsForValue().set(key,customer); } } public Customer findCustomerById (Integer customerId) { Customer customer = null ; String key=CACHE_KEY_CUSTOMER+customerId; customer = (Customer) redisTemplate.opsForValue().get(key); if (customer==null ) { customer=customerMapper.selectByPrimaryKey(customerId); if (customer != null ) { redisTemplate.opsForValue().set(key,customer); } } return customer; } @Resource private CheckUtils checkUtils; public Customer findCustomerByIdWithBloomFilter (Integer customerId) { Customer customer = null ; String key = CACHE_KEY_CUSTOMER + customerId; if (!checkUtils.checkWithBloomFilter("whitelistCustomer" ,key)) { log.info("白名单无此顾客信息:{}" ,key); return null ; } customer = (Customer) redisTemplate.opsForValue().get(key); if (customer == null ) { customer = customerMapper.selectByPrimaryKey(customerId); if (customer != null ) { redisTemplate.opsForValue().set(key, customer); } } return customer; } }
CustomerController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Api(tags = "客户Customer接口+布隆过滤器讲解") @RestController @Slf4j public class CustomerController { @Resource private CustomerSerivce customerSerivce; @ApiOperation("数据库初始化2条Customer数据") @RequestMapping(value = "/customer/add", method = RequestMethod.POST) public void addCustomer () { for (int i = 0 ; i < 2 ; i++) { Customer customer = new Customer (); customer.setCname("customer" +i); customer.setAge(new Random ().nextInt(30 )+1 ); customer.setPhone("1381111xxxx" ); customer.setSex((byte ) new Random ().nextInt(2 )); customer.setBirth(Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant())); customerSerivce.addCustomer(customer); } } @ApiOperation("单个用户查询,按customerid查用户信息") @RequestMapping(value = "/customer/{id}", method = RequestMethod.GET) public Customer findCustomerById (@PathVariable int id) { return customerSerivce.findCustomerById(id); } @ApiOperation("BloomFilter案例讲解") @RequestMapping(value = "/customerbloomfilter/{id}", method = RequestMethod.GET) public Customer findCustomerByIdWithBloomFilter (@PathVariable int id) throws ExecutionException, InterruptedException { return customerSerivce.findCustomerByIdWithBloomFilter(id); } }
总结 布隆过滤器的优点和缺点
优点是高效的插入和查询,内存占用bit空间少
缺点是不能删除元素,存在误判不能精准过滤,有是可能有,无是肯定无因为hash冲突。
布谷鸟过滤器 为了解决布隆过滤器不能删除元素的问题。
缓存预热+缓存雪崩+缓存击穿+缓存穿透 面试题 缓存预热、雪崩、穿透、击穿分别是什么?
缓存预热怎么做?
如何避免或者减少缓存雪崩?
穿透和击穿有什么区别?
穿透和击穿有什么解决方案?
加入出现了缓存不一致,有哪些修补方案?
缓存预热 什么是缓存预热?
Mysql假如新增100条记录,一般默认以Mysql为准作为底单数据,如何同步给redis(布隆过滤器),这100条合法数据??
为什么需要预热?
Mysql有100条新记录,redis没有。
1、比较懒,什么都不做,只对mysql做了数据新增,利用redis的回写机制,让他逐步实现00条新增记录的同步。最好提前晚上部署发布版本的时候,由自己人提交前做一次,让Redis同步了,不要把问题交给客户。
2、通过中间件MQ异步预热(中大型) 或者程序自行完成。利用@PostConstruct初始化白名单
缓存雪崩 发生 redis主机挂了,Redis全盘崩溃,redis中有大量key同时过期大面积失效。
预防+解决
redis 中 key 设置为永不过期 or 过期时间错开
redis 缓存集群实现高可用
主从 + 哨兵
Redis Cluster
开启Redis持久化机制aof / rdb ,尽快回复缓存集群
多缓存结合预防雪崩
caffeine或者ehcache本地缓存 + redis缓存
服务降级
Hystrix 或者 阿里sentinel限流 & 降级
人民币玩家 阿里云-云数据库Redis版上面全部都提供服务
缓存穿透 请求去查询一条记录,先查redis无,后查Mysql无,都查不到该条记录,但是请求每次都会打到数据库上面去,导致后台数据库压力暴增,redis就成了摆设
解决
空对象缓存、bloomfilter过滤器
方案1:空对象缓存或者缺省值
方法1:回写增强,确定一个空值比如null,第一次来查询redis和Mysql都没有,返回null给调用者,但是增强回写后,第二次查询,这时候redis就有了,返回null,避免大量请求到mysql。
但是这个方法无法防御黑客或者恶意攻击,随机变换不存在的uid来访问依然炸。
方案2:自研布隆过滤器或者Google布隆过滤器Guava解决
让布隆过滤器当做白名单使用
误判问题,但是概率小可以接受,不能从布隆过滤器删除
全部合法的key都需要放入Guava版布隆过滤器+redis里面不然数据就是返回null
实战
建Module 修改redis7_study
改POM
1 2 3 4 5 6 <dependency > <groupId > com.google.guava</groupId > <artifactId > guava</artifactId > <version > 23.0</version > </dependency >
写YML 跟之前一样
主启动 跟之前一样
业务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Test public void testGuavaWithBloomFilter () { BloomFilter<Integer> filter = BloomFilter.create(Funnels.integerFunnel(), 100 ); System.out.println(filter.mightContain(1 )); System.out.println(filter.mightContain(2 )); filter.put(1 ); filter.put(2 ); System.out.println(filter.mightContain(1 )); System.out.println(filter.mightContain(2 )); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Api(tags = "google工具Guava处理布隆过滤器") @RestController @Slf4j public class GuavaBloomFilterController { @Resource private GuavaBloomFilterService guavaBloomFilterService; @ApiOperation("guava布隆过滤器插入100万样本数据并额外10W测试是否存在") @RequestMapping(value = "/guavafilter",method = RequestMethod.GET) public void guavaBloomFilter () { guavaBloomFilterService.guavaBloomFilter(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Service @Slf4j public class GuavaBloomFilterService { public static final int _1W = 10000 ; public static int size = 100 * _1W; public static double fpp = 0.03 ; private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size,fpp); public void guavaBloomFilter () { for (int i = 1 ; i <=size; i++) { bloomFilter.put(i); } List<Integer> list = new ArrayList <>(10 * _1W); for (int i = size+1 ; i <= size + (10 *_1W); i++) { if (bloomFilter.mightContain(i)) { log.info("被误判了:{}" ,i); list.add(i); } } log.info("误判的总数量::{}" ,list.size()); } }
debug源码分析hash函数
这里发现guava会根据误判率开辟bit数组和hash函数个数,如果越小数组会越大,函数越多,必须大于0。
默认0.03误判率。
最终方案
黑名单的使用
缓存击穿 缓存穿透是不存在的key,而缓存击穿是大量的请求同时查询一个key的时候,此时这个key正好失效了,就会导致大量的请求都打到数据库上面去。
突然,出现高频访问的redis热点可以失效。1、自然过期时间的。2、delete老的Key,换上新的key,del动作的时候,失效了。
简单的说就是热点key突然失效了,暴打mysql
解决
失效原因就是两个。
方案一: 差异失效时间,对于访问频繁的热点key,干脆就不设置过期时间
方案二: 互斥更新,采用双检加锁策略(单机版就Jvm锁,分布式就分布式锁)
多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。
其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。
案例
天猫聚划算功能实现+防止缓存击穿
分析说明
1 2 3 4 5 6 100%高并发,绝对不可以用Mysql实现 先把mysql里面参加活动的数据抽取进redis,一般采用定时器扫描来决定上线活动还是下线取消 支持分页功能,一页20条记录 那么在redis用什么数据结构呢?用list,一般分页用list,排行榜用zset lpush list 1 2 3 lrange list 0 2
使用springboot+redis实现高并发的聚划算业务
建Module 修改redis7_study
改POM 不用改
写YML 不用改
主启动 无
业务类
entity
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Data @AllArgsConstructor @NoArgsConstructor @ApiModel(value = "聚划算活动producet信息") public class Product { private Long id; private String name; private Integer price; private String detail; }
JHSTaskService 采用定时器将参与聚划算活动的特价商品新增进入redis中。
定时任务可以用XXL-JOB或者小公司用Redis分布式锁+@Scheduled 轻量级替代
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 @Service @Slf4j public class JHSTaskService { public static final String JHS_KEY="jhs" ; public static final String JHS_KEY_A="jhs:a" ; public static final String JHS_KEY_B="jhs:b" ; @Autowired private RedisTemplate redisTemplate; private List<Product> getProductsFromMysql () { List<Product> list=new ArrayList <>(); for (int i = 1 ; i <=20 ; i++) { Random rand = new Random (); int id= rand.nextInt(10000 ); Product obj=new Product ((long ) id,"product" +i,i,"detail" ); list.add(obj); } return list; } public void initJHS () { log.info("启动定时器淘宝聚划算功能模拟.........." + DateUtil.now()); new Thread (() -> { while (true ){ List<Product> list=this .getProductsFromMysql(); this .redisTemplate.delete(JHS_KEY); this .redisTemplate.opsForList().leftPushAll(JHS_KEY,list); try { TimeUnit.MINUTES.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("runJhs定时刷新.............." ); } },"t1" ).start(); } @PostConstruct public void initJHSAB () { log.info("启动AB定时器计划任务淘宝聚划算功能模拟.........." +DateUtil.now()); new Thread (() -> { while (true ){ List<Product> list=this .getProductsFromMysql(); this .redisTemplate.delete(JHS_KEY_B); this .redisTemplate.opsForList().leftPushAll(JHS_KEY_B,list); this .redisTemplate.expire(JHS_KEY_B,20L ,TimeUnit.DAYS); this .redisTemplate.delete(JHS_KEY_A); this .redisTemplate.opsForList().leftPushAll(JHS_KEY_A,list); this .redisTemplate.expire(JHS_KEY_A,15L ,TimeUnit.DAYS); try { TimeUnit.MINUTES.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("runJhs定时刷新双缓存AB两层.............." ); } },"t1" ).start(); } }
到现在,上述聚划算的功能算是完成,那么在高并发下有什么 经典 生产问题?
就是啊热点key突然失效导致可怕的缓存击穿。delete命令执行的一瞬间有空隙,其他请求线程继续找到Redis为null。打到了Mysql,暴击。
差异失效时间
Service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 @Service @Slf4j public class JHSTaskService { public static final String JHS_KEY="jhs" ; public static final String JHS_KEY_A="jhs:a" ; public static final String JHS_KEY_B="jhs:b" ; @Autowired private RedisTemplate redisTemplate; private List<Product> getProductsFromMysql () { List<Product> list=new ArrayList <>(); for (int i = 1 ; i <=20 ; i++) { Random rand = new Random (); int id= rand.nextInt(10000 ); Product obj=new Product ((long ) id,"product" +i,i,"detail" ); list.add(obj); } return list; } public void initJHS () { log.info("启动定时器淘宝聚划算功能模拟.........." + DateUtil.now()); new Thread (() -> { while (true ){ List<Product> list=this .getProductsFromMysql(); this .redisTemplate.delete(JHS_KEY); this .redisTemplate.opsForList().leftPushAll(JHS_KEY,list); try { TimeUnit.MINUTES.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("runJhs定时刷新.............." ); } },"t1" ).start(); } @PostConstruct public void initJHSAB () { log.info("启动AB定时器计划任务淘宝聚划算功能模拟.........." +DateUtil.now()); new Thread (() -> { while (true ){ List<Product> list=this .getProductsFromMysql(); this .redisTemplate.delete(JHS_KEY_B); this .redisTemplate.opsForList().leftPushAll(JHS_KEY_B,list); this .redisTemplate.expire(JHS_KEY_B,20L ,TimeUnit.DAYS); this .redisTemplate.delete(JHS_KEY_A); this .redisTemplate.opsForList().leftPushAll(JHS_KEY_A,list); this .redisTemplate.expire(JHS_KEY_A,15L ,TimeUnit.DAYS); try { TimeUnit.MINUTES.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info("runJhs定时刷新双缓存AB两层.............." ); } },"t1" ).start(); } }
Controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @RestController @Slf4j @Api(tags = "聚划算商品列表接口") public class JHSProductController { public static final String JHS_KEY="jhs" ; public static final String JHS_KEY_A="jhs:a" ; public static final String JHS_KEY_B="jhs:b" ; @Autowired private RedisTemplate redisTemplate; @RequestMapping(value = "/pruduct/find",method = RequestMethod.GET) @ApiOperation("按照分页和每页显示容量,点击查看") public List<Product> find (int page, int size) { List<Product> list=null ; long start = (page - 1 ) * size; long end = start + size - 1 ; try { list = this .redisTemplate.opsForList().range(JHS_KEY, start, end); if (CollectionUtils.isEmpty(list)) { } log.info("查询结果:{}" , list); } catch (Exception ex) { log.error("exception:" , ex); } return list; } @RequestMapping(value = "/pruduct/findab",method = RequestMethod.GET) @ApiOperation("防止热点key突然失效,AB双缓存架构") public List<Product> findAB (int page, int size) { List<Product> list=null ; long start = (page - 1 ) * size; long end = start + size - 1 ; try { list = this .redisTemplate.opsForList().range(JHS_KEY_A, start, end); if (CollectionUtils.isEmpty(list)) { log.info("=========A缓存已经失效了,记得人工修补,B缓存自动延续5天" ); this .redisTemplate.opsForList().range(JHS_KEY_B, start, end); } log.info("查询结果:{}" , list); } catch (Exception ex) { log.error("exception:" , ex); } return list; } }
手写Redis分布式锁 分布式锁一般用Redis实现
面试题 1 2 3 4 5 6 7 8 9 10 11 12 Redis除了拿来做缓存,还有什么用法? 数据共享,分布式Session 分布式锁 全局ID 计数器、点赞 位统计 购物车 轻量级消息队列 list/stream 抽奖 点赞、签到、打卡 差集交集并集,用户关注,可能认识的人,推荐模型 热点新闻,热搜排行榜
Redis做分布式锁的时候有需要注意的问题?
公司自己实现的分布式锁是否用的setnx命令实现?这是最合适的吗?如何考虑分布式锁的可重入问题?
如果Redis是单点部署的,会带来什么问题?那怎么解决单点问题呢?
Redis集群模式下,比如主从模式,CAP方面有什么问题呢?
CAP 是什么?
分布式系统无法同时满足一致性(C)、可用性(A)、分区容错性(P)
必须放弃哪个?
P 不能放弃 (网络分区必然发生),所以实际是 C vs A 的选择
CP 代表集群
ZooKeeper、etcd、HBase、Consul(默认)→ 强一致,宁可不可用
AP 代表集群
Eureka、Cassandra、DynamoDB、Riak,Redis → 高可用,容忍不一致
简单介绍一下Redlock吧?简历上写redisson(工作上使用)。
Redis分布式锁如何续期?看门狗知道吗?
锁的种类 单机版同一个JVM虚拟机内,synchronized或者Lock接口
分布式多个不同JVM虚拟机 单机的线程锁机制不在其作用,资源类在不同的服务器之间共享了。
举个例子解释分布式锁锁在哪里,首先Vue发送请求到nginx,然后分发到订单模块,然后到库存模块扣减库存,然后到mysql
那么分布式锁就锁在库存模块和订单模块之间,但写代码写在库存模块里。
首先A抢到线程然后setnx 也就是不存在就设置建,那么谁第一个先抢到就会键锁成功,然后这时候B来获得锁,发现建锁失败,那么就等待,或者重复执行,那么这时候A执行完扣减库存的代码了,然后把key删除掉
一个靠谱分布式锁需要具备的条件和刚需
独占性 :OnlyOne,任何时刻只能有且仅有一个线程持有
高可用 :如果是redis集群环境下,不能因为某个节点挂了而出现获取锁和释放锁失败的情况。高并发请求下,依旧性能好。
防死锁 :杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案。(设置过期时间类似)
不乱抢 :防止张冠李戴,不能私下unlock别人的锁,只能自己加的锁自己释放。(假设A线程里调用了B线程然后B线程也要拿锁,如果线程B直接删锁,就会把A的锁删除)
重入性 :同一个节点的同一个线程如果获得锁之后它也可以再次获得这个锁。(如果一个线程已经持有了某把锁,它再次请求这把锁时,应该立即成功,而不是被阻塞或失败。 )
分布式锁 1 2 setnx key value set key value [ex seconds] [px milliseconds] [nx|xx]
首先setnx + expire(设置过期时间)不安全,两条命令非原子性的。因为这是两行操作
这两步之间,可能被其他操作打断 ,比如:
程序崩溃
网络中断
进程被 kill
主从切换(在某些部署架构下)
⚠️ 只要第 1 步成功、第 2 步没执行,就会留下一个永不过期的锁 !
方式
是否原子
安全性
推荐
SETNX + EXPIRE
❌ 否
有死锁风险
不要用
SET key val NX EX 30
✅ 是
安全(有兜底)
推荐
Redisson / RedLock
✅ 是
更完善(含重入、续期等)
生产首选
那么解决方式
方式
本质
类型
Redisson 分布式锁
基于 Redis + Lua 脚本封装的成熟、生产级 SDK
开箱即用的“成品”
Lua + Redis 自研锁
手动编写 Lua 脚本 + Redis 命令实现核心逻辑
需要自己造轮子的“半成品”
这里参考JUC中AQS锁的规范落地+可重入锁考虑+Lua脚本+Redis命令一步步实现分布式锁
后面一整个大的迭代案例
分布式锁大案例含Lua Base案例 首先使用场景:多个服务间保证同一时刻同一时间段内同一用户只能有一个请求(防止关键业务出现并发攻击)
1、建立Module
1 2 3 redis_distributed_lock2 redis_distributed_lock3 这里开两台
2、改POM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > com.atguigu.redislock</groupId > <artifactId > redis_distributed_lock2</artifactId > <version > 1.0-SNAPSHOT</version > <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.6.12</version > <relativePath /> </parent > <properties > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <maven.compiler.source > 8</maven.compiler.source > <maven.compiler.target > 8</maven.compiler.target > <lombok.version > 1.16.18</lombok.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-pool2</artifactId > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > ${lombok.version}</version > <optional > true</optional > </dependency > <dependency > <groupId > cn.hutool</groupId > <artifactId > hutool-all</artifactId > <version > 5.8.8</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > </plugin > </plugins > </build > </project >
3、写YML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 server.port =7777 spring.application.name =redis_distributed_lock swagger2.enabled =true spring.mvc.pathmatch.matching-strategy =ant_path_matcher spring.redis.database =0 spring.redis.host =192.168.111.185 spring.redis.port =6379 spring.redis.password =111111 spring.redis.lettuce.pool.max-active =8 spring.redis.lettuce.pool.max-wait =-1ms spring.redis.lettuce.pool.max-idle =8 spring.redis.lettuce.pool.min-idle =0
4、主启动
1 2 3 4 5 6 7 8 9 @SpringBootApplication public class RedisDistributedLockApp7777 { public static void main (String[] args) { SpringApplication.run(RedisDistributedLockApp7777.class,args); } }
5、业务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 @Configuration @EnableSwagger2 public class Swagger2Config { @Value("${swagger2.enabled}") private Boolean enabled; @Bean public Docket createRestApi () { return new Docket (DocumentationType.SWAGGER_2) .apiInfo(apiInfo()) .enable(enabled) .select() .apis(RequestHandlerSelectors.basePackage("com.atguigu.redislock" )) .paths(PathSelectors.any()) .build(); } private ApiInfo apiInfo () { return new ApiInfoBuilder () .title("springboot利用swagger2构建api接口文档 " +"\t" + DateTimeFormatter.ofPattern("yyyy-MM-dd" ).format(LocalDateTime.now())) .description("springboot+redis整合" ) .version("1.0" ) .termsOfServiceUrl("https://www.baidu.com/" ) .build(); } } @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate (LettuceConnectionFactory lettuceConnectionFactory) { RedisTemplate<String,Object> redisTemplate = new RedisTemplate <>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer ()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer ()); redisTemplate.setHashKeySerializer(new StringRedisSerializer ()); redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer ()); redisTemplate.afterPropertiesSet(); return redisTemplate; } } @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock (); public String sale () { String retMessage = "" ; lock.lock(); try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }finally { lock.unlock(); } return retMessage+"\t" +"服务端口号:" +port; } } @RestController @Api(tags = "redis分布式锁测试") public class InventoryController { @Autowired private InventoryService inventoryService; @ApiOperation("扣减库存,一次卖一个") @GetMapping(value = "/inventory/sale") public String sale () { return inventoryService.sale(); } }
6、丝袜哥测试
1 http://localhost:7777/swagger-ui.html#/
测试成功。但是这是单机版的。
开始手写分布式锁 首先把上面的配置原样拷贝个新的redis_distributed_lock3然后其他都一样,除了端口号变成了8888
然后布置nginx微服务架构
然后配置nginx配置负载均衡
1 2 3 4 5 6 命令地址 /usr/local/nginx/sbin 配置地址 /usr/local/nginx/conf 然后启动 /usr/local/nginx/sbin ./nginx 启动nginx并测试通过,浏览器看到nginx欢迎welcome页面 然后usr/local/nginx/conf目录下修改配置文件 nginx.conf新增反向代理和负载均衡配置
1 2 3 然后关闭nginx /usr/local/nginx/sbin ./nginx -s stop 然后指定配置启动 在/usr/local/nginx/sbin路径下执行命令 ./nginx -c /usr/loca/nginx/conf/nginx.conf
然后修改代码+启动两个微服务
1 2 3 4 7777 InventoryService 8888 InventoryService 通过Nginx访问,你的Linux服务器地址IP,反向代理+负载均衡,也就是说访问的是Linux服务器,然后分配给7服务或者8服务,然后一边一个默认轮询 http://192.168.111.185/inventory/sale 这个是Linux装Nginx的地址
然后上swagger验证都ok。手动模拟可以,下面模拟高并发。
利用 jmeter 这个在后面springcloud会说明
然后压测开始后,发现错误了
然后商品被卖出去2次,出现超卖故障现象。
但是为什么加了锁没控制住呢?
在单机环境下,可以使用synchronized或Lock来实现。
但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),
所以需要一个让所有进程都能访问到的锁来实现(比如redis或者zookeeper来构建)
不同进程jvm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程
解决上redis分布式锁 然后现在需要修改service的代码了,现在的逻辑是去redis获得锁才能进入库存消减逻辑,之前是直接查询库存,还有值就直接减了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock (); public String sale () { String retMessage = "" ; String key = "zzyyRedisLock" ; String uuidValue = IdUtil.simpleUUID()+":" +Thread.currentThread().getId(); Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue); if (!flag){ try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { e.printStackTrace(); } sale(); }else { try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }finally { stringRedisTemplate.delete(key); } } return retMessage+"\t" +"服务端口号:" +port; } }
然后继续压测。
发现没问题了可以扣减完成,没有出错。
功能是完成了,性能呢?
潜在隐藏的bug, 在高并发下,禁止使用递归重试,因为递归很容易让栈溢出StackOverflowError
进一步完善 多线程判断思想JUC里面说过的虚假唤醒,用while替代if
用自旋替代递归重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock (); public String sale () { String retMessage = "" ; String key = "zzyyRedisLock" ; String uuidValue = IdUtil.simpleUUID()+":" +Thread.currentThread().getId(); while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){ try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { e.printStackTrace(); } } try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }finally { stringRedisTemplate.delete(key); } return retMessage+"\t" +"服务端口号:" +port; } }
好了那这个还有问题?
正常:假设A先进来,然后也建设锁了,然后删掉,B用完了,删掉。
但是怕A进入到try了,进入finally之前,redis宕机了,但是我们的锁没有设定过期时间,根本没机会删除key,这时候B其他的订单模块进入不了库存了。
宕机过期+防止死锁 那么可以这么改吗?
1 2 3 4 5 6 7 while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)){ try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { e.printStackTrace(); } } stringRedisTemplate.expire(key,30L ,TimeUnit.SECONDS);
不行,因为设置key+过期时间分开了,在多线程高并发的情况下,A拿到锁,然后还没更新key的时候,key死了,然后B也拿到锁了,然后就扣减两次,那就很危险了。所以必须合并成一行原子性操作。
1 2 3 4 5 6 while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L ,TimeUnit.SECONDS)){ try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { e.printStackTrace(); } }
加锁和过期时间必须在一行,保证原子性
还有问题吗?
防止误删key问题 A线程执行业务超时间,然后B拿到锁执行业务,然后A线程执行完,把锁删除,这就是误删锁问题。
过期时间到了但是锁失效了!!!!
解决:需要添加判断是否是自己的锁来进行操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock (); public String sale () { String retMessage = "" ; String key = "zzyyRedisLock" ; String uuidValue = IdUtil.simpleUUID()+":" +Thread.currentThread().getId(); while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L ,TimeUnit.SECONDS)) { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { e.printStackTrace(); } } try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber+"\t" +uuidValue; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }finally { if (stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)){ stringRedisTemplate.delete(key); } } return retMessage+"\t" +"服务端口号:" +port; } }
还有什么问题吗?
最后的判断的时候,卡住了,没有删掉,也就是说在get和del之间,锁可能被别人抢走。
lua保证原子性 lua脚本解决,让上面变成原子性 ,启动Lua脚本编写redis分布式锁判断+删除判断代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 lua脚本命令浅谈 Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return 返回脚本执行后的结果值 什么意思,就是在redis里面操作 set k1 v1 expire k1 45 get k1 这里多了三次交互,然后要求三条命令变成一次交互最后返回get的结果 语法:eval luascript numkeys [key[key ...]] [arg [arg ...]] EVAL就是提示redis调用脚本了,然后脚本里面要用return 返回结果那么就写成这样 EVAL "return 'hello world'" 0 ; 如果要在lua脚本里面调用redis命令那么就用redis.call() EVAL "redis.call('set','k1','v1') redis.call('expire','k1','30') return redis.call('get','k1')" 0 这里的key和val都是写死的如果想动态传参呢? 首先mset k1 v1 k2 v2这个命令呢?想把参数动态传参,这里类似数组,并且从1 开始 这里可以解释语法了,EVAL是唤醒 luascript是lua脚本 numkeys是参数个数有几个就写几个key 然后后面就写参数,参数可以比key多 EVAL "return redis.call('mset' , KEYS[1], ARGV[1], KEYS[2],ARGV[2])" 2 k1 k2 v1 v2 然后官网提供了判断的代码 EVAL "if redis.call('get' , KEYS[1]) == ARGV[1] then return redis.call('del' ,KEYS[1]) else return 0 end" 1 RedisLock 11112222 条件判断语法lua if (布尔条件) then 业务代码 elseif (布尔代码) then 业务代码 else 业务代码 end
回到初衷,怎么改写java让判断和删除变成一条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; private Lock lock = new ReentrantLock (); public String sale () { String retMessage = "" ; String key = "zzyyRedisLock" ; String uuidValue = IdUtil.simpleUUID()+":" +Thread.currentThread().getId(); while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L ,TimeUnit.SECONDS)) { try { TimeUnit.MILLISECONDS.sleep(20 ); } catch (InterruptedException e) { e.printStackTrace(); } } try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber+"\t" +uuidValue; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }finally { String luaScript = "if redis.call('get' , KEYS[1]) == ARGV[1] then " + " return redis.call('del' ,KEYS[1]) " + "else return 0 end" ; stringRedisTemplate.execute(new DefaultRedisScript (luaScript,Boolean.class),Arrays.asList(key),uuidValue); } return retMessage+"\t" +"服务端口号:" +port; } }
可重入锁+设计模式 现在目前while判断并且自旋锁重试获取锁+setnx含自然过期时间+lua脚本官网删除锁命令
问题:如何兼顾锁的可重入性?
也就是说如果A拿自己的锁应该是判断成功的。
上面的例子是最简单的例子,假设A创建主订单,然后A业务里面还要创建个子订单或者增加积分,然后在子订单里面也要拿到这个线程的锁,那么这时候就会造成死锁了,所以需要考虑可重入性。
可重入锁 又名递归锁
是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。
所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
可以再次进入同步锁,进入同步域(同步代码块/方法或者显示锁定的代码)
一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。自己可以获取自己的内部锁
JUC知识复习,可重入锁出bug会如何影响程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class ReEntryLockDemo { final Object obj = new Object (); public void entry01 () { new Thread (() -> { synchronized (obj){ System.out.println(Thread.currentThread().getName()+"\t" +"外层调用" ); synchronized (obj){ System.out.println(Thread.currentThread().getName()+"\t" +"中层调用" ); synchronized (obj){ System.out.println(Thread.currentThread().getName()+"\t" +"内层调用" ); } } } },t1).start(); } public void entry02 () { m1(); } private synchronized void m1 () { System.out.println(Thread.currentThread().getName()+"\t" +"外层调用" ); m2(); } private synchronized void m2 () { System.out.println(Thread.currentThread().getName()+"\t" +"中层调用" ); m3(); } private synchronized void m3 () { System.out.println(Thread.currentThread().getName()+"\t" +"内层调用" ); } public static void main (String[] args) { ReEntryLockDemo demo = new ReEntryLockDemo (); demo.entry01(); demo.entry02(); } }
上面两个都是成功的。
可重入锁种类: 隐式锁(即synchronized关键字使用的锁)默认是可重入锁
指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。
简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的
与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁。
这个分为同步块和同步方法两种形式
这里还有第三种,这个是显式锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class ReEntryLockDemo { final Object obj = new Object (); Lock lock = new ReentrantLock (); public void entry03 () { new Thread (() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName()+"\t" +"外层调用" ); try { System.out.println(Thread.currentThread().getName()+"\t" +"内层调用" ); }finally { lock.unlock(); } }finally { lock.unlock(); } },t1).start(); try { TimeUnit.MILLISECONDS.sleep(2 );} catch (InterruptedException e) {e.printStackTrace();} new Thread (() -> { try { System.out.println(Thread.currentThread().getName()+"\t" +"外层调用" ); }finally { lock.unlock(); } },"t2" ).start(); } public static void main (String[] args) { ReEntryLockDemo demo = new ReEntryLockDemo (); demo.entry03(); } }
synchronized的重入的视线机理
1 2 3 4 5 6 7 每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。 当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。 在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。 当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。
显式锁也有这种可重入锁
了解完之后开始用lock unlock 配合可重入锁进行AQS源码分析
思考,怎么样用可重入锁计数问题,redis中哪个数据类型可以代替?
KKV也就是Map要用Hset才可以。
Redis案例命令
1 2 3 4 5 6 7 hexists RedisLock 111-222 检查是否为0也就是是否没被人拿 hset RedisLock 111-222 1 hincrby RedisLock 111-222 1 重入一次+1 hincrby RedisLock 111-222 -1 出去一次-1 hget RedisLock 111-222 这时候又是0了 hset redis锁名字 某个请求线程的UUID+ThreadId 加锁次数
小总结
setnx,只能解决有无得问题,够用但是不完美
hset,不但解决有无,还可以解决可重入问题
重新设计重点 初始用setnx,然后现在要满足AQS编写锁的规范接口行为,不得不考虑可重入性,现在改为用上面的设计。结合当前的业务。
目前是有2条分支,目的是保证同一个时候只能有一个线程持有锁进去redis做扣减库存的工作。
2个分支:1 保证加锁/解锁.lock/unlock。2 扣减库存redis命令的原子性。
很明显lock 和 unlock这两个操作需要很多判断if / else 所以这两个操作都需要用Lua脚本来执行。
lua脚本加锁和解锁 开始分解步骤,先判断redis分布式锁这个key是否存在,先用exists key 这个命令,返回零说明不存在,hset新建当前线程属于自己的锁 BY UUID:ThreadID,返回一说明已经有锁,需要进一步判断是不是当前线程自己的(HEXISTS key uuid:ThreadID)返回零说明不是自己的,返回一说明是自己的锁,自增一表示重入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 //加锁的Lua脚本,对标我们的lock方法 //V1版本,先理清楚逻辑 if redis.call('exists' ,'key' ) == 0 then redis.call('hset' ,'key' ,'uuid:Threadid' ,1 ) redis.call('expire' ,'key' ,50 ) return 1 elseif redis.call('hexists' ,'key' ,'uuid:threadid' ) == 1 then redis.call('hincrby' ,'key' ,'uuid:Threadid' ,1 ) redis.call('expire' ,'key' ,50 ) return 1 else return 0 end //这个版本有相同部分是否可以替换处理??hincrby命令可否替代hset命令 //hincryby命令是否可以完成新建和自增的需求?可以✅️ //然后就变成了下面的代码V2版本 if redis.call('exists' ,'key' ) == 0 or redis.call('hexists' ,'key' ,'uuid:threadid' ) == 1 then redis.call('hincrby' ,'key' ,'uuid:Threadid' ,1 ) redis.call('expire' ,'key' ,50 ) return 1 else return 0 end //V3版本 脚本OK,换上参数替代 if redis.call('exists' ,KEYS[1 ]) == 0 or redis.call('hexists' ,KEYS[1 ],ARGV[1 ]) == 1 then redis.call('hincrby' ,KEYS[1 ],ARGV[1 ],1 ) redis.call('expire' ,KEYS[1 ],ARGV[2 ]) return 1 else return 0 end
key
KEYS[1]
RedisLock
value
ARGV[1]
2f586ae740a94736894ab9d51880ed9d:1
过期时间值
ARGV[2]
30 秒
然后放到redis客户端里面进行测试
1 EVAL "脚本" RedisLock 2f586ae740a94736894ab9d51880ed9d:1 50
加锁完成了,然后解锁lua脚本unlock
解锁流程:首先检查有没有锁,返回零,说明根本没有锁,程序块返回nil,不是零,说明有锁并且是自己的锁,直接调用HINCRBY 负一 表示每次减个一,解锁一次,直到变为零表示可以删除该锁Key,del锁key。
全套解锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 //V1版本 if redis.call('HEXISTS' ,lock,uuid:threadID) == 0 then return nil elseif redis.call('HINCRBY' ,lock,uuid:threadID,-1 ) == 0 then return redis.call('del' ,lock) else return 0 end //V2换成参数 if redis.call('HEXISTS' ,KEYS[1 ],ARGV[1 ]) == 0 then return nil elseif redis.call('HINCRBY' ,KEYS[1 ],ARGV[1 ],-1 ) == 0 then return redis.call('del' ,KEYS[1 ]) else return 0 end //测试 eval "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end" 1 RedisLock 2 f586ae740a94736894ab9d51880ed9d:1
然后将上述lua脚本整合进入java程序
首先复原程序为初始化的无锁版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; public String sale () { String retMessage = "" ; String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber+"\t" ; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } return retMessage+"\t" +"服务端口号:" +port; } }
现在我们自己写个lock工具类然后里面用lock就好了,现在问题就转换为如何将lock+lua脚本自研版的redis分布式锁搞定。
新建 RedisDistributedLock类并实现JUC里面的Lock接口,这个就是我们自研的Redis分布式锁,实现了Lock接口,也就是说如果需要自研的redis分布式锁,需要满足lock接口中的规范然后重写。
那么我们继承了lock接口,现在需要对lock接口通用方法了解一下。
1 2 3 4 5 6 lock方法 unlock方法 trylock方法 trylock带参数方法 现在需要重写这四个方法太麻烦了。所以我们用lock方法嗲用的实际上是trylock方法,然后trylock方法实际上调用的是trylock带参数方法,现在我们只用重写两个方法了。 所以暴露的真正是trylock和unlock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuidValue; private long expireTime; public RedisDistributedLock (StringRedisTemplate stringRedisTemplate, String lockName) { this .stringRedisTemplate = stringRedisTemplate; this .lockName = lockName; this .uuidValue = IdUtil.simpleUUID()+":" +Thread.currentThread().getId(); this .expireTime = 30L ; } @Override public void lock () { tryLock(); } @Override public boolean tryLock () { try {tryLock(-1L ,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();} return false ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException{ if (time != -1L ){ this .expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end" ; System.out.println("script: " +script); System.out.println("lockName: " +lockName); System.out.println("uuidValue: " +uuidValue); System.out.println("expireTime: " +expireTime); while (!stringRedisTemplate.execute(new DefaultRedisScript <>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { TimeUnit.MILLISECONDS.sleep(50 ); } return true ; } @Override public void unlock () { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + " return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + " return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end" ; System.out.println("lockName: " +lockName); System.out.println("uuidValue: " +uuidValue); System.out.println("expireTime: " +expireTime); Long flag = stringRedisTemplate.execute(new DefaultRedisScript <>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime)); if (flag == null ) { throw new RuntimeException ("This lock doesn't EXIST" ); } } @Override public void lockInterruptibly () throws InterruptedException { } @Override public Condition newCondition () { return null ; } }
现在我们用上面的自研锁有什问题?
考虑扩展性这方面,如果后面用zookeeper、mysql就不行了,只能用redis
现在引入工厂模式了,参考spring ioc
Java里面有哪些方法获得?
1、缺少什么new什么
2、通用性极差上面那个,然后到多态
3、多态+动态,右边就是提交给spring容器管理+池化技术。
4、设计模式,可以通过工厂设计模式,直接通过传递参数从工厂获得。
现在写一个工厂,提升自研锁的扩展性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 @Component public class DistributedLockFactory { @Autowired private StringRedisTemplate stringRedisTemplate; private String lockName; public Lock getDistributedLock (String lockType) { if (lockType == null ) return null ; if (lockType.equalsIgnoreCase("REDIS" )){ lockName = "zzyyRedisLock" ; return new RedisDistributedLock (stringRedisTemplate,lockName); } else if (lockType.equalsIgnoreCase("ZOOKEEPER" )){ return new ZookeeperDistributedLock (); } else if (lockType.equalsIgnoreCase("MYSQL" )){ return null ; } return null ; } } public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuidValue; private long expireTime; public RedisDistributedLock (StringRedisTemplate stringRedisTemplate, String lockName) { this .stringRedisTemplate = stringRedisTemplate; this .lockName = lockName; this .uuidValue = IdUtil.simpleUUID()+":" +Thread.currentThread().getId(); this .expireTime = 30L ; } @Override public void lock () { tryLock(); } @Override public boolean tryLock () { try {tryLock(-1L ,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();} return false ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException{ if (time != -1L ){ this .expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end" ; System.out.println("script: " +script); System.out.println("lockName: " +lockName); System.out.println("uuidValue: " +uuidValue); System.out.println("expireTime: " +expireTime); while (!stringRedisTemplate.execute(new DefaultRedisScript <>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { TimeUnit.MILLISECONDS.sleep(50 ); } return true ; } @Override public void unlock () { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + " return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + " return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end" ; System.out.println("lockName: " +lockName); System.out.println("uuidValue: " +uuidValue); System.out.println("expireTime: " +expireTime); Long flag = stringRedisTemplate.execute(new DefaultRedisScript <>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime)); if (flag == null ) { throw new RuntimeException ("This lock doesn't EXIST" ); } } @Override public void lockInterruptibly () throws InterruptedException { } @Override public Condition newCondition () { return null ; } } @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; @Autowired private DistributedLockFactory distributedLockFactory; public String sale () { String retMessage = "" ; Lock redisLock = distributedLockFactory.getDistributedLock("redis" ); redisLock.lock(); try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { inventoryNumber = inventoryNumber - 1 ; stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber+"\t服务端口:" +port; System.out.println(retMessage); return retMessage; } retMessage = "商品卖完了,o(╥﹏╥)o" +"\t服务端口:" +port; }catch (Exception e){ e.printStackTrace(); }finally { redisLock.unlock(); } return retMessage; } }
这里可重入锁测试失败了,发现是ThreadID一致,但是第二次调用的uuid是不一样的,就会报错
问题就是工厂里面每次都new一个,导致uuid不一样,现在修改uuid是工厂里,工厂是唯一的了,那么如果一个线程就只会调用一次工厂,然后也就只会获得一个uuid了。这样所有线程的uuid都是一样的,只是线程Id不一样而已.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 @Component public class DistributedLockFactory { @Autowired private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuidValue; public DistributedLockFactory () { this .uuidValue = IdUtil.simpleUUID(); } public Lock getDistributedLock (String lockType) { if (lockType == null ) return null ; if (lockType.equalsIgnoreCase("REDIS" )){ lockName = "zzyyRedisLock" ; return new RedisDistributedLock (stringRedisTemplate,lockName,uuidValue); } else if (lockType.equalsIgnoreCase("ZOOKEEPER" )){ return new ZookeeperDistributedLock (); } else if (lockType.equalsIgnoreCase("MYSQL" )){ return null ; } return null ; } } public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuidValue; private long expireTime; public RedisDistributedLock (StringRedisTemplate stringRedisTemplate, String lockName,String uuidValue) { this .stringRedisTemplate = stringRedisTemplate; this .lockName = lockName; this .uuidValue = uuidValue+":" +Thread.currentThread().getId(); this .expireTime = 30L ; } @Override public void lock () { this .tryLock(); } @Override public boolean tryLock () { try { return this .tryLock(-1L ,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { if (time != -1L ) { expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end" ; System.out.println("lockName: " +lockName+"\t" +"uuidValue: " +uuidValue); while (!stringRedisTemplate.execute(new DefaultRedisScript <>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) { try { TimeUnit.MILLISECONDS.sleep(60 ); } catch (InterruptedException e) { e.printStackTrace(); } } return true ; } @Override public void unlock () { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + "return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + "return redis.call('del',KEYS[1]) " + "else " + "return 0 " + "end" ; System.out.println("lockName: " +lockName+"\t" +"uuidValue: " +uuidValue); Long flag = stringRedisTemplate.execute(new DefaultRedisScript <>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime)); if (flag == null ) { throw new RuntimeException ("没有这个锁,HEXISTS查询无" ); } } @Override public void lockInterruptibly () throws InterruptedException { } @Override public Condition newCondition () { return null ; } } @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; @Autowired private DistributedLockFactory distributedLockFactory; public String sale () { String retMessage = "" ; Lock redisLock = distributedLockFactory.getDistributedLock("redis" ); redisLock.lock(); try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber; System.out.println(retMessage); this .testReEnter(); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }catch (Exception e){ e.printStackTrace(); }finally { redisLock.unlock(); } return retMessage+"\t" +"服务端口号:" +port; } private void testReEnter () { Lock redisLock = distributedLockFactory.getDistributedLock("redis" ); redisLock.lock(); try { System.out.println("################测试可重入锁####################################" ); }finally { redisLock.unlock(); } } }
自动续期 业务上我们还要对锁增加自动续期的功能。
那么现在问题是如何确定续期时间,以及如何续期?
如何确保redisLock过期时间大于业务时间?(最怕干100s都没完成)所以需要后台有个定时任务,定时程序,设定过期时间30s,然后每过20s都扫一遍看看完成没有。
CAP
C - Consistency (一致性)
定义: 在分布式系统中的所有数据备份,在同一时刻是否同样的值。
通俗理解: “强一致性”。当你更新了数据,所有的用户(无论连接到哪个节点)都能立刻 看到最新的数据。如果系统做不到立刻同步,它宁愿报错也不返回旧数据。
A - Availability (可用性)
定义: 负载过大或部分节点故障时,系统仍然能够正常响应用户的读写请求。
通俗理解: “不报错”。无论系统内部发生了什么故障(只要没全挂),用户只要发请求,系统就必须给一个回应(哪怕这个回应的数据可能不是最新的)。
例子: 刷抖音/推特。哪怕某个服务器慢了,也要让你刷出视频来,哪怕这个视频是几分钟前的旧推荐,也比给你看一个“服务器错误”的白屏要好。
P - Partition Tolerance (分区容错性)
定义: 系统在遇到网络分区故障(Network Partition)时,仍然能够保证对外提供满足一致性或可用性的服务。
通俗理解: “网断了也能用”。分布式系统由多台服务器组成,服务器之间的网络随时可能断开(即“分区”)。如果网线被挖断了,A机房连不上B机房,系统还能继续运行吗?
Redis集群是AP redis异步复制造成的锁丢失 比如:主节点没来得及把刚刚set进来这条数据给从节点,master就挂了,从机上位但是从机没有上面的数据,出现数据不一致。
Zookeeper集群是CP zk是CP原理,用的是leader和follower,假设1号机注册给server1,server1同步给server2,server2同步给各个follower,为了保证一致性,只有整个过程都成功了,1号机采收到注册成功。
当leader重启或者网络故障下,整个zk集群会重新选举新老大,选举期间client不可以注册,zk不可用,所以牺牲了可用性A,只有选举出新老大后,系统才恢复注册。但由于在大型分布式系统中故障难以避免,leader出故障可能性很高,所以很多大型系统都不会选择zk的原因。
Eureka集群是AP Redis集群有的问题,Eureka也有
Nacos集群是AP 这个也是默认AP,但是也可以CP,但是CP用zk就好了。
自动续期,Lua脚本
hset zzyyRedisLock 111122223333:11 3
EXPIRE zzyyRedisLock 30
ttl zzyyRedisLock
。。。。。
eval “if redis.call(‘HEXISTS’,KEYS[1],ARGV[1]) == 1 then return redis.call(‘expire’,KEYS[1],ARGV[2]) else return 0 end” 1 zzyyRedisLock 111122223333:11 30
ttl zzyyRedisLock
1 2 3 4 5 6 //==============自动续期 if redis.call('HEXISTS' ,KEYS[1 ],ARGV[1 ]) == 1 then return redis.call('expire' ,KEYS[1 ],ARGV[2 ]) else return 0 end
现在续期的脚本写好了,现在需要定时器的代码了
思路是加锁成功后,后台就新建扫描程序来看看是否到过期时间,自动续期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 public class RedisDistributedLock implements Lock { private StringRedisTemplate stringRedisTemplate; private String lockName; private String uuidValue; private long expireTime; public RedisDistributedLock (StringRedisTemplate stringRedisTemplate,String lockName,String uuidValue) { this .stringRedisTemplate = stringRedisTemplate; this .lockName = lockName; this .uuidValue = uuidValue+":" +Thread.currentThread().getId(); this .expireTime = 30L ; } @Override public void lock () { tryLock(); } @Override public boolean tryLock () { try {tryLock(-1L ,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();} return false ; } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { if (time != -1L ) { this .expireTime = unit.toSeconds(time); } String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " + "redis.call('hincrby',KEYS[1],ARGV[1],1) " + "redis.call('expire',KEYS[1],ARGV[2]) " + "return 1 " + "else " + "return 0 " + "end" ; System.out.println("script: " +script); System.out.println("lockName: " +lockName); System.out.println("uuidValue: " +uuidValue); System.out.println("expireTime: " +expireTime); while (!stringRedisTemplate.execute(new DefaultRedisScript <>(script,Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { TimeUnit.MILLISECONDS.sleep(50 ); } this .renewExpire(); return true ; } @Override public void unlock () { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " + " return nil " + "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " + " return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end" ; System.out.println("lockName: " +lockName); System.out.println("uuidValue: " +uuidValue); System.out.println("expireTime: " +expireTime); Long flag = stringRedisTemplate.execute(new DefaultRedisScript <>(script, Long.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime)); if (flag == null ) { throw new RuntimeException ("This lock doesn't EXIST" ); } } private void renewExpire () { String script = "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " + "return redis.call('expire',KEYS[1],ARGV[2]) " + "else " + "return 0 " + "end" ; new Timer ().schedule(new TimerTask () { @Override public void run () { if (stringRedisTemplate.execute(new DefaultRedisScript <>(script, Boolean.class), Arrays.asList(lockName),uuidValue,String.valueOf(expireTime))) { renewExpire(); } } },(this .expireTime * 1000 )/3 ); } @Override public void lockInterruptibly () throws InterruptedException { } @Override public Condition newCondition () { return null ; } }
现在锁写好了,然后结合service测试锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; @Autowired private DistributedLockFactory distributedLockFactory; public String sale () { String retMessage = "" ; Lock redisLock = distributedLockFactory.getDistributedLock("redis" ); redisLock.lock(); try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber; System.out.println(retMessage); try { TimeUnit.SECONDS.sleep(120 ); } catch (InterruptedException e) { e.printStackTrace(); } }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }catch (Exception e){ e.printStackTrace(); }finally { redisLock.unlock(); } return retMessage+"\t" +"服务端口号:" +port; } private void testReEnter () { Lock redisLock = distributedLockFactory.getDistributedLock("redis" ); redisLock.lock(); try { System.out.println("################测试可重入锁####################################" ); }finally { redisLock.unlock(); } } }
总结 V1版本 synchronized单机版ok,上分布式死翘翘,上了nginx分布式微服务,单机锁不行。
V2版本 取消单机锁,上redis分布式锁setnx。这个版本处理细节:1、只加了锁,没有释放锁,除了异常可能无法释放锁,必须在代码层面finally释放锁。2、宕机了,部署了微服务代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要有lockKey的过期时间设定。3、为redis的分布式锁key,增加过期时间此外,还必须要setnx+过期时间必须同一行(必须规定只能自己删除自己的锁,不能把别人的锁删了)最终用Lua脚本。
V3版本 为了业务上锁可重入性,hset替代setnx+lock变为lua脚本保证,然后考虑自动续期
V4版本 加锁了之后,立刻创建定时器,固定时间扫描如果锁还存在就续期,没有就返回0
RedLock算法和底层源码分析 这个是官方的轮子续接上面的自研锁。
面试题 如果要自研一把redis分布式锁,有什么考虑?
1、按照JUC里面Lock接口规范编写
2、lock 加锁关键逻辑
加锁实际上就是在redis中,给Key设置一个值,为了避免死锁,给定一个过期时间
自旋等待,用while循环自旋,防止内存栈溢出
续期
3、unlock解锁关键逻辑
将Key键删除,需要防止删除别人的锁,比如客户端1删除客户端2的锁,并且加锁和解锁需要原子性,先判断锁是否是自己的,然后删,这时候需要原子性,所以用Lua脚本删,后期可以继续思考可重入性和自动续期,把setnx换成hset来解决可重入性,用定时器解决续期问题。
RedLock介绍 更规范的算法来使用Redis实现分布式锁,官方提出了一个Redlock的算法,它实现了我们认为比普通单实例方法更安全的DLM(分布式锁管理器),
那么RedLock的落地实现就是Redisson,那么我们的自研锁,有什么缺点?
也就是说从机上位的时候还没复制到主机的键,这时候另一个客户B也可以键锁成功,而且redis无论是单机,主从,哨兵,集群都有这样的风险。
Redlock算法设计理念
redis之父提出Redlock算法解决这个问题。我们有N个Redis主节点,这些节点是完全独立的,所以我们不适用复制。因此我们在不同的计算机或者虚拟机上运行5个Redis Master,以确保他们以独立的方式发生故障。
该方案也是基于(set 加锁、Lua 脚本解锁)进行改良的,所以redis之父antirez 只描述了差异的地方,大致方案如下。
假设我们有N个Redis主节点,例如 N = 5这些节点是完全独立的,我们不使用复制或任何其他隐式协调系统,
为了取到锁客户端执行以下操作:
1
获取当前时间,以毫秒为单位;
2
依次尝试从5个实例,使用相同的 key 和随机值(例如 UUID)获取锁。当向Redis 请求获取锁时,客户端应该设置一个超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。(超时时间 > 锁失效时间)这样可以防止客户端在试图与一个宕机的 Redis 节点对话时长时间处于阻塞状态 。如果一个实例不可用,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁;
3
客户端通过当前时间减去步骤 1 记录的时间来计算获取锁使用的时间。当且仅当从大多数(N/2+1,这里是 3 个节点)的 Redis 节点都取到锁,并且获取锁使用的时间小于锁失效时间时,锁才算获取成功;
4
如果取到了锁,其真正有效时间等于初始有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
5
如果由于某些原因未能获得锁(无法在至少 N/2 + 1 个 Redis 实例获取锁、或获取锁的时间超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁 (即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
该方案为了解决数据不一致的问题,直接舍弃了异步复制只使用 master 节点,同时由于舍弃了 slave,为了保证可用性,引入了 N 个节点,官方建议是 5。本次教学演示用3台实例来做说明。
客户端只有在满足下面的这两个条件时,才能认为是加锁成功。
条件1:客户端从超过半数(大于等于N/2+1)的Redis实例上成功获取到了锁;
条件2:客户端获取锁的总耗时没有超过锁的有效时间。
解决方案
容错公式 N = 2X + 1 (N是最终部署机器数,X是容错机器数)
1 先知道什么是容错
失败了多少个机器实例后我还是可以容忍的,所谓的容忍就是数据一致性还是可以Ok的,CP数据一致性还是可以满足
加入在集群环境中,redis失败1台,可接受。2X+1 = 2 * 1+1 =3,部署3台,死了1个剩下2个可以正常工作,那就部署3台。
加入在集群环境中,redis失败2台,可接受。2X+1 = 2 * 2+1 =5,部署5台,死了2个剩下3个可以正常工作,那就部署5台。
2 为什么是奇数?
最少的机器,最多的产出效果
落地实现就是Redisson
Redisson 的分布式锁适用于“最终一致性可接受、高并发优先”的业务(如秒杀、缓存重建),不适用于金融级强一致场景。
里面提供了很多api方便java操作。
使用Redisson
1、改POM
1 2 3 4 5 6 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.13.4</version > </dependency >
2、改配置写配置类RedisConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate (LettuceConnectionFactory lettuceConnectionFactory) { RedisTemplate<String,Object> redisTemplate = new RedisTemplate <>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer ()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer ()); redisTemplate.setHashKeySerializer(new StringRedisSerializer ()); redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer ()); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean public Redisson redisson () { Config config = new Config (); config.useSingleServer().setAddress("redis://192.168.111.175:6379" ).setDatabase(0 ).setPassword("111111" ); return (Redisson) Redisson.create(config); } }
3、然后Controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @RestController @Api(tags = "redis分布式锁测试") public class InventoryController { @Autowired private InventoryService inventoryService; @ApiOperation("扣减库存,一次卖一个") @GetMapping(value = "/inventory/sale") public String sale () { return inventoryService.sale(); } @ApiOperation("扣减库存saleByRedisson,一次卖一个") @GetMapping(value = "/inventory/saleByRedisson") public String saleByRedisson () { return inventoryService.saleByRedisson(); } }
4、然后Service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Service @Slf4j public class InventoryService2 { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; @Autowired private DistributedLockFactory distributedLockFactory; @Autowired private Redisson redisson; public String saleByRedisson () { String retMessage = "" ; String key = "RedisLock" ; RLock redissonLock = redisson.getLock(key); redissonLock.lock(); try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }finally { redissonLock.unlock(); } return retMessage+"\t" +"服务端口号:" +port; } }
现在我们就实现了单机版的一次卖一个了。
需要注意在unlock的时候需要判断是否是自己的锁,然后才能解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Service @Slf4j public class InventoryService { @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String port; @Autowired private DistributedLockFactory distributedLockFactory; @Autowired private Redisson redisson; public String saleByRedisson () { String retMessage = "" ; String key = "zzyyRedisLock" ; RLock redissonLock = redisson.getLock(key); redissonLock.lock(); try { String result = stringRedisTemplate.opsForValue().get("inventory001" ); Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result); if (inventoryNumber > 0 ) { stringRedisTemplate.opsForValue().set("inventory001" ,String.valueOf(--inventoryNumber)); retMessage = "成功卖出一个商品,库存剩余: " +inventoryNumber; System.out.println(retMessage); }else { retMessage = "商品卖完了,o(╥﹏╥)o" ; } }finally { if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) { redissonLock.unlock(); } } return retMessage+"\t" +"服务端口号:" +port; } }
这里是原子性的吗?是,首先外部判断只是为了判断如果不是自己的锁就不能解锁了,因为redisson内部也有判断,如果交给redisson判断就会抛出异常了,而且redisson内部是原子性了用lua脚本已经包装好了。
Redisson源码解读 1、加锁
2、可重入
3、续命
4、解锁
分析步骤
1、Redis分布式锁过期了,但是业务逻辑还没有处理完怎么办?
守护线程续命,额外起一个线程,定期检查线程是否还持有锁,如果有则延长过期时间。
Redisson里面就实现了这个方案,使用 看门狗 定期检查(每 1 / 3 的锁时间检查1次),如果线程还有锁,则刷新过期时间。
1 2 通过redisson新建出来的锁key,默认是30秒 加锁,然后先判断是否持有锁,是null也就是没锁,就加锁,如果有锁自动阻塞当前线程,然后进入到队列里面排队等待通知唤醒,然后加锁成功后,刷新时间
1 2 3 通过exists判断,如果锁不存在,则设置值和过期时间,加锁成功 通过Hexists判断,如果锁已经存在,并且锁的是当前线程,则证明是重入锁,加锁成功 如果锁已经存在但不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间(代表了锁key的剩余生存时间),加锁失败。
1 2 watch dog自动延期机制 里面有自动延期机制,也就是里面已经封装了异步定时器,每过locktime / 3,也就是30 / 3 每隔10秒钟续期一次
多机案例 之前已经实现了redisson单机单例,已经大概了解内部的机制和操作方法了,现在使用多机案例。
回顾之前集群的Bug,就是当主机挂了,然后从机没有复制到锁的时候,别的客户也能创建锁,那么这个锁就被重入了,多次扣减库存了就,所以,这里的多机是多master。
小总结
1 2 3 4 5 6 7 8 9 10 这个锁的算法实现了多redis实例的情况,相对于单redis节点来说,优点在于 防止了 单节点故障造成整个服务停止运行的情况且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。 Redisson 分布式锁支持 MultiLock 机制可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁。 最低保证分布式锁的有效性及安全性的要求如下: 1.互斥;任何时刻只能有一个client获取锁 2.释放死锁;即使锁定资源的服务崩溃或者分区,仍然能释放锁 3.容错性;只要多数redis节点(一半以上)在使用,client就可以获取和释放锁 网上讲的基于故障转移实现的redis主从无法真正实现Redlock: 因为redis在进行主从复制时是异步完成的,比如在clientA获取锁后,主redis复制数据到从redis过程中崩溃了,导致没有复制到从redis中,然后从redis选举出一个升级为主redis,造成新的主redis没有clientA 设置的锁,这是clientB尝试获取锁,并且能够成功获取锁,导致互斥失效;
代码案例
红锁RedLock
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自不同的Redisson实例。
然后RedissonRedLock里面已经被我们封装好api给我们用了。
基于Redis的分布式MultiLock对象允许将Lock对象分组并将他们作为单个锁处理,每个RLock对象可能属于不同的Redisson实例。
MultiLock对象里面实现了Lock规范以及有看门狗 ,也就是只有 锁的所有者才能解锁他
另外Redisson还通过加锁的方法提供了leaseTime的参数来制定加锁的时间,超过这个时间后锁便自动解开了。这个怎么用呢?
第一种情况也就是这里不写unlock的话,不管业务多久,十秒后自动解锁。不能长期占用
第二种情况就是加锁可以有100秒重试的时间,但是加锁成功后10后锁失效
现在RLock失效了替代的是MultiLock ,就上面的接口失效但是效果一样的。
MultiLock多机案例
后面可以用Docker技术多开几台redis来操作。
1、docker走起3台redis的master机器,本次设置3台master各自独立没有从属关系
1 2 3 docker run -p 6381:6379 --name redis-master-1 -d redis docker run -p 6382:6379 --name redis-master-2 -d redis docker run -p 6383:6379 --name redis-master-3 -d redis
2、进入上一步刚启动的redis容器实例
1 2 3 docker exec -it redis-master-1 /bin/bash 或者 docker exec -it redis-master-1 redis-cli docker exec -it redis-master-2 /bin/bash 或者 docker exec -it redis-master-2 redis-cli docker exec -it redis-master-3 /bin/bash 或者 docker exec -it redis-master-3 redis-cli
3、建Module redis_redlock
4、改POM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.3.10.RELEASE</version > <relativePath /> </parent > <groupId > com.atguigu.redis.redlock</groupId > <artifactId > redis_redlock</artifactId > <version > 0.0.1-SNAPSHOT</version > <properties > <java.version > 1.8</java.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.19.1</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > 1.18.8</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-lang3</artifactId > <version > 3.4</version > <scope > compile</scope > </dependency > <dependency > <groupId > cn.hutool</groupId > <artifactId > hutool-all</artifactId > <version > 5.8.11</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-configuration-processor</artifactId > <optional > true</optional > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <configuration > <excludes > <exclude > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-configuration-processor</artifactId > </exclude > </excludes > </configuration > </plugin > </plugins > </build > </project >
5、写YML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 server.port =9090 spring.application.name =redlock spring.swagger2.enabled =true spring.redis.database =0 spring.redis.password =spring.redis.timeout =3000 spring.redis.mode =single spring.redis.pool.conn-timeout =3000 spring.redis.pool.so-timeout =3000 spring.redis.pool.size =10 spring.redis.single.address1 =192.168.111.185:6381 spring.redis.single.address2 =192.168.111.185:6382 spring.redis.single.address3 =192.168.111.185:6383
6、主启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.atguigu.redis.redlock;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RedisRedlockApplication { public static void main (String[] args) { SpringApplication.run(RedisRedlockApplication.class, args); } }
6、业务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 @Configuration @EnableConfigurationProperties(RedisProperties.class) public class CacheConfiguration { @Autowired RedisProperties redisProperties; @Bean RedissonClient redissonClient1 () { Config config = new Config (); String node = redisProperties.getSingle().getAddress1(); node = node.startsWith("redis://" ) ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(node) .setTimeout(redisProperties.getPool().getConnTimeout()) .setConnectionPoolSize(redisProperties.getPool().getSize()) .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } @Bean RedissonClient redissonClient2 () { Config config = new Config (); String node = redisProperties.getSingle().getAddress2(); node = node.startsWith("redis://" ) ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(node) .setTimeout(redisProperties.getPool().getConnTimeout()) .setConnectionPoolSize(redisProperties.getPool().getSize()) .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } @Bean RedissonClient redissonClient3 () { Config config = new Config (); String node = redisProperties.getSingle().getAddress3(); node = node.startsWith("redis://" ) ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(node) .setTimeout(redisProperties.getPool().getConnTimeout()) .setConnectionPoolSize(redisProperties.getPool().getSize()) .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } } @Data public class RedisPoolProperties { private int maxIdle; private int minIdle; private int maxActive; private int maxWait; private int connTimeout; private int soTimeout; private int size; } @ConfigurationProperties(prefix = "spring.redis", ignoreUnknownFields = false) @Data public class RedisProperties { private int database; private int timeout; private String password; private String mode; private RedisPoolProperties pool; private RedisSingleProperties single; } @Data public class RedisSingleProperties { private String address1; private String address2; private String address3; } @RestController @Slf4j public class RedLockController { public static final String CACHE_KEY_REDLOCK = "ATGUIGU_REDLOCK" ; @Autowired RedissonClient redissonClient1; @Autowired RedissonClient redissonClient2; @Autowired RedissonClient redissonClient3; boolean isLockBoolean; @GetMapping(value = "/multiLock") public String getMultiLock () throws InterruptedException { String uuid = IdUtil.simpleUUID(); String uuidValue = uuid+":" +Thread.currentThread().getId(); RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK); RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK); RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK); RedissonMultiLock redLock = new RedissonMultiLock (lock1, lock2, lock3); redLock.lock(); try { System.out.println(uuidValue+"\t" +"---come in biz multiLock" ); try { TimeUnit.SECONDS.sleep(30 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(uuidValue+"\t" +"---task is over multiLock" ); } catch (Exception e) { e.printStackTrace(); log.error("multiLock exception " ,e); } finally { redLock.unlock(); log.info("释放分布式锁成功key:{}" , CACHE_KEY_REDLOCK); } return "multiLock task is over " +uuidValue; } }
7、测试
Redis缓存过期淘汰策略 Redis是内存数据库,过期就没了或者手动删了就好了为什么还有个过期淘汰策略?
面试题 1、生产上你们的redis内存设置多少?
2、如何配置、修改redis的内存大小
3、如果内存满了你怎么办
4、redis清理内存的方式?定期删除和惰性删除了解过吗?
5、redis缓存淘汰策略有哪些?分别是什么?你用那个?
6、redis的LRU了解过吗?手写LRU
7、lru和lfu算法的区别是什么?Least Recently Used,最近最少使用,Least Recently Used,最近最少使用
Redis内存满了怎么办? 1、redis默认内存多少?在哪查看?如何设置修改?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 查看redis最大占用内存? 打开redis.conf配置文件,里面有maxmemory参数,maxmemory是bytes字节类型,注意转换。 如果没配?答案是0。 redis默认内存多少可以用? 如果不设置最大内存大小或者设置最大内存大小为0,在64位操作系统下不限制内存大小,在32位操作系统下最多使用3GB内存。 一般生产上如何配置 一般推荐Redis设置内存为最大物理内存的四分之三 如何修改redis内存设置 第一种直接进入redis.conf里面修改。写进配置文件 第二种config set maxmemory 用命令更改 什么命令查看redis内存使用情况 info memory config get maxmemory
2、真要打满了会怎么样?如果Redis内存使用超出了设置的最大值会怎么样?
1 2 会报错,OOM cpmmand not allowed这个报错,说不能超出最大内存 假设所有的key没有加上过期时间就会导致数据写满maxmemory,为了避免类似的情况,引出下一章内存淘汰策略
往redis里写的数据是怎么没了的?他是如何删除的? 1、redis过期键的删除策略
2、三种不同的删除策略
立即删除
1 2 3 4 Redis不可能时时刻刻遍历所有被设置了生存时间的key,来检测数据是否已经到达过期时间,然后对它进行删除。 立即删除能保证内存中数据的最大新鲜度,因为它保证过期键值会在过期后马上被删除,其所占用的内存也会随之释放。但是立即删除对cpu是最不友好的。因为删除操作会占用cpu的时间,如果刚好碰上了cpu很忙的时候,比如正在做交集或排序等计算的时候,就会给cpu造成额外的压力,让CPU心累,时时需要删除,忙死。。。。。。。 这会产生大量的性能消耗,同时也会影响数据的读取操作。
这种额略对CPU不友好,用处理器性能换取存储空间(拿时间换空间)
惰性删除
1 2 3 4 5 6 数据到达过期时间,不做处理。等下次访问该数据时, 如果未过期,返回数据 ; 发现已过期,删除,返回不存在。 惰性删除策略的缺点是,它对内存是最不友好的。 如果一个键已经过期,而这个键又仍然保留在redis中,那么只要这个过期键不被删除,它所占用的内存就不会释放。 在使用惰性删除策略时,如果数据库中有非常多的过期键,而这些过期键又恰好没有被访问到的话,那么它们也许永远也不会被删除(除非用户手动执行FLUSHDB),我们甚至可以将这种情况看作是一种内存泄漏–无用的垃圾数据占用了大量的内存,而服务器却不会自己去释放它们,这对于运行状态非常依赖于内存的Redis服务器来说,肯定不是一个好消息
总结:对内存不友好,用存储空间换取处理器的性能(拿空间换时间)
开启惰性淘汰,lazyfree-lazy-eviction=yes
上面两种方案都走极端,那么就有了第三种定期删除
定期删除是前两种策略的折中
1 2 3 4 5 6 7 8 9 10 11 12 定期删除策略每隔一段时间执行一次删除过期键操作并通过限制删除操作执行时长和频率来减少删除操作对CPU时间的影响。 周期性轮询redis库中的时效性数据,采用随机抽取的策略,利用过期数据占比的方式控制删除频度 特点1:CPU性能占用设置有峰值,检测频度可自定义设置 特点2:内存压力不是很大,长期占用内存的冷数据会被持续清理 总结:周期性抽查存储空间 (随机抽查,重点抽查) 举例: redis默认每隔100ms检查是否有过期的key,有过期key则删除。注意:redis不是每隔100ms将所有的key检查一次而是随机抽取进行检查(如果每隔100ms,全部key进行检查,redis直接进去ICU)。因此,如果只采用定期删除策略,会导致很多key到时间没有删除。 定期删除策略的难点是确定删除操作执行的时长和频率:如果删除操作执行得太频繁或者执行的时间太长,定期删除策略就会退化成立即删除策略,以至于将CPU时间过多地消耗在删除过期键上面。如果删除操作执行得太少,或者执行的时间太短,定期删除策略又会和惰性删除束略一样,出现浪费内存的情况。因此,如果采用定期删除策略的话,服务器必须根据情况,合理地设置删除操作的执行时长和执行频率。
3、上述步骤都太蠢了,都有漏洞
定期删除,从来没有被抽查到
惰性删除,也从来没有被点中使用过
这些步骤会导致大量的key堆积在内存中,导致redis空间内存紧张或者几乎快耗尽,必须有一个更好的兜底方案。
redis缓存淘汰策略 首先在redis配置文件里面[MEMORY MANAGEMENT]下面有8种淘汰策略
首先要先了解LRU和LFU
LRU:最近最少使用页面置换算法,淘汰最长时间未被使用的页面,看页面最后一次被使用到发生调度的时间长短,首先淘汰最长时间未被使用的页面。
LFU:最近最不常用页面置换算法,淘汰一定时期内被访问次数最少的页,看一定时间段内页面被使用的频率,淘汰一定时期内被访问次数最少的页
举个栗子
某次时期Time为10分钟,如果每分钟进行一次调页,主存块为3,若所需页面走向为2 1 2 1 2 3 4
假设到页面4时会发生缺页中断
若按LRU算法,应换页面1(1页面最久未被使用),但按LFU算法应换页面3(十分钟内,页面3只使用了一次)
可见LRU关键是看页面最后一次被使用到发生调度的时间长短,而LFU关键是看一定时间段内页面被使用的频率!
默认的策略是不驱逐。
上面的总结就是2个维度4个方面
2个维度是从过期键中筛选,一个是从所有键中筛选
4个方面是LRU,LFU,random,ttl
然后组合就成了8个选项
一般用哪种?allkeys-lru,用以保存最新数据。
如何配置,一个用config命令或者写进配置文件
首先尽量避免Bigkey,开启惰性淘汰。lazyfree-lazy-evition=yes
Redis经典五大类型源码以及底层实现 底层是C语言编写的。
Redis为什么快?高性能设计之epoll和IO多路复用深度解析 首先多路复用要解决的问题?这个技术是解决什么问题的?
并发多客户端连接,在多路复用之前最简单和典型的方案:同步阻塞网络IO模型
这种模式的特点就是用一个进程来处理一个网络连接(一个用户请求),比如一段典型的示例代码如下。
直接调用 recv 函数从一个 socket 上读取数据。
1 2 3 4 int main(){ ... recv(sock, ...) //从用户角度来看非常简单,一个recv一用,要接收的数据就到我们手里了。 }
我们来总结一下这种方式:
优点就是这种方式非常容易让人理解,写起代码来非常的自然,符合人的直线型思维。
缺点就是性能差,每个用户请求到来都得占用一个进程来处理,来一个请求就要分配一个进程跟进处理,
类似一个学生配一个老师,一位患者配一个医生,可能吗?进程是一个很笨重的东西。一台服务器上创建不了多少个进程。
解决什么问题
进程在 Linux 上是一个开销不小的家伙,先不说创建,光是上下文切换一次就得几个微秒。所以为了高效地对海量用户提供服务,必须要让一个进程能同时处理很多个 tcp 连接才行。现在假设一个进程保持了 10000 条连接,那么如何发现哪条连接上有数据可读了、哪条连接可写了 ?
我们当然可以采用循环遍历的方式来发现 IO 事件,但这种方式太低级了。
我们希望有一种更高效的机制,在很多连接中的某条上有 IO 事件发生的时候直接快速把它找出来 。
其实这个事情 Linux 操作系统已经替我们都做好了 ,它就是我们所熟知的 IO 多路复用机制。
这里的复用指的就是对进程的复用 ,类似于一个老师监考1000名学生,谁先举手,处理谁。
IO指的是网络IO
多路指的是多个客户端连接,连接就是套接字描述符,socket或者channel,指多条TCP连接
复用:用一个进程来处理多条的连接,使用单进程就能够实现同事处理多个客户端的连接
理念有了,落地实现是: 可以分select -> poll ->epoll三个阶段来描述
Redis单线程如何处理那么多并发客户端连接,为什么单线程,为什么快
Redis利用epoll 来实现IO多路复用,将连接信息和事件放到队列中 ,一次放到文件事件分派器 ,事件分派器将时间分发给时间处理器 。
Redis 是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的,所以 I/O 操作在一般情况下往往不能直接返回,这会导致某一文件的 I/O 阻塞导致整个进程无法对其它客户提供服务,而 I/O 多路复用就是为了解决这个问题而出现
所谓 I/O 多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要 select 、 poll 、 epoll 来配合。多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符 )
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器 。它的组成结构为4部分:
多个套接字、
IO多路复用程序、
文件事件分派器、
事件处理器。
因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型。
也就是说redis用多线程处理IO问题和监听派发这些,但是具体执行命令是单线程。
从案例里面看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 上午开会,错过了公司食堂的饭点, 中午就和公司的首席架构师一起去楼下的米线店去吃米线。我们到了一看,果然很多人在排队。 架构师马上发话了:嚯,请求排队啊!你看这位收银点菜的,像不像nginx的反向代理?只收请求,不处理,把请求都发给后厨去处理。 我们交了钱,拿着号离开了点餐收银台,找了个座位坐下等餐。 架构师:你看,这就是异步处理,我们下了单就可以离开等待,米线做好了会通过小喇叭“回调”我们去取餐; 如果同步处理,我们就得在收银台站着等餐,后面的请求无法处理,客户等不及肯定会离开了。 接下里架构师盯着手中的纸质号牌。 架构师:你看,这个纸质号牌在后厨“服务器”那里也有,这不就是表示会话的ID吗? 有了它就可以把大家给区分开,就不会把我的排骨米线送给别人了。过了一会, 排队的人越来越多,已经有人表示不满了,可是收银员已经满头大汗,忙到极致了。 架构师:你看他这个系统缺乏弹性扩容, 现在这么多人,应该增加收银台,可以没有其他收银设备,老板再着急也没用。 老板看到在收银这里帮不了忙,后厨的订单也累积得越来越多, 赶紧跑到后厨亲自去做米线去了。 架构师又发话了:幸亏这个系统的后台有并行处理能力,可以随意地增加资源来处理请求(做米线)。 我说:他就这点儿资源了,除了老板没人再会做米线了。 不知不觉,我们等了20分钟, 但是米线还没上来。 架构师:你看,系统的处理能力达到极限,超时了吧。 这时候收银台前排队的人已经不多了,但是还有很多人在等米线。 老板跑过来让这个打扫卫生的去收银,让收银小妹也到后厨帮忙。打扫卫生的做收银也磕磕绊绊的,没有原来的小妹灵活。 架构师:这就叫服务降级,为了保证米线的服务,把别的服务都给关闭了。 又过了20分钟,后厨的厨师叫道:237号, 您点的排骨米线没有排骨了,能换成番茄的吗? 架构师低声对我说:瞧瞧, 人太多, 系统异常了。然后他站了起来:不行,系统得进行补偿操作:退费。 说完,他拉着我,饿着肚子,头也不回地走了。
同步 调用者一直等待调用结果的通知后才能进行后续的执行,现在就要我可以等,等到结果为止
异步 指被调用方返回应答让调用者先回去,然后再计算调用结果,计算完最终结果后再通知并返回给调用方,异步调用要想获得结果一般通过回调。
同步于异步的理解 同步、异步的讨论对象是被调用者(服务提供者),重点在于获得调用结果的消息通知方式上。
阻塞 调用方一直在等待而且别的事情什么都不做,当前进程或者线程会被挂起,啥都不干
非阻塞 调用再发出去后,调用方先去忙别的事情,不会阻塞当前进程或者线程,而会立即返回
阻塞与非阻塞的理解 阻塞和非阻塞的讨论对象是调用者(服务请求者),重点在于等消息时候的行为,调用者是否能干其他的事情
总结 4种组合方式
同步阻塞,同步非阻塞,异步阻塞,异步非阻塞。
Unix网络编程中的五种IO模型
1、Blockiing IO 阻塞IO
2、NoneBlocking 非阻塞IO
3、IO multiplexing IO多路复用
后面两个不常用先不了解
4、signal driven IO 信号驱动IO
5、asynchronous IO 异步IO
Java验证
这里默认已经懂了JavaSE的Socket编程
1、背景:一个redisServer+2个Client
2、BIO
要想了解BIO如何实现先了解C语言有个recvfrom函数
这个函数用于从已连接的套接口上接受数据,并捕获数据发送源的地址。
作用:接受一个数据并保存源地址
进程阻塞于recvfrom的调用,然后上面应用程序就是用户态,然后系统调用,然后进入到内核态,无数据报准备好,然后等待数据,等待期间,应用程序让出CPU,进入等待队列。然后数据包准备好,开始复制数据报,然后内核唤醒进程读取数据,将数据从内核复制到用户空间,复制完成,返回成功指示。
所以BIO的特点是在IO执行的两个阶段都被block了。
先演示accept。accept监听
code案例
1、RedisServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class RedisServer { public static void main (String[] args) throws IOException { byte [] bytes = new byte [1024 ]; ServerSocket serverSocket = new ServerSocket (6379 ); while (true ) { System.out.println("-----111 等待连接" ); Socket socket = serverSocket.accept(); System.out.println("-----222 成功连接" ); } } }
2、RedisClient01
1 2 3 4 5 6 7 8 9 public class RedisClient01 { public static void main (String[] args) throws IOException { System.out.println("------RedisClient01 start" ); Socket socket = new Socket ("127.0.0.1" , 6379 ); System.out.println("------RedisClient01 connection over" ); } }
3、RedisClient02
1 2 3 4 5 6 7 8 public class RedisClient02 { public static void main (String[] args) throws IOException { System.out.println("------RedisClient02 start" ); Socket socket = new Socket ("127.0.0.1" , 6379 ); } }
再演示read
read读取
先启动BIO再启动client01验证后再启动Client02客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class RedisServerBIO { public static void main (String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket (6379 ); while (true ) { System.out.println("-----111 等待连接" ); Socket socket = serverSocket.accept(); System.out.println("-----222 成功连接" ); InputStream inputStream = socket.getInputStream(); int length = -1 ; byte [] bytes = new byte [1024 ]; System.out.println("-----333 等待读取" ); while ((length = inputStream.read(bytes)) != -1 ) { System.out.println("-----444 成功读取" +new String (bytes,0 ,length)); System.out.println("====================" ); System.out.println(); } inputStream.close(); socket.close(); } } }
然后RedisClient01
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class RedisClient01 { public static void main (String[] args) throws IOException { Socket socket = new Socket ("127.0.0.1" ,6379 ); OutputStream outputStream = socket.getOutputStream(); while (true ) { Scanner scanner = new Scanner (System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit" )) { break ; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......" ); } outputStream.close(); socket.close(); } }
然后RedisClient02
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class RedisClient02 { public static void main (String[] args) throws IOException { Socket socket = new Socket ("127.0.0.1" ,6379 ); OutputStream outputStream = socket.getOutputStream(); while (true ) { Scanner scanner = new Scanner (System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit" )) { break ; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......" ); } outputStream.close(); socket.close(); } }
先启动BIO然后Client01发消息后,BIO能接收到,然后Client02再启动,然后发消息,BIO收不到消息。
因为还和1号绑定着还在1号阻塞。那然后关闭了1号通讯,然后2号挤压的2条消息变成1条消息发送过去了。
如果2号顾客连续挤压然后会导致服务器崩溃的风险,如何解决?
那么我们就利用多线程。
只要连接了一个socket,操作系统就分配一个线程来处理,这样Read方法堵塞在每个具体先呈上而不堵塞主线程 。就能操作多个socket,哪个线程中的socket有数据,就读哪个socket,各取所需。
程序服务端只负责监听是否有客户端连接,使用accept方法阻塞,任何一个线程上的socket有数据发送过来,read()就能立马读到,cpu就能进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class RedisServerBIOMultiThread { public static void main (String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket (6379 ); while (true ) { Socket socket = serverSocket.accept(); new Thread (() -> { try { InputStream inputStream = socket.getInputStream(); int length = -1 ; byte [] bytes = new byte [1024 ]; System.out.println("-----333 等待读取" ); while ((length = inputStream.read(bytes)) != -1 ) { System.out.println("-----444 成功读取" +new String (bytes,0 ,length)); System.out.println("====================" ); System.out.println(); } inputStream.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } },Thread.currentThread().getName()).start(); System.out.println(Thread.currentThread().getName()); } } }
这次利用多线程,然后监听到一个新的请求就新建一个线程处理。
Client01后面Client02一样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class RedisClient01 { public static void main (String[] args) throws IOException { Socket socket = new Socket ("127.0.0.1" ,6379 ); OutputStream outputStream = socket.getOutputStream(); while (true ) { Scanner scanner = new Scanner (System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit" )) { break ; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......" ); } outputStream.close(); socket.close(); } }
那这种还有什么问题?来一个请求new一个,百级千级别,如果是万级别就很浪费资源了
第一个办法:线程池。
这个在客户端链接少的情况下可以使用,但是在用户量大的情况下,你不知道线程池要多大,太大了内存可能不够,也不可行。
第二办法:NIO 非阻塞式IO
因为read方法堵塞了,所以要开辟多个线程,如果什么方法能使Read方法不堵塞,这样就不用开辟多个线程,这就用到了另一个IO模型,NIO
tomcat7之前就是用BIO多线程来解决多连接。
3、NIO
先看一下图片,交互变多了。当用户进程发出read操作时,如果内核中的数据还没准备好,那么他并不会阻塞用户进程,而是立刻返回一个error(返回一个标识)。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上得到了一个结果。用户进程判断结果是一个error后,他就知道数据还没准备好,于是他可以再次发送read操作。一旦内核中的数据准备好了,并且又再次受到用户进程的system call,那么它马上就将数据拷贝到用户内存,然后返回。 所以,NIO特点是用户进程需要不断的主动询问内核数据准备好了吗?一句话,用轮询替代阻塞
面试回答
在NIO模式中,一切都是非阻塞的:
accept()方法是非阻塞的,如果没有客户端连接,就返回无连接标识
read()方法是非阻塞的,如果read()方法读取不到数据就返回空闲中标识,如果读取到数据时只阻塞read()方法读数据的时间
在NIO模式中,只有一个线程:
当一个客户端与服务端进行连接,这个socket就会加入到一个数组中,隔一段时间遍历一次,
看这个socket的read()方法能否读到数据,这样一个线程就能处理多个客户端的连接和读取了
理论知道了,如何实现?
之前的socket是阻塞的,另外开发一套API。那么就有java.nio了,已经帮我们写好了。以前是ServerSocket,现在是ServerSocketChannel。
1、RedisServerNIO
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class RedisServerNIO { static ArrayList<SocketChannel> socketList = new ArrayList <>(); static ByteBuffer byteBuffer = ByteBuffer.allocate(1024 ); public static void main (String[] args) throws IOException { System.out.println("---------RedisServerNIO 启动等待中......" ); ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress ("127.0.0.1" ,6379 )); serverSocket.configureBlocking(false ); while (true ) { for (SocketChannel element : socketList) { int read = element.read(byteBuffer); if (read > 0 ) { System.out.println("-----读取数据: " +read); byteBuffer.flip(); byte [] bytes = new byte [read]; byteBuffer.get(bytes); System.out.println(new String (bytes)); byteBuffer.clear(); } } SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null ) { System.out.println("-----成功连接: " ); socketChannel.configureBlocking(false ); socketList.add(socketChannel); System.out.println("-----socketList size: " +socketList.size()); } } } }
2、客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class RedisClient01 { public static void main (String[] args) throws IOException { System.out.println("------RedisClient01 start" ); Socket socket = new Socket ("127.0.0.1" ,6379 ); OutputStream outputStream = socket.getOutputStream(); while (true ) { Scanner scanner = new Scanner (System.in); String string = scanner.next(); if (string.equalsIgnoreCase("quit" )) { break ; } socket.getOutputStream().write(string.getBytes()); System.out.println("------input quit keyword to finish......" ); } outputStream.close(); socket.close(); } }
用轮询代替阻塞。那这种还有什么问题?
优缺点?NIO成功解决了BIO需要开启多线程的问题,NIO中一个线程能解决多个Socket连接,用轮询替代阻塞。
问题一:客户端少的时候十分好用,但是客户端多怎么办?假设一万个连接,但是2个有数据,也需要遍历一万次,这时候就会很多无用功,每次遇到-1也要返回。
问题二:这个遍历过程是在用户态进行的,用户态判断socket是否有数据还是调用内核态的read方法实现的,这就涉及到用户态和内核态的切换,每遍历一次就要切换一次,开销很大。
优点:不会阻塞在内核的等待数据过程,每次发起的 I/O 请求可以立即返回,不用阻塞等待,实时性较好。
缺点:轮询将会不断地询问内核,这将占用大量的 CPU 时间,系统资源利用率较低,所以一般 Web 服务器不使用这种 I/O 模型。
结论:让Linux内核搞定上述需求,我们将一批文件描述符通过一次系统调用传给内核由内核层去遍历,才能真正解决这个问题。IO多路复用应运而生,也即将上述工作直接放进Linux内核,不再两态转换而是直接从内核获得结果,因为内核是非阻塞的。
应用程序不再自己在用户态循环检查每个 socket 是否可读;
而是一次性把所有关心的 fd(文件描述符)交给内核 ,说:“你帮我盯着这些,哪个 ready 了告诉我”。
所以才有了后面的select poll epoll函数。这三个是升级关系
现在就是把代码放到linux内核里面这样就不用频繁的用户态和内核态切换了。
4、IO多路复用
先了解一下基本概念:首先IO多路复用的模型,就是多个Socket复用一根网线这个功能是在内核+驱动层实现的
大家都用过nginx,nginx使用epoll接收请求,ngnix会有很多链接进来, epoll会把他们都监视起来,然后像拨开关一样,谁有数据就拨向谁,然后调用相应的代码处理。redis类似同理。
然后还需要了解FileDescriptor
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
简称 fd
现在开始IO多路复用
IO multiplexing就是我们说的select,poll,epoll,有些技术书籍也称这种IO方式为event driven IO事件驱动IO。就是通过一种机制,一个进程可以监视多个描述符(一个老师监考多个考生),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
可以基于一个阻塞对象并同时在多个描述符上等待就绪,而不是使用多个线程(每个文件描述符一个线程,每次new一个线程),这样可以大大节省系统资源。所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符 而这些文件描述符(套接字描述符Socket)其中的任意一个进入读就绪状态,select,poll,epoll等函数就可以返回。
说人话
1 2 3 4 5 6 7 8 9 10 11 模拟一个tcp服务器处理30个客户socket,一个监考老师监考多个学生,谁举手就应答谁。 假设你是一个监考老师,让30个学生解答一道竞赛考题,然后负责验收学生答卷,你有下面几个选择: 第一种选择:按顺序逐个验收,先验收A,然后是B,之后是C、D。。。这中间如果有一个学生卡住,全班都会被耽误,你用循环挨个处理socket,根本不具有并发能力。 第二种选择:你创建30个分身线程,每个分身线程检查一个学生的答案是否正确。 这种类似于为每一个用户创建一个进程或者线程处理连接。 第三种选择,你站在讲台上等,谁解答完谁举手。这时C、D举手,表示他们解答问题完毕,你下去依次检查C、D的答案,然后继续回到讲台上等。此时E、A又举手,然后去处理E和A。。。这种就是IO复用模型。Linux下的select、poll和epoll就是干这个的。 将用户socket对应的fd注册进epoll,然后epoll帮你监听哪些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用非阻塞模式。这样,整个过程只在调用select、poll、epoll这些调用的时候才会阻塞,收发客户消息是不会阻塞的,整个进程或者线程就被充分利用起来,这就是事件驱动,所谓的reactor反应模式。
在代码中select其实就是把NIO中用户态要遍历的 fd数组(我们的每一个socket连接,安装进ArrayList里面那个)拷贝到了内核态,让内核来遍历,因为用户态判断socket是否有数据还是要调用内核态的,所有拷贝到内核态后,这样遍历判断的时候就不用一直用户态和内核态频繁切换了。
现在再来深刻的理解Redis单线程如何处理那么多并发客户端连接,为什么单线程,为什么快
Redis的IO多路复用
Redis利用epoll来实现IO多路复用,将连接信息和事件放到队列中,依次放到事件分派器,事件分派器将事件分发给事件处理器。
Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符) 所谓 I/O 多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要 select 、 poll 、 epoll 来配合。多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
所谓 I/O 多路复用机制,就是说通过一种考试监考机制,一个老师可以监视多个考生,一旦某个考生举手想要交卷了,能够通知监考老师进行相应的收卷子或批改检查操作。所以这种机制需要调用班主任(select/poll/epoll)来配合。多个考生被同一个班主任监考,收完一个考试的卷子再处理其它人,无需等待所有考生,谁先举手就先响应谁,当又有考生举手要交卷,监考老师看到后从讲台走到考生位置,开始进行收卷处理。
Reactor设计模式
基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。即 I/O 多了复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术。
Reactor 模式中有 2 个关键组成:
1)Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
2)Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际办理人。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
现在利用linux的内核处理监控,所以每一个网络连接其实都对应的是一个文件描述符
Redis 服务采用 Reactor 的方式来实现文件事件处理器(每一个网络连接其实都对应一个文件描述符)
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器。
它的组成结构为4部分:
多个套接字、
IO多路复用程序、
文件事件分派器、
事件处理器。因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型
上面理论理解了,落地的函数下面讲解
select、poll、epoll都是IO多路复用的具体的实现
1、C语言的结构体
1 2 3 4 5 6 7 8 9 10 11 struct Books { char name[50 ]; int book_id; } book; typedef struct Books { char name[50 ]; int book_id; } Books;
2、select方法
Linux官网或者用man命令来查看select方法的介绍
1 2 3 4 5 6 +1是8个人吃饭准备9分饭菜 select 函数监视的文件描述符分3类,分别是readfds、writefds和exceptfds,将用户传入的数组拷贝到内核空间 调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except)或超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。 当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。
我们上面用java写的代码思想封装进了linux内核里面用C语言。
优点 :select 其实就是把NIO中用户态要遍历的fd数组(我们的每一个socket链接,安装进ArrayList里面的那个)拷贝到了内核态,让内核态来遍历,因为用户态判断socket是否有数据还是要调用内核态的,所有拷贝到内核态后,这样遍历判断的时候就不用一直用户态和内核态频繁切换了
从代码中可以看出,select系统调用后,返回了一个置位后的&rset,这样用户态只需进行很简单的二进制比较,就能很快知道哪些socket需要read数据,有效提高了效率
缺点 :1、bitmap最大1024位,一个进程最多只能处理1024个客户端
2、&rset不可重用,每次socket有数据就相应的位会被置位
3、文件描述符数组拷贝到了内核态(只不过无系统调用切换上下文的开销。(内核层可优化为异步事件通知)),仍然有开销。select 调用需要传入 fd 数组,需要拷贝一份到内核,高并发场景下这样的拷贝消耗的资源是惊人的。(可优化为不复制)
4、select并没有通知用户态哪一个socket有数据,仍然需要O(n)的遍历。select 仅仅返回可读文件描述符的个数,具体哪个可读还是要用户自己遍历 。(可优化为只返回给用户就绪的文件描述符,无需用户做无效的遍历)
我们自己模拟写的是,RedisServerNIO.java,只不过将它内核化了。
总结 :select方式,既做到了一个线程处理多个客户端连接(文件描述符),又减少了系统调用的开销(多个文件描述符只有一次 select 的系统调用 + N次就绪状态的文件描述符的 read 系统调用
poll方法
优点:1、poll使用pollfd数组来代替select中的bitmap,数组没有1024的限制,可以一次管理更多的client。它和 select 的主要区别就是,去掉了 select 只能监听 1024 个文件描述符的限制。:
2、当pollfds数组中有事件发生,相应的revents置位为1,遍历的时候又置位回零,实现了pollfd数组的重用
缺点:poll 解决了select缺点中的前两条,其本质原理还是select的方法,还存在select中原来的问题
1、pollfds数组拷贝到了内核态,仍然有开销
2、poll并没有通知用户态哪一个socket有数据,仍然需要O(n)的遍历
epoll方法
int epoll_create(int size)
参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
见上图
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
等待epfd上的io事件,最多返回maxevents个事件。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大。
epoll执行流程:
多路复用快的原因在于,操作系统提供了这样的系统调用,使得原来的 while 循环里多次系统调用,
变成了一次系统调用 + 内核层遍历这些文件描述符。
epoll是现在最先进的IO多路复用器,Redis、Nginx,linux中的Java NIO都使用的是epoll。
这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。
1、一个socket的生命周期中只有一次从用户态拷贝到内核态的过程,开销小
2、使用event事件通知机制,每次socket中有数据会主动通知内核,并加入到就绪链表中,不需要遍历所有的socket
在多路复用IO模型中,会有一个内核线程不断地去轮询多个 socket 的状态,只有当真正读写事件发送时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有真正有读写事件进行时,才会使用IO资源,所以它大大减少来资源占用
大总结 :select是内核里面全量遍历fd在bit1024长度数组里面,谁有就返回数量然后返回bit数组回去,然后用户态再遍历数组。然后到了poll,poll用pollfd数组代替Bit,大小限制解除,但还是每次都需要循环遍历数组,然后数组有时间发生就置为1没有就置为0,,但还是只能返回事件个数然后返回整个数组,然后用户态遍历整个数组。最后epoll是内核态收到socket连接的时候就把文件描述符放到队首然后会用的时候只用返回队首的前几个就好了,这样后续就不用再次遍历数组而是直接在用户态读取处理即可。
那为什么三个都保有?
Redis对IO多路复用函数的选择,因为如果Redis安装在Linux用epoll,如果是其他的比如Windows就是select了。
案例 如何做个迷你版的微信抢红包。
1、需求分析:各种节假日,高并发需求,一个总的大红包会拆成多个小红包,每个人只能抢一次,需要有记录,需要计时,从完整抢完到发出,红包过期,退回,只能用redis。
2、架构设计
1 2 3 4 5 6 7 8 9 10 难点: 1 拆分算法如何 红包其实就是金额,拆分算法如何 ?给你100块,分成10个小红包(金额有可能小概率相同,有2个红包都是2.58), 如何拆分随机金额设定每个红包里面安装多少钱? 2 次数限制 每个人只能抢一次,次数限制 3 原子性 每抢走一个红包就减少一个(类似减库存),那这个就需要保证库存的-----------------------原子性,不加锁实现 你认为存在redis什么数据类型里面?set ?hash? list?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 关键点: 发红包 抢红包: 抢,不加锁并且原子性,还能支持高并发。 每个人一次且有抢红包记录 记红包:记录每个人抢了多少 拆红包: 拆红包算法: 1、所有人抢到金额之和等于红包金额,不能超过,也不能少于 2、每个人至少抢到1分钱 3、要保证所有人抢到金额的几率相等 结论: 抢红包业务通用算法: 二倍均值法 剩余红包金额为M,剩余人数为N,那么有如下公式: 每次抢到的金额 = 随机区间 (0, (剩余红包金额M ÷ 剩余人数N ) X 2) 这个公式,保证了每次随机金额的平均值是相等的,不会因为抢红包的先后顺序而造成不公平。 举个栗子: 假设有10个人,红包总额100元。 第1次: 100÷10 X2 = 20, 所以第一个人的随机范围是(0,20 ),平均可以抢到10元。假设第一个人随机到10元,那么剩余金额是100-10 = 90 元。 第2次: 90÷9 X2 = 20, 所以第二个人的随机范围同样是(0,20 ),平均可以抢到10元。假设第二个人随机到10元,那么剩余金额是90-10 = 80 元。 第3次: 80÷8 X2 = 20, 所以第三个人的随机范围同样是(0,20 ),平均可以抢到10元。 以此类推,每一次随机范围的均值是相等的。
发红包和抢红包思路就是用list把分好的放入list,抢的时候就lpop出红包 。redis原来就保证原子性和高并发
记录红包: 盘点+汇总,防止作弊,同一个用户不能抢两次,思路就是hset记录红包流水号被谁抢了多少,这样一个hash代表一个抢红包,然后里面记录谁抢了多少的记录以及还剩多少红包
3、编码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 @RestController public class RedPackageController { public static final String RED_PACKAGE_KEY = "redpackage:" ; public static final String RED_PACKAGE_CONSUME_KEY = "redpackage:consume:" ; @Resource private RedisTemplate redisTemplate; @RequestMapping("/send") public String sendRedPackage (int totalMoney,int redPackageNumber) { Integer[] splitRedPackages = splitRedPackage(totalMoney, redPackageNumber); String key = RED_PACKAGE_KEY+IdUtil.simpleUUID(); redisTemplate.opsForList().leftPushAll(key,splitRedPackages); redisTemplate.expire(key,1 ,TimeUnit.DAYS); return key+"\t" +"\t" + Ints.asList(Arrays.stream(splitRedPackages).mapToInt(Integer::valueOf).toArray()); } @RequestMapping("/rob") public String rodRedPackage (String redPackageKey,String userId) { Object redPackage = redisTemplate.opsForHash().get(RED_PACKAGE_CONSUME_KEY + redPackageKey, userId); if (redPackage == null ) { Object partRedPackage = redisTemplate.opsForList().leftPop(RED_PACKAGE_KEY + redPackageKey); if (partRedPackage != null ) { redisTemplate.opsForHash().put(RED_PACKAGE_CONSUME_KEY + redPackageKey,userId,partRedPackage); System.out.println("用户: " +userId+"\t 抢到多少钱红包: " +partRedPackage); return String.valueOf(partRedPackage); } return "errorCode:-1,红包抢完了" ; } return "errorCode:-2, message: " +"\t" +userId+" 用户你已经抢过红包了" ; } private Integer[] splitRedPackage(int totalMoney, int redPackageNumber) { int useMoney = 0 ; Integer[] redPackageNumbers = new Integer [redPackageNumber]; Random random = new Random (); for (int i = 0 ; i < redPackageNumber; i++) { if (i == redPackageNumber - 1 ) { redPackageNumbers[i] = totalMoney - useMoney; }else { int avgMoney = (totalMoney - useMoney) * 2 / (redPackageNumber - i); redPackageNumbers[i] = 1 + random.nextInt(avgMoney - 1 ); } useMoney = useMoney + redPackageNumbers[i]; } return redPackageNumbers; } }
但是一个用户如果高并发的情况下也可能多抢,那么这时候可以用Lua脚本。
多学一手 如何批量删除红包
1 redis-cli -a 密码 keys "red*" | xargs redis-cli del