Redis高级篇

Redis单线程 VS 多线程 入门

面试题

1、redis到底是单线程还是多线程

2、IO多路复用

3、redis为什么快?

Redis为什么选择单线程?

redis3以及之前是单线程的。

redis4之后才慢慢支持多线程 支持异步删除,部分多线程,直到redis6/7之后才稳定,完全支持多线程。

那以前我们说redis是单线程是什么意义

主要是指Redis的网络IO和键值对读写是由一个线程来完成的,Redis在处理客户端的请求时包括获取(socket读),解析,执行,内容返回(socket写)等都由一个顺序串行的主线程处理,这就是所谓的单线程,这也是redis对外提供剪枝存储服务的主要流程。

image-20251203164325376

但是Redis的其他功能,比如持久化RDB、AOF、异步删除、集群数据同步等等,其实是由额外的线程执行的。Redis命令工作线程是单线程的,但是,整个Redis来说,是多线程的。

也就是说一个命令set但是这时候需要RDB,AOF,这时候就会新开一个线程,处理这些。

但是操作十大类型的原子性操作都是单线程的。

Redis3单线程时代但性能依旧很快的原因

从硬件的角度,Redis是基于内存的操作,所有数据都在内存中,因此所有的运算都是内存级别的,所以性能高。

数据结构简单,Redis的数据结构是专门设计的,这些简单的数据结构的查找和操作大部分是O(1)

多路复用和非阻塞 I/O,Redis使用 I/O多路复用功能来箭筒多个socket连接客户端,这样就可以使用一个线程连接来处理多个请求,减少线程切换带来

IO 多路复用 = 用一个线程,通过操作系统提供的机制(如 epoll),同时监听多个网络连接的 I/O 事件,并在事件发生时及时处理,从而实现高并发、低资源消耗的网络服务。

避免上下文竞争,因为是单线程所以不会有锁竞争等。

Redis是基于内存操作的, 因此他的瓶颈可能是机器的内存或者网络带宽而非CPU,所以后面会支持多线程。

在Redis4之前,一直使用单线程的主要原因

1、开发、维护更简单

2、即时使用单线程模型也并发的处理多客户端的请求,主要使用的是IO多路复用和非阻塞IO

3、对Redis系统来说,主要的性能瓶颈是内存或网络带宽而并非CPU

既然单线程那么好,为什么逐渐加入多线程特性?

1、硬件的发展,CPU都是多核时代了,如果还是单线程对性能的使用效率不够充分

2、单线程的痛点是什么?不得不修改?

对于redis del k1,这些命令都是原子的。如果del 的是一个hash结构非常复杂的大key,可能会删半天。这就会造成Redis主线程的卡顿。

解决的方法就是使用 惰性删除(不立即删除,而是下次有人访问的时候删除并告诉访问的人没有了),于是在Redis4版本后,新增了多线程模块,当然4版本中的多线程主要是为了解决删除数据效率比较低的问题。

1
2
3
unlink k1  异步删除k1
flushdb async 异步删除db
flushall async 异步删除所有redis

redis6/7的多线程特性和IO多路复用的入门篇

首先影响Redis性能的主要因素:CPU、内存、网络IO

最后Redis的瓶颈可以初步定为网络IO

Redis6/7中,第一个新特性就是多线程。随着网络硬件性能的提升,Redis的性能瓶颈有时会出现在网络IO的处理上,也就是说,单个主线程处理网络请求的速度跟不上底层网络硬件的速度。

为了应对这个问题:采用多个IO线程来处理网络请求,提高网络请求处理的并行速度。

但是,Redis的多IO献策会给你只是用来处理网络请求的(也就是客户端socket连接),对于读写操作命令Redis仍然使用单线程来处理。而继续使用单线程执行命令操作,就不用为了保证Lua脚本,事务的原子性,额外开发多线程互斥加锁机制了,这样一来,Redis线程模型实现就简单了

总结就是网络IO请求是多线程,真正执行命令还是单线程

只要连同一个redis服务器,就命令都是一样的,如果每次连都连一个新的会大费周章

就像多人共用一个 Excel 表格,频繁创建和销毁 Redis 连接代价很高

1
2
3
4
5
6
7
8
9
10
# 每次都新建连接
redis-cli -a 11111 SET k1 v1
redis-cli -a 11111 SET k2 v2
redis-cli -a 11111 GET k1
# 启动一个 redis-cli 会话,复用连接
redis-cli -a 11111 <<EOF
SET k1 v1
SET k2 v2
GET k1
EOF

主线程和IO线程是怎么协作完成请求处理的?

四个阶段

首先主线程,接受建立连接请求,获取Socket(也就是redic-cli -a 密码 -c -p 端口号是标配)。然后将Socket放入全局等待队列。然后以论询方式将Socket连接分配给IO线程。然后主线程阻塞,等待IO县城完成请求读取和解析。然后IO线程开始执行,IO线程将Socket和线程绑定。然后IO线程读取Socket中的请求并解析。请求解析完成。然后回到主线程开始执行。然后主线程执行请求的命令操作。然后主线程请求的命令操作执行完成。然后主线程将结果数据写入缓冲区。然后主线程阻塞,等待IO线程完成数据回写Socket。然后IO线程开始执行。IO线程将结果数据回写Socket。然后Socket回写完成(相当于返回OK)。然后主线程开始执行,清空等待队列,等待后续请求。

阶段一:服务端和客户端建立Socket连接,并分配处理线程

