Redis缓存
1、Redis缓存介绍;
2、Redis缓存异常现象;
3、缓存一致性(Redis6新特性)
1、Redis缓存介绍
Redis作为缓存使用,已经非常普遍了。由于其完全基于内存,且有丰富的存储结构以及持久化功能,此外处理性能极高,使其广受公司的青睐。缓存场景有很多,比如计数器,统计UV/PV等等。
由于其是放在内存中,那么就必然有一个最大内存的限制,如果达到了限制,就需要进行清除。内存限制可以配置。如果不配置,那么在64位操作系统就是没限制;在32位系统最大是3GB。
Redis支持数据的过期设置,如果已经超过了过期时间,可以进行删除操作,支持惰性删除和定期删除两种;
此外,如果超过内存限制,redis会进行垃圾回收。之前在垃圾回收其实也有过介绍。 垃圾回收机制的实现(php,python,golang,java)
要注意,过期删除和内存淘汰并不是一回事,过期删除是删除已经过了有效期的key。在访问时会判断key是否过期,如果过期直接删除;此外,Redis会每隔一段时间扫描过期字典的数据,每次清除一定数量的过期key。这两个操作都是在主线程中操作。
1)惰性删除:只有当访问某个 key 时,才判断这个 key 是否已过期,如果已过期,则从实例中删除。
2)定期删除:Redis 内部维护了一个定时任务,默认每隔 100 毫秒(1秒10次)就会从全局的过期哈希表中随机取出 20 个 key,然后删除其中过期的 key,如果过期 key 的比例超过了 25%,则继续重复此过程,直到过期 key 的比例下降到 25% 以下,或者这次任务的执行耗时超过了 25 毫秒,才会退出循环。
如果我们想删除某个key,之前我们是用del,这是同步删除;Redis后期又提供了一个unlink方法,这个是异步删除。
内存淘汰:
Redis支持的淘汰策略就是主要的那几种,比如LRU,TTL等等。具体如下:
# volatile-lru -> remove the key with an expire set using an LRU algorithm
# allkeys-lru -> remove any key according to the LRU algorithm
# volatile-random -> remove a random key with an expire set
# allkeys-random -> remove a random key, any key
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)
# noeviction -> don't expire at all, just return an error on write operations
Redis默认使用的是noeviction.
先看下Redis的value的结构体:
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
说一下上面的LRU,Redis中的LRU是采用惰性删除的执行方式,而且并不是真正意义上的LRU,真正的LRU是建立一个双向链表,每当有数据加入就加入头部,如果超过最大长度限制,就从尾部移除。
LRU时钟获取方式:
unsigned int getLRUClock(void) {
return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}
Redis的实际操作步骤:当进行一个操作时,如果超出了最大内存限制,就随机选择5个key(选择什么样的key取决于配置的策略时volatile还是allkeys;选择几个还要根据配置来server.maxmemory_samples),然后淘汰最旧的key。这个最旧是根据每个对象的lru字段来判定的,该字段每次被访问的时候都会进行更新。如果内存还超,就继续执行上面的操作,直到低于最大限制。Redis之所以选择这种方式是为了减少额外的消耗。
看上面的算法有个问题就是这种随机选择的key,导致新添加的key也会有被剔除的可能。基于此,Redis3.0进行了一些优化。它主要是维护一个候选池(大小为16),池中的数据根据对象的lru时钟字段进行排序。第一次随机选取的key都会放入池中,随后每次随机选取的key只有在访问时间小于池中最小的时间才会放入池中,直到候选池被放满。当放满后,如果有新的key需要放入,则将池中lru最大的移除去。
当需要进行内存淘汰的时候,则直接从池中选取最近访问时间最小(最久没被访问)的key淘汰掉就行。
下面完全按照Redis3之后的LRU思想来实现的。
from datetime import datetime
import random
import time
#相当于Redis的value结构体
class Value:
def __init__(self, value):
self.value = value
self.lru = datetime.now().time() # 获取当前时间戳
class RedisLRU:
RANDOM_KEY_NUM = 10
POOL_KEY_MAX_NUM = 6
def __init__(self, size=10):
self.size = size
self.cache = {}
self.deal_pool = {}
def get(self, key):
if key in self.cache:
vue_obj = self.cache[key]
self.cache[key].lru = datetime.now().time()
if key in self.deal_pool:
self.deal_pool[key].lru = self.cache[key].lru
print(self.deal_pool[key].lru)
return vue_obj.value if isinstance(vue_obj, Value) else None
return None
def set(self, key, value):
if len(self.cache) >= self.size:
self.recyle()
vue_obj = Value(value)
self.cache[key] = vue_obj
def mset(self, dict_item):
if not isinstance(dict_item, dict):
return
while len(self.cache) + len(dict_item) >= self.size:
self.recyle()
for key, value in dict_item.items():
vue_obj = Value(value)
self.cache[key] = vue_obj
def recyle(self):
"""
算法:每次随机选择5个key,放入到pool中,按序排列,放入的key一定是比当前最小的还小。如果放满了,直接剔除lru最大的那个
:return:
"""
random_keys = random.sample(self.cache.keys(), RedisLRU.RANDOM_KEY_NUM)
for key in random_keys:
if len(self.deal_pool) < RedisLRU.POOL_KEY_MAX_NUM:
if key not in self.deal_pool:
self.deal_pool[key] = self.cache[key]
if len(self.deal_pool) == RedisLRU.POOL_KEY_MAX_NUM:
# 删除pool LRU最大的
max_pool_key = max(self.deal_pool.keys(), key=(lambda k: self.deal_pool[k]))
self.deal_pool.pop(max_pool_key)
# 直接淘汰poll中LRU最小的
key = min(self.deal_pool.keys(), key=(lambda k: self.deal_pool[k].lru))
self.cache.pop(key)
self.deal_pool.pop(key)
if __name__ == "__main__":
cache = RedisLRU()
num_list = range(1, 15)
for x in num_list:
cache.set(str(x), x)
time.sleep(1)
minke = str(min(map(lambda y: int(y), cache.cache.keys())))
print(cache.get(minke))
cache.set("30", 30)
print(cache.cache.keys())
上面只是近似Redis的类LRU做法。真实的LRU就是一个双向链表。进入的会放在头部,淘汰从末尾淘汰。之所以Redis没选择这种方式,主要还是考虑到性能的问题。本身Redis就是一个大的dict。
其实上面的LRU是有问题的,因为新加入且未被访问的也有可能被淘汰。而这个key很有可能将来被访问到。
Redis4.0开始引入LFU机制。简单描述就是为每个对象维护一个计数器,然后同样是LRU的思想,在池子里淘汰计数器最小的。
看上面,乍一看是存在问题的。
1、如果一个key存放了很长时间,计数值很大了,但后续可能都不会再访问了,那就一直都不会被清除吗?
2、新加入的对象,还没有被访问,如果单纯按照访问次数,那进来就得被淘汰。
对于第一个问题,是这样,LFU算法同样还是用lru字段,只是这24bit的前16位表示最近一次计数器变化的时间,后8位表示计数器。对象每次被访问,计数会增加,但增加的速度是随着时间变小的,即不是匀速增加,类似初中物理,加速度是逐渐变小的,最多是255。此外,随着时间的推移,计数值会不断衰减的。如果长时间不访问,就算计数值很大,也是会慢慢衰减,最后被淘汰。
对于第二个问题,新生的key,value对象的计数值默认是5。
更新LFU的逻辑源码:
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
//先根据时间戳进行相应的减少
unsigned long counter = LFUDecrAndReturn(val);
//因为被访问,计数自增
counter = LFULogIncr(counter);
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
/* If the object decrement time is reached decrement the LFU counter but
* do not update LFU fields of the object, we update the access time
* and counter in an explicit way when the object is really accessed.
* And we will times halve the counter according to the times of
* elapsed time than server.lfu_decay_time.
* Return the object frequency counter.
*
* This function is used in order to scan the dataset for the best object
* to fit: as we check for the candidate, we incrementally decrement the
* counter of the scanned objects if needed. */
unsigned long LFUDecrAndReturn(robj *o) {
//获取低8位counter
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
/* Logarithmically increment a counter. The greater is the current counter value
* the less likely is that it gets really implemented. Saturate it at 255. */
uint8_t LFULogIncr(uint8_t counter) {
if (counter == 255) return 255;
double r = (double)rand()/RAND_MAX;
double baseval = counter - LFU_INIT_VAL;
if (baseval < 0) baseval = 0;
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}
2、Redis缓存异常现象
目前比较有代表性的就是缓存穿透、缓存雪崩、缓存击穿等等。
缓存雪崩就是在几乎同一时间有大量的key都过期了,导致大量key的缓存无法查询到。
对于缓存雪崩问题,可以进行限流或者增加一级客户端缓存等手段来避免。此外,不同的Key的实效时间尽量不要分布在同一个时间段,至少都不要集中在并发量较高的时间段,比如大促,秒杀等等。
缓存击穿是某个key在某个时间过期了,而这个时间又突然有高并发的请求过来了,那么就会同时将大量请求打到DB上。
对于缓存击穿,当有多个请求过来时,利用分布式锁,先得到锁的线程如果读不到缓存,就读数据库,并写入到缓存。同时呢,其他线程如果此时拿不到锁,那么就等待一会儿,等拿到锁的写好缓存之后并释放锁之后,才可读取出数据。
缓存穿透的意思就是访问根本不存在的key,然后将请求都打到DB上。这个主要会出现的场景是有人恶意攻击。
对于缓存穿透,当我们从DB中查询的数据是空时,即一定是不存在的key,写同样写到DB中,只不过过期时间设置的短一下,或者当真正有这个数据存在的,再从DB写入到缓存中。
业界还有提到的方案是布隆过滤器,
。这过滤器相当于一个list,简单思想就是对于每一个Key,进行3次hash,分别得到三个值,然后对应位置写成1。如果不全是1的,就证明不存在,不存在的就不查缓存了。但我觉得因为他是通过Hash来计算位置的,是存在hash冲突的风险的,所以还是有一定的误伤的。比如碰巧对应key的三个位置都是1,但可能这三个位置的值不是真实它计算出来的,那么我们还认为他是应该存在的,还是会一遍遍的去查缓存。
3、缓存一致性
正常我们为了提高查询效率,不直接去触及Mysql那一层,而是建立缓存。建立缓存一般都会建立一级缓存和二级缓存。一级缓存就是本地缓存,二级缓存就是Redis等缓存。通常我们查询Redis后,业务客户端会在服务器本地写上一层缓存。对于这种情况,主要存在的一个问题是如果Mysql数据有变更,缓存如果不及时变化会导致数据不一致的问题。
对于上述问题,一般的做法是如果Mysql数据有变化,会直接删除Redis的缓存,待下次再查询时会再写一次Redis缓存。但是对于本地产生的缓存还是很难处理的。因为我们现在的系统都是分布式的,就算是本地缓存设置了失效时间,但每个节点的失效时间也是不同的,同样会存在脏读或者幻读的问题。
基于此,Redis6迎来了一个新的特性,学名叫Redis client side caching。它是用某种模式保证Redis缓存有变化时,可以及时通知客户端,从而保证数据的一致性。它主要是通过RESP3协议实现的。主要就是提供更丰富的语义,来达到通知客户端某种信息的目的。
简单描述实现方式就是服务器会保留每一个唯一的连接,任何发送过只读某个key的都会被记录。对应key如果被更新或删除,都会通知到对应的客户端,告诉本地缓存已经失效,本地缓存可以直接删除缓存。
目前Java的lettuce已经实现了客户端缓存。
它的一个基本思想就是:为每一个连接都建立一个listener,该listener主要处理Redis服务端push的消息。
如果缓存数据被更新过了,Redis服务端会主动Push消息到对应客户端.上面的listener就会进行处理。
看下代码:
class DefaultRedisCache<K, V> implements RedisCache<K, V> {
private final StatefulRedisConnection<K, V> connection;
private final RedisCodec<K, V> codec;
public DefaultRedisCache(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec) {
this.connection = connection;
this.codec = codec;
}
@Override
public V get(K key) {
return connection.sync().get(key);
}
@Override
public void put(K key, V value) {
connection.sync().set(key, value);
}
@Override
public void addInvalidationListener(java.util.function.Consumer<? super K> listener) {
//如果push的消息是invalidate,就对传过来的listener进行处理。
connection.addListener(message -> {
if (message.getType().equals("invalidate")) {
List<Object> content = message.getContent(codec::decodeKey);
List<K> keys = (List<K>) content.get(1);
keys.forEach(listener);
}
});
}
@Override
public void close() {
connection.close();
}
}
可以看到,这是一个嵌套的listner,那具体处理消息的listener是谁呢?
当我们要使用客户端缓存时,需要调用ClientSideCaching.enable
public static <K, V> CacheFrontend<K, V> enable(CacheAccessor<K, V> cacheAccessor, StatefulRedisConnection<K, V> connection,
TrackingArgs tracking) {
connection.sync().clientTracking(tracking);
return create(cacheAccessor, connection);
}
}
首先先同步发一个client tracking on的命令,开启客户端缓存。然后调用:
private static <K, V> CacheFrontend<K, V> create(CacheAccessor<K, V> cacheAccessor, RedisCache<K, V> redisCache) {
ClientSideCaching<K, V> caching = new ClientSideCaching<>(cacheAccessor, redisCache);
//redis cache 是将遍历消息所有在该类加入的listener,内层的listener也是实际要执行的
redisCache.addInvalidationListener(caching::notifyInvalidate);
//添加一个Listner,最终执行的操作,即清除本地缓存。
caching.addInvalidationListener(cacheAccessor::evict);
return caching;
}
//RedisCache调用,真正执行的遍历listener
private void notifyInvalidate(K key) {
for (java.util.function.Consumer<K> invalidationListener : invalidationListeners) {
invalidationListener.accept(key);
}
}
@Override
public void close() {
redisCache.close();
}
public void addInvalidationListener(java.util.function.Consumer<K> invalidationListener) {
invalidationListeners.add(invalidationListener);
}
看到上面Lettuce实现的客户端缓存,主要有几个对象,一个当前连接,进程内缓存以及Redis缓存无效通知的消费者。
public class ClientSideCaching<K, V> implements CacheFrontend<K, V> {
private final CacheAccessor<K, V> cacheAccessor;
private final RedisCache<K, V> redisCache;
private final List<Consumer<K>> invalidationListeners = new CopyOnWriteArrayList<>();
private ClientSideCaching(CacheAccessor<K, V> cacheAccessor, RedisCache<K, V> redisCache) {
this.cacheAccessor = cacheAccessor;
this.redisCache = redisCache;
}
下面是使用lettuce的一个简单例子:
@Test
public void testRedisClientSideCache() throws Exception{
RedisURI redisURI = RedisURI.builder().withHost("127.0.0.1").withPort(6379).build();
RedisClient redisClient = RedisClient.create(redisURI);
StatefulRedisConnection<String,String> connection = redisClient.connect();
Map<String,String> clientCache = new ConcurrentHashMap<>();
CacheFrontend<String,Object> frontend = ClientSideCaching.enable(
CacheAccessor.forMap(clientCache),connection, TrackingArgs.Builder.enabled());
String key = "test";
while (true) {
String cachedValue = frontend.get(key);
System.out.println("当前的值为:--->"+cachedValue);
Thread.sleep(3000);}
}
我草,我今天突然觉得Lettuce挺香的。
参考资料:
微信分享/微信扫码阅读