首先,主线程负责接受建立连接请求。当有客户端请求和实例建立Socket连接时,主线程会创建和客户端的连接,并把Socket放入全局等待队列中。紧接着,主线程通过轮询方法把Socket连接分配给IO线程。

阶段二: IO线程读取并解析请求

主线程一旦把Socket分配给IO线程,就会进入阻塞状态,等待IO县城完成客户端请求读取和解析。因为有多个IO线程在并行处理,所以这个过程很快就完成。

阶段三:主线程执行请求操作

等到IO线程解析完请求,主线程还是会以单线程的方式执行这些命令操作。

阶段四:IO线程回写Socket和主线程清空全局队列

当主线程执行完请求操作后,会把需要返回的结果写入缓冲区,然后,主线程会阻塞等待IO线程,把这些结果回写到Socket中,并返回给客户端。和IO线程读取和解析请求一样,IO线程回写Socket时,也是有多个线程在并发执行,所以回写Socket的速度也很快。等到IO线程回写Socket完毕,主线程会清空全局队列,等待客户端的后续请求。

image-20251204014238814

image-20251204014246021

读到这里会有个问题:首先这个发送Socket请求然后有一个连接然后用IO线程解析命令这个是多线程的,那么多线程的IO线程谁先解析完谁给主线程执行命令,那么不就没办法保证解析的顺序,并且返回给主线程的顺序也没法保证吗?

Redis 的多线程 I/O 是“并行读取 + 串行提交”

IO 线程可以并行读取多个 socket 的数据,但必须按“socket 在事件循环中的就绪顺序”将解析结果提交给主线程。

后面会更加详细解释。

现在还有个问题,那这个IO线程是什么呢?

Unix网络编程中的五种IO模型

Blocking IO 阻塞IO

NoneBlocking IO 非阻塞IO NIO

IO multiplexing IO多路复用

1、Linxu世界一切皆文件

文件描述符,简称FD,句柄。FileDescriptor

文件描述符是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数。实际上,他是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

image-20251204022627879

这个在JAVA1.0就有了,上面看不懂,通俗来说就是:我连上了我的redis,然后通过用redis-cli -a 密码连接,现在连上了,也就是java发起了一个连接请求,操作系统分配的听到了,然后会返回一个fd

文件描述符(FD) = 操作系统给“打开的资源”发的一个“编号门票”
你连 Redis、读文件、开网络连接……只要用了系统资源,OS 就给你一张“票”(FD),以后你干啥都靠这张票找 OS 要服务。

2、首次浅谈IO多路复用,IO多路复用是什么

一种同步的IO模型,实现一个线程监视 多个文件句柄一旦某个文件句柄就绪,就能够通知到对应应用程序进行相应的读写操作,没有文件句柄就绪时 就会阻塞应用程序,从而释放CPU资源。

Redis没法保证发送顺序,因为这是分布式系统的正常,因为可能因为网络波动或者别的因素影响发送到达的顺序,但 Redis 100% 保证:同一个客户端连接内的命令,严格按照发送顺序执行。

详细概念解释

I/O:网络I/O,尤其在操作系统层面指数据在内核态和用户态之间的读写操作。

网络 I/O 操作(比如读写 socket)一定会在“用户态”和“内核态”之间切换,因为只有操作系统内核才能直接操作网卡、管理 TCP 连接、收发数据包。

多路:多个客户端连接(连接就是套接字描述符,即socket或者channel)

复用:复用一个或几个线程。

IO多路复用:也就是说一个或者一组线程处理多个TCP连接,使用单进程就能够实现同事处理多个客户端的连接,无需创建或者维护过多的进程/线程

一句话:一个服务端进程可以同时处理多个套接字描述符。实现IO多路复用的模型有3中:可以分select ->poll -> epoll三个阶段来描述。

3、场景体验,说人话引出epoll

模拟一个tcp服务器处理30个客户socket。

假设你是一个监考老师,让30个学生解答一道竞赛考题,然后负责验收学生答卷,你有下面几个选择:

1、轮询:按顺序逐个验收,这中间如有一个学生卡住,全班都耽误,用循环挨个处理socket,根本不具有并发能力。

2、来一个new一个,1对1服务:每个分身线程检查一个学生的答案是否正确。这种类似于为每一个用户创建一个进程或者线程处理连接

3、响应式处理,1对多服务:你站在讲台上等,谁解答完谁拒收。这时C、D举手,表示他们解答问题完毕,你下去依次检查C、D的答案,然后继续回到讲台上等。这就是IO复用模型,Linux下的select、poll、epoll就是干这个的。

将用户socket对应的文件描述符(FileDescriptor)注册进epoll,然后epoll帮你监听那些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用 非阻塞模式。这样,整个过程,只有在调用select、poll、epoll这些调用的时候才会阻塞,收发客户消息是不会阻塞的,整个进程或者线程就被充分利用起来,这就是事件驱动,所谓的reactor反应模式。

image-20251204134523948

在单个线程 通过记录跟踪每一个Socket的状态来同事管理多个IO流,一个服务端进程可以同时处理多个套接字付描述符。目的是尽量多的提高服务器的吞吐能力。

大家都用过nginx,nginx用epoll接受请求,nginx会有很多连接进来,epoll会把他们都监视起来,然后像拨开关一样,谁有数据就拨向谁,然后调用相应的代码处理。redis类似同理,这就是IO多路复用原理,有请求就相应,没请求不打扰。

4、小总结

只使用一个服务端进程可以同时处理多个套接字描述符连接。

redis-cli -a 密码这个命令让客户端连接服务器了,内部的处理是复杂的。

客户端请求服务端时,实际就是在服务端你的Socket文件中写入客户端对应的文件描述符,如果有多个客户端同时请求服务端,为每次请求分配一个线程类似每次来都New一个就比较耗费服务端资源。因此我们只使用一个线程来监听多个文件描述符,这就是IO多路复用。

采用多路IO复用技术可以让单个线程高效的处理多个连接请求一个服务端进程可以同时处理多个套接字描述符。

5、面试题:redis为什么这么快

IO多路复用+epoll函数使用,才是redis为什么这么快的直接原因,而不仅仅是单线程命令+redis安装在内存中。

signal driven IO 信号驱动IO

asynchronous IO 异步IO

总结

Redis工作线程是单线程的,但是,整个Redis来说是多线程的。

主线程和IO线程是怎么协作完成请求处理的?

IO的读和写本身是堵塞的,比如当socket中有数据时,Redis会通过调用先将数据从内核态空间拷贝到用户态空间,再交给Redis调用,而这个拷贝的过程就是阻塞的,当数据量越大的时候拷贝所需要的时间就越多,而这些操作都是基于单线程完成的。

image-20251204142727646

从Redis6开始,就新增了多线程的功能来提高IO的读写性能,他的主要实现思路是将主线程的IO读写任务拆分给一组独立的线程去执行,这样就可以使用多个socket的读写可以并行化了,采用多路IO复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),将最耗时的Socket的读取、请求及诶、写入单独外包出去,剩下命令执行仍然由主线程串行执行并和内存的数据交互。

image-20251204143236171

这样网络IO操作就变成多线程化了,其他核心部分仍然是线程安全的,是个不错的折中方法。

image-20251204143640027

redis7默认是否开启了多线程?

如果你在实际应用中,发现Redis实例的CPU开销不大但吞吐量却没有提升,可以考虑使用Redis7的多线程机制,加速网络处理,进而提升实例的吞吐量。

Redis将所有数据放在内存中,内存的响应时长大约为100纳秒,对于小数据包,Redis服务器可以处理8W到10W的QPS(“每秒请求数”),这是Redis处理极限了,对于80%的公司来说,单线程的Redis已经足够使用了。

在Redis6 / 7 之后,多线程机制默认是关闭的,如果需要使用多线程功能,需要在redis.conf中完成两个设置

1
2
3
设置io-thread-do-reads配置项为yes,表示启用多线程。
io-threads 4 设置线程个数,官方的建议是如果为4核的CPU,建议线程数设置为2或3,如果8核就设置6,线程数一定小于机器核数
unlink key / flushall async命令

BigKey

面试题

海量数据里查询某一固定前缀的key

如何生产上限制key * /flushdb / flushall 等危险命令以防止误删误用

memory usage命令用过吗?

bigkey问题,多大算big?如何发现?如何删除?如何处理?

BigKey你做过调用吗?惰性释放lazyfree了解过吗?

Morekey问题,生产上redis数据库有1000W条记录,如何遍历?key *可以吗?

MoreKey案例

如何写个小脚本,写500W条key,操作多个key问题。

1、大批量插入redis

Linux Bash下执行,插入100w

通过redis提供的管道 --pipe命令插入100w大批量数据

1
2
3
4
for((i = 1; i <=100*10000 ; i++));do echo "set k$i v&i" >> /tmp/redisTest.txt;done;
more /tmp/redisText.txt
那么怎么让这些命令灌进入redis里面
cat /tmp/redisTest.txt | /opt/redis-7.0.0/src/redis-cli -h 127.0.0.1 -p 6379 -a 111111 --pipe

现在测试数据准备好了,然后这时候在生产环境中,还能用keys *吗

1
flushdb这些命令没有offset或者limit这些,所以执行的时候会锁住,导致后续的请求整个链路卡住,十几秒结束后,数据库产生了雪崩效应

如何限制keys * 这些危险命令防止误删误用

通过配置设置禁用这些命令,在redis.conf里面的security里面

1
2
3
4
5
security大概在1042行附近
rename-command keys ""
rename-command flushdb ""
rename-command flushall ""
然后重启就生效了

不用keys 避免卡顿,那用什么

用 scan命令 类似Limit但是不完全相同

scan命令用于迭代数据库中的键,里面有sscan(set),hscan(hash),zscan(zset)

1
2
3
4
5
6
7
8
9
scan 游标 [匹配元素] [count 个数]默认10个
基于游标的迭代器,每次被调用后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为scan命令的游标参数,一次来延续之前的迭代过程。
scan返回两个元素数组,第一个元素是游标,第二个元素是匹配的元素的数组
如果返回的游标是0,表示迭代结束。
支持模糊查询,一次返回的数量不可控,只能是大概率符合count参数

scan的顺序不是从第0位一直到末尾的,而是采用了高位进位加法来遍历。这样特殊的方式进行遍历,是考虑到字典的扩容和缩容,避免槽位的遍历重复和遗漏

scan 0 match k* count 15

BigKey案例

多大算BigKey

大的内容不是key本身,而是它对应的value

参考《阿里云开发规范》

string类型控制在10KB以内,hash、list、set、zset元素个数不要超过5000

非字符串的bitkey,不要使用del删除,(除了string可以用del删除其他的),使用hscan、sscan、zscan方式渐进式删除,同时要注意防止Bigkey过期时间自动删除问题,比如200w个元素的zset设置1小时过期,然后删除的时候就会造成阻塞

string和二级结构

string是value,最大512MB,但是大于等于10KB的就是bigkey

list、hash、set和zset,个数超过5000就是Bigkey

哪些危害

1、内存不均,集群迁移困难

2、超时删除,大key删除不方便

3、网络流量阻塞

如何产生

1、社交类,王心凌粉丝列表,粉丝逐步递增

2、汇总统计,某个报表,积累多了

如何发现

redis-cli --bigkeys

好处:会给一个总结,给出每种数据结构Top1 bigkey,同时给出每种数据类型的键值个数+平均大小

不足:想查询大于10kb的所有Key,--bigkeys参数就无能为例了,需要用到memory usage来计算每个键值的字节数。

1
2
redis-cli -h 127.0.0.1 -p 6379 -a 密码 -bigkeys
如果加参数 -i 0.1 也就是每隔100条 scan指令就会休眠0.1s,ops(每秒操作数)就不会剧烈抬升,但是扫描的时间会变长。

MEMORY USAGE键

给出一个key和他的值,在RAM内存中所占用的字节数,返回的结果是key的值以及为管理该key分配的内存总字节数。

1
memory usage k1

如何删除

1
2
3
4
5
6
heset customer:001 id 11 cname li4 age 23 score 80
hdel customer:001 score age
这样就先把field一个个删除了
对于string类型 直接del删除 如果过于庞大unlink异步删除
对于hash类型,使用hscan每次获取少量field-value,再使用hdel删除每个field
hscan + hdel

image-20251204202830356

1
2
3
4
5
6
7
8
list
使用ltrim渐进式逐步删除,直到全部删除完成

ltrim是对一个列表进行修剪,让列表只保留指定区间的元素,不在指定区间的元素被删除。
举例
rpush list 1 2 3 4 5
ltrim list 0 2
这样List里面就剩下1 2 3

image-20251204203757806

1
2
3
4
5
set
使用sscan每次获取部分元素,再使用srem命令删除每个元素
SSCAN key cursor [MATCH pattern] [COUNT count]
sscan set 0 遍历所有元素
srem set a b 删除元素a b

image-20251204205627451

1
2
3
4
5
6
7
zset
使用zscan每次获取部分元素,再使用zremrangebyrank命令删除每个元素
zadd salary 3000 z3 4000 li4 5000 w5 7000 s7
zrange salary 0 -1 withscores
zscan salary 0 渐进式遍历 salary 这个有序集合
zremrangebyrank salary 0 1 按排名范围删除元素 —— 删除 salary 中 排名从 0 到 1 的元素(包含两端)
zrange salary 0 -1 withscores

image-20251204234455781

BigKey生产调优

redis.conf 配置文件LAZY FREEING相关说明

vim redis.conf

redis删除有两种方法,一个是阻塞的del,一个是非阻塞的unlink,

默认删除对象使用阻塞的方法。如果需要非阻塞型的需要修改一些配置,能提高性能。

image-20251205004353796

1
2
3
lazyfree-lazy-server-del  UNLINK key 是 Redis 4.0+ 提供的异步删除命令
replica-lazy-flush 当 Redis 内部自动触发删除操作(非用户直接调用 DEL/UNLINK)时,是否使用惰性删除?
lazyfree-lazy-user-del 当从节点(Replica)执行 FLUSHDB 或 FLUSHALL(通常由主节点同步过来的命令)时,是否使用惰性删除?

缓存双写一致性之更新策略讨论

image-20251205005942934

默认MySQL有数据。

上面的业务逻辑用java代码如何写?

面试题

1、你只要用缓存,就可能涉及到redis缓存和数据库双存储双写,如何解决一致性问题?

2、双写一致性,你先动缓存redis还是数据库mysql哪一个?why?

3、延迟双删你做过吗?会有那些问题?

4、有一种情况,微服务查询redis没有,mysql有,为保证数据双写一致性回写redis你需要注意什么?双检加锁策略你了解过吗?如何尽量避免缓存击穿?

5、redis和mysql双写100%会出现纰漏,做不到强一致性,如何保证 最终一致性?

缓存双写一致性

如果redis中有数据,需要和数据库中的值相同

如果redis中无数据,数据库中的值要是最新值,且准备回写redis

缓存按照操作来分,细分2种:只读缓存(只做查询缓存没有第三步,不回写)。

大部分是 读写缓存,那么读写缓存又有两种策略

1、同步直写策略

只要完成了第二步,Mysql尽量的同步写redis缓存,缓存和数据库中的数据一致,要想保证数据一致,就采用同步直写策略。(热点敏感数据,VIP数据)

2、异步缓写策略

业务不用这么急,正常业务运行中,mysql数据变动了,但是可以在业务上容许出现一定时间后才作用于redis,比如仓库、物流系统,积分变更等。

还有种情况是异常情况出现了,不得不将失败的动作重新修补,有可能需要借助kafka或者RabbitMQ等消息中间件,实现重试重写。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Service
@Slf4j
public class UserService{
//定义key的前缀
public static final String CACHE_KEY_USER = "user:";
@Resource
private UserMapper userMapper;
@Resource
private RedisTemplate redisTemplate;

/**
* 业务逻辑没有写错,对于QPS(每秒操作数)<= 1000)可以使用,但是大厂不行
* @param id
* @return
*/
public User findUserById(Integer id){
User user = null;
String key = CACHE_KEY_USER + id;

//1 先从redis里面查询,如果有直接返回结果,如果没有再去查询mysql
user = (User) redisTemplate.opsForValue().get(key);

if(user == null){
//2 redis里面无,继续查询mysql
user = userMapper.selectByPrimaryKey(id);
if(user == null){
//3.1 redis+mysql 都无数据
//你具体细化,防止多次穿透,我们业务规定,记录下这个null值的key,列入黑名单或者记录或者异常.....
return user;
}else{
//3.2 mysql有,需要将数据写回redis,保证下一次的缓存命中率
redisTemplate.opsForValue().set(key,user);
}
}
return user;
}
}

上面的代码中,如果在大厂里面是不行的,如果redis操作和Mysql操作间隔非常小,还没回写就重复查询到Mysql里面导致多次回写。

采用双检加锁策略

解决一堆线程查询到Mysql,一堆线程回写问题。

多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上 使用一个互斥锁来锁住它。 其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。

后面的线程进来发现已经有缓存了,就直接走缓存。

为什么要双检,这类似单例原则。

假设有两个线程 A 和 B 同时进入:

1
2
3
4
5
6
7
线程 A: if (instance == null) → true
线程 B: if (instance == null) → true (A 还没创建完)

线程 A: 获得锁 → 创建 instance
线程 A: 释放锁

线程 B: 获得锁 → 再次创建 instance!❌
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public String get(String key){
String value = redis.get(key);//查询缓存
if(value != null){
//缓存存在直接返回
return value;
}else{
//缓存不存在则对方法加锁
//假设请求量很大,缓存过期
synchronized(TestFuture.class){
value = redis.get(key);//再查一遍redis
if(value != null){
//查询数据直接返回
return value;
}else{
//二次查询缓存也不存在,直接查DB
value = dao.get(key);
//数据缓存
redis.setnx(key,value,time);
//返回
return value;
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Service
@Slf4j
public class UserService{
//定义key的前缀
public static final String CACHE_KEY_USER = "user:";
@Resource
private UserMapper userMapper;
@Resource
private RedisTemplate redisTemplate;

/**
* 业务逻辑没有写错,对于QPS(每秒操作数)<= 1000)可以使用,但是大厂不行
* @param id
* @return
*/
public User findUserById2(Integer id){
User user = null;
String key = CACHE_KEY_USER + id;

//1 先从redis里面查询,如果有直接返回结果,如果没有再去查询mysql
user = (User) redisTemplate.opsForValue().get(key);

if(user == null){
//2 大厂用,对于高QPS的优化,进来就先加锁,保证一个请求操作,让外面的redis等待一下,避免击穿mysql
synchronized(UserService.class){
user = (User) redisTemplate.opsForValue().get(key);
//3 二次查redis还是null,可以去查mysql了(mysql默认有数据)
if(user == null){
//4 查询mysql拿数据
user = userMapper.selectByPrimaryKey(id);//mysql有数据默认
if(user == null){
return user;
}else{
//5 mysql里面有数据的,需要回写redis,完成数据一致性的同步工作
redisTemplate.opsForValue().setIfAbsent(key,user,7L,TimeUnit.DAYS);
}
}
}
}
return user;
}
}

数据库和缓存一致性的几种更新策略

目的:我们要达到最终一致性!

给缓存设置过期时间,定期清理缓存并回写

所有的 写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存,达到一致性,切记,要以mysql的数据库写入库为准

上述的是一种方案,下面的也是。

一致性模型 特点 适用场景
强一致性 读一定拿到最新写入的值 银行转账、库存扣减
最终一致性 允许短暂不一致(上面的案例),但最终会一致 用户资料、文章阅读量、非核心业务

可以停机的情况

今天晚上更新数据,不用边跑边更新数据。首先挂牌报错,凌晨升级,温馨提示,服务降级。单线程,这样重量级的数据操作最好不要多线程。

不能停机的情况,4种更新策略

第一种:先更新数据库,再更新缓存(❌️)

但是如果停机了,百分之百只有一个线程在更新是可以的

异常问题1

1 先更新mysql的商品库存,当前商品的库存是100,更新为99

2 更新Mysql修改为99成功,然后更新redis

3 此时异常出现,更新redis失败了,这导致mysql数据正确,redis错误

4、这时候访问redis,读到了脏数据

异常问题2

A,B两个线程发起调用

正常逻辑

1、A update mysql 100

2、A update redis 100

3、B update mysql 80

4、B update redis 80

但是在多线程环境下,A、B有先有后并行

这时候

1 -> 3 -> 4 -> 2这样就出错了

最终mysql和redis数据不一致了

第二种:先更新缓存,在更新数据库(❌️)

这个不太推荐,一般把mysql作为底单数据库,保证最后解释。

异常问题2

A,B两个线程发起调用

正常逻辑

1、A update redis 100

2、A update mysql 100

3、B update redis 80

4、B update mysql 80

但是在多线程环境下,A、B有先有后并行

这时候

1 -> 3 -> 4 -> 2这样就出错了

最终mysql和redis数据不一致了。redis 80 mysql 100

第三种:先删除缓存,再更新数据库(❌️)

异常问题:

1、A现成先删除redis,然后更新mysql,此时mysql正在更新中,还没有结束(比如网络延时)

B突然出现要来读取缓存数据。

2、此时redis里面的数据是空的,B线程来读取,先去读取redis里面的数据,这时候已经被A线程删掉了

那么B就会从Mysql里面获得旧值,B线程发现redis里没有(缓存缺失)马上去mysql里面读取, 从数据库里读取来的是旧值

并且B会把获得的旧值写回redis,那么刚被A删除的旧数据又被写回了

3、这时候A更新完mysql,发现redis里面的缓存是脏数据。

解决方案:

采用延时双删策略:

image-20251205195913297

延时双删面试题

这个删除休眠该多久呢?

线程A sleep的时间,需要大于线程B读取数据再写入缓存的时间。

这个时间怎么确定呢?

第一种方法:在业务程序运行的时候,统计下程序读数据和写缓存的操作时间,自行评估自己的项目读数据业务逻辑耗时,以此为基础进行估算,然后写数据的休眠时间则在读数据业务逻辑的耗时基础上加 百毫秒 即可。

这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

第二种方法:新启动一个后台监控程序,比如后面要讲的WatchDog监控程序,会加时

这种同步淘汰策略,吞吐量降低怎么办?

image-20251205204706286

第一次删除是为了防止其他线程读到脏数据

延时双删的缺点既有读取到脏数据又有污染缓存

后续看门狗WatchDog源码分析

第四种:先更新数据库,再删除缓存(⚠️)

异常问题

线程A更新数据库,还没更新完,B缓存立刻命中旧值,A更新缓存数据。

唯一的问题,读到的缓存旧值问题。

微软云是用的是先更新数据库,再删除缓存。阿里巴巴的canel也是类似的思想。

不可能保证强一致性,那如何保证最终一致性?

解决方案:消息中间件。

image-20251205220900975

1、更新数据库数据

2、数据库会将操作信息写入binlog日志当中

3、订阅程序(监控程序)提取出所需要的数据以及key

4、另起一段非业务代码,新开一个线程,异步啥的,获得该binlog信息,跟主业务分离

5、尝试删除缓存操作,发现缓存失败。如果成功就皆大欢喜

6、将这些信息发送到消息队列

7、重新从消息队列中获得该数据,重试操作

1
2
3
4
可以吧要删除的缓存值或者是要更新的数据库值暂存到消息队列中(kafka或者rabbitmq)
当程序没有能够成功的删除缓存值或者更新数据库值的时候,可以从消息队列中重新读取这些值,然后再次进行删除或者更新。
如果能成功的删除或者更新,我们就把这些值从消息队列中去除,以免重复操作,此时我们也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试。
如果重试超过一定次数后还是没有成功,我们就需要向业务层发送报错信息了,通知运维人员

实际场景 流量充值,线下发短信实际充值可能滞后5分钟,可以接受。电商发货,短信下发,但是物流明天见。

小总结

优先使用先更新数据库,再删除缓存方案。

理由如下:

1、先删除缓存值再更新数据库,有可能导致请求因缓存缺失而访问数据库,给数据库带来压力导致打满mysql。

2、如果业务应用中读取数据库和写缓存的时间不好估算,那么延迟双删中的等待时间就不好设置。

如果业务层要求必须读取一致性的数据,那么我们就需要在更新数据库的时候,先向Redis缓存客户端暂停并发读请求,等数据更新完,缓存删除后,再读取数据,从而保证数据一致性。但真实环境上,不推荐,分布式下很难做到实时一致性,一般都是最终一致性。

image-20251205223256987

Redis和MySQL数据双写一致性工程落地案例

复习+面试题

1、首先我们得到的比较好的方案是先更新mysql,再动redis,避免redis业务key突然消失,多线程请求集火打满mysql

2、懂,写操作

先更新数据库,再删除缓存。尝试使用双检加锁机制lock住mysql,只让一个请求线程回写redis,完成数据一致性。

3、面试提问:我想mysql有记录改动了(有增删改写操作),立刻同步反应到redis??如何做?你怎么知道mysql有改动?(Mysql监听)

首先Mysql有改动就有Binlog日志,动过就能知道Mysql有变更。那么现在就是需要能够监听到mysql变动并且能够通知给redis。

那么就用到了阿里巴巴的中间件canal

canal

是什么

阿里巴巴 Mysql binlog增量订阅组件、消费和解析。

能干嘛

数据库镜像

数据库实时备份

索引构建和实施维护(拆分异构索引、倒排索引等)

业务cache刷新

带业务逻辑的增量数据处理

image-20251205232432070

去哪下

Releases · alibaba/canal · GitHub

工作原理,面试回答

传统Mysql主从复制工作原理

MySQL的主从复制将经过如下步骤:

image-20251205233356573

1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;

2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,(offset偏移量是否改变)

如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;

3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;

4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;

5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;

6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

canal工作原理

image-20251205233847309

有点类似消息中间件。

mysql-canal-redis双写一致性Coding

先处理Mysql(windows)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1、查看mysql版本  select version();   5.7.28
2、当前的主机二进制日志 show master status; mysql-bin.000085 13117(偏移量position)
3、show variables like 'log_bin' 默认是off,是否开放日志
4、开启mysql的binlog写入功能
5、打开mysql目录,最好提前备份,my.ini
在[mysqld]下面添加三行
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复
(ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;)
6、重启mysql
7、再次查看是否开放日志
8、授权canal连接mysql账号
mysql默认的用户在mysql库的user表里
select * from mysql.user;
默认没有canal账户,此处新建+授权
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;

SELECT * FROM mysql.user;

处理canal服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1、下载
在github上下载canal.deployerxx.tar.gz
2、解压
解压后整体放入/mycananl路径下
在mycanal路径下tar -zxvf canal.deployerxx.tar.gz
3、配置
修改/mycanal/conf/example路径下instance.properties文件
换成自己的mysql主机master的IP地址
canal.instance.master.address=192.168.111.1:3306
换成自己在mysql新建的canal账户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

4、启动
进入到/mycanal/bin目录
然后./startup.sh 安装好java8

5、查看
判断canal是否启动成功

1、查看server日志
进入/mycanal/logs/canal然后运行cat canal log
2、查看样例example的日志
进入/mycanal/logs/example然后cat example.log

canal客户端(java编写业务程序)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1、SQL脚本
这里举个样本案例,建个数据表库
CREATE TABLE `t_user` (

`id` bigint(20) NOT NULL AUTO_INCREMENT,

`userName` varchar(100) NOT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

2、建module
canal_demo02
3、改POM
引入canal的jar包
4、写YML

5、主启动
@SpringBootApplication
public class CanalDemo02App{
public static void main(String[] args){
//SpringApplication.run(CanalDemo02App.class,args);
}
}
这个 Maven 模块(module)并不是一个独立运行的服务,而是一个“被其他程序依赖的监听逻辑库”。
这个类被 另一个真正的 Spring Boot 应用 所引用和启动
当前这个 CanalDemo02App 只是一个代码模块(library),不是可执行服务
只是提供 @Component、@Service 等 Bean
可以理解为一个Spring 组件库,里面提供各种Bean工具给其他程序调用
6、业务类

pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.atguigu.canal</groupId>
<artifactId>canal_demo02</artifactId>
<version>1.0-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
<relativePath/>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<mysql.version>5.1.47</mysql.version>
<druid.version>1.1.16</druid.version>
<mapper.version>4.1.5</mapper.version>
<mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
</properties>

<dependencies>
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<!--SpringBoot通用依赖模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--SpringBoot与AOP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<!--Mysql数据库驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!--SpringBoot集成druid连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!--mybatis和springboot整合-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
<!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
<!--hutool-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!--persistence-->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0.2</version>
</dependency>
<!--通用Mapper-->
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>${mapper.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.8.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

YML

1
2
3
4
5
6
7
8
9
server.port=5555

# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false

业务类

1、RedisUtils 包装jedis的util

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class RedisUtils
{
public static final String REDIS_IP_ADDR = "192.168.111.185";
public static final String REDIS_pwd = "111111";
public static JedisPool jedisPool;//利用池化技术

static {
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
}

public static Jedis getJedis() throws Exception {
if(null!=jedisPool){
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}

}

2、RedisCanalClientExample

简略版

1
2
3
4
5
6
7
8
9
10
11
12
public class RedisCanalClientExample
{
public static final Integer _60SECONDS = 60;
public static final String REDIS_IP_ADDR = "192.168.111.185";

//三大写操作
private static void redisInsert(List<Column> columns){}
private static void redisDelete(List<Column> columns){}
private static void redisUpdate(List<Column> columns){}
public static void printEntry(List<Entry> entrys) {}//打印实体方法
public static void main(String[] args){}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
public class RedisCanalClientExample
{
public static final Integer _60SECONDS = 60;
public static final String REDIS_IP_ADDR = "192.168.111.185";

private static void redisInsert(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
}catch (Exception e){
e.printStackTrace();
}
}
}


private static void redisDelete(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.del(columns.get(0).getValue());
}catch (Exception e){
e.printStackTrace();
}
}
}

private static void redisUpdate(List<Column> columns)
{
JSONObject jsonObject = new JSONObject();
for (Column column : columns)
{
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(),column.getValue());
}
if(columns.size() > 0)
{
try(Jedis jedis = RedisUtils.getJedis())
{
jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
}catch (Exception e){
e.printStackTrace();
}
}
}

public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}

RowChange rowChage = null;
try {
//获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
}
//获取变动类型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}


public static void main(String[] args)
{
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

//=================================
// 创建链接canal服务端
//利用阿里巴巴提供的api,传入相关参数直接用,Canal Server 的 IP(这里跟redis放一起所以写redisIP地址),Canal Server(阿里 Canal 服务)的 TCP 端口号,指定要订阅的 Canal 实例名称之前配的conf/example(这个就是实例名),用户名,密码(这里默认不写就会读取之前配置文件,写了就会覆盖)
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
11111), "example", "", "");
int batchSize = 1000;//每次批处理操作
//空闲空转计数器
int emptyCount = 0;
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
connector.connect();
//connector.subscribe(".*\\..*");默认所有库的所有表
connector.subscribe("bigdata.t_user");//一个库一个表,不要所有库都双写一致性
connector.rollback();//Canal 客户端对位点(position)的重置操作,目的是确保从最新位置开始消费 binlog,避免重复或遗漏。
int totalEmptyCount = 10 * _60SECONDS;//监听程序10分钟就结束了,可以修改为永久
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {//如果没什么变动
emptyCount++;
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
} else {
//计数器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
} finally {
connector.disconnect();
}
}
}

题外话

如何监控多个表?java程序下配置过滤正则

image-20251206020210019

要么程序指定然后覆盖配置,要么在配置里面写白名单或者黑名单

忘记关闭jedis了?关闭资源代码简写try-with-resources释放资源

java在7后增加了try-with-resources,他是一个生命或多个资源的try语句。一个资源作为一个一个对象,必须在程序结束后关闭。try-with-resources语句确保在语句的最后每个资源都被关闭。也就是在try括号里面的资源,只要实现了AutoCloseable和Closeable的对象都可以用这个关闭资源。

案例落地实战bitmap/hyperloglog/GEO

面试题

面试题1

1、抖音电商直播,主播介绍的商品有评论,1个商品对应了1系列的评论,排序+展现+取前10条记录

2、用户在手机APP上的签到打卡信息:1天对应1系列用户的签到记录,新浪微博,钉钉打卡签到,来没来如何统计

3、应用网站上的网页访问信息:1个网页对应1系列的访问点击,淘宝网首页,每天有多少人浏览首页

4、你们公司系统上线后,说一下UV(Unique Visitor)—— 独立访客数、PV(Page View)—— 页面浏览量、DAU(Daily Active Users)—— 日活跃用户数分别是多少

面试题2

面试问记录对集合中的数据进行统计

1、在移动应用中,需要统计每天的新增用户数和第二天的留存用户数

2、在电商网站的商品评论中,需要统计评论列表中的最新评论

3、在签到打卡中,需要统计一个月内连续打卡的用户数

4、在网页访问记录中,需要统计独立访客量。

痛点:类似今日头条、抖音这些用户访问级别都是亿级别的,请问如何处理?

需求痛点

亿级数据的收集+清洗+统计+展现

一句话:存的进+取的快+多维度

真正有价值的是统计

统计的类型有哪些?

亿级系统中常见的四种统计

1、聚合统计

统计多个集合元素的居合结果,就是前面讲过的交差并等集合统计

1
2
3
4
复习命令
A-B 属于A不属于B的 SDIFF key [key ...]
A并B 属于A或者B的元素合并后 SUNION key [key ...]
A交B 属于A也属于B的共同拥有的 SINTER key [key...] SINTERCARD numkeys key [key ...] [LIMIT limit]

2、排序统计

抖音短视频最新评论留言的场景,请你设计一个展现列表。能够排序+分页的redis数据结构用什么合适?

zset 在面对需要展示最新列表,排行榜等场景时,如果数据更新频繁或者需要分页显示,建议使用Zset

3、二值统计

集合元素的取值就只有0和1两种。

在钉钉上班签到打卡的场景中,我们只用记录有或没。用 bitmap

4、基数统计

统一一个集合中 不重复的元素个数

用hyperloglog

hyperloglog

UV Unique Visitor,独立访客,一般理解为客户端IP 需要去重考虑

PV Page View , 页面浏览量,不用去重

DAU Daily Active User 日活跃用户量,登录或者使用了某个产品的用户数(去重复登录的用户)

常用语反应网站,互联网应用或者网络游戏的运营情况

MAU Monthly Active User 月活跃用户量

它只能用于统计巨量数量,不太设计具体的统计对象的内容和精准性

统计单日一个页面的访问量(PV),单次访问就算一次。

统计单日一个页面的用户访问量(UV),即按照用户为维度计算,单个用户一天内多次访问也只算一次。

多个key的合并统计,某个门户网站的所有模块的PV聚合统计就是整个网站的总PV。

1
2
3
4
5
6
7
去重复功能的基数估计法就是Hyperloglog
复习命令
pfadd hll01 2 3 4 5
pfadd hll02 2 4 4 4 6
pfcount hll02
pfmerge distResult hll01 hll02
pfcount distResult

去重复统计你会先想到哪些方式?(演化)

最先想到hashset

1
2
3
4
5
@Test
public void test(){
List<String> list = Arrays.asList("22.1.01.1","192.168.12.1");
HashSet<String> sets = new HashSet<>(list);
}

然后用bitmap

如果数据量大假设list里面有4个亿,hashset明显会性能下降。

如果数据显较大亿级统计,使用bitmaps同样会有这个问题。但是百万级别是可以的,方法是精确计算,并且百万级别可以应用。

bitmap是通过用位bit数组来表示各元素是否出现,每个元素对应一位,所需的总内存为N个bit。

基数计数则将每一个元素对应到bit数组中的其中一位,比如bit数组010010101(按照从零开始下标,有的就是1、4、6、8)。

新进入的元素只需要将已经有的bit数组和新加入的元素进行按位或计算就行。这个方式能大大减少内存占用且位操作迅速。

But,假设一个样本案例就是一亿个基数位值数据,一个样本就是一亿

如果要统计1亿个数据的基数位值,大约需要内存100000000/8/1024/1024约等于12M,内存减少占用的效果显著。

这样得到统计一个对象样本的基数值需要12M。

如果统计10000个对象样本(1w个亿级),就需要117.1875G将近120G,可见使用bitmaps还是不适用大数据量下(亿级)的基数计数场景,

但是bitmaps方法是精确计算的。

解决办法:概率算法

通过牺牲准确率来换取空间,对于不要求绝对准确率的场景下可以使用,因为概率算法不直接存储数据本身,通过一定的概率统计方法预估基数值,同时保证误差在一定范围内,由于又不储存数据故此可以大大节约内存。

HyperLogLog就是一种概率算法的实现。

原理说明

首先只是进行不重复的基数统计,不是集合页不保存数据,只记录数量而不是具体内容

有误差。误差仅仅只是0.81%左右

这个误差哪里来的?Redis之父安特雷兹说的

淘宝网站首页亿级UV的Redis统计方案

需求:UV的统计需要去重,平均是1~1.5亿,如果每天存1.5亿的IP,访问者来了后先查是否存在,不存在就加入。

方案讨论:

1、用Mysql ,傻X,不解释

2、用redis的hash结构,

1
2
3
redis——hash = <keyDay,<ip,1>>
按照ipv4的结构来说明,每个ipv4的地址最多是15个字节(ip = "192.168.111.1",最多xxx.xxx.xxx.xxx)
某一天的1.5亿 * 15个字节= 2G,一个月60G,redis死定了。o(╥﹏╥)o

3、hyperloglog

image-20251206103257350

为什么是只需要花费12Kb?

image-20251206103349172

HyperLogLogService

1

HyperLogLogController

1