Redis缓存

1、Redis缓存介绍,实现以及淘汰策略;

2、Redis缓存异常现象;

3、缓存一致性(Redis6新特性)

1、Redis缓存介绍,实现以及淘汰策略

Redis作为缓存使用,已经非常普遍了。由于其完全基于内存,且有丰富的存储结构以及持久化功能,此外处理性能极高,使其广受公司的青睐。缓存场景有很多,比如计数器,统计UV/PV等等。

由于其是放在内存中,那么就必然有一个最大内存的限制,如果达到了限制,就需要进行清除。内存限制可以配置。如果不配置,那么在64位操作系统就是没限制;在32位系统最大是3GB。

一般情况下,Redis会定时或定期清除已过期的数据,此外当内存超过最大限制时,在进行写入操作时,会同时直接垃圾回收。

如果超过内存限制,那么必须要进行垃圾回收。之前在垃圾回收其实也有过介绍。 垃圾回收机制的实现(php,python,golang,java)

那么Redis的垃圾回收的触发主要有定时删除和惰性删除,使用的策略就是主要的那几种,比如LRU,TTL等等。具体如下:

# volatile-lru -> remove the key with an expire set using an LRU algorithm
# allkeys-lru -> remove any key according to the LRU algorithm
# volatile-random -> remove a random key with an expire set
# allkeys-random -> remove a random key, any key
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)
# noeviction -> don't expire at all, just return an error on write operations

Redis默认使用的是noeviction.

先看下Redis的value的结构体:

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

说一下上面的LRU,Redis中的LRU是采用惰性删除的执行方式,而且并不是真正意义上的LRU,真正的LRU是建立一个双向链表,每当有数据加入就加入头部,如果超过最大长度限制,就从尾部移除。

LRU时钟获取方式:

unsigned int getLRUClock(void) {
    return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

Redis的实际操作步骤:当进行一个操作时,如果超出了最大内存限制,就随机选择5个key(选择什么样的key取决于配置的策略时volatile还是allkeys;选择几个还要根据配置来server.maxmemory_samples),然后淘汰最旧的key。这个最旧是根据每个对象的lru字段来判定的,该字段每次被访问的时候都会进行更新。如果内存还超,就继续执行上面的操作,直到低于最大限制。Redis之所以选择这种方式是为了减少额外的消耗。

看上面的算法有个问题就是这种随机选择的key,导致新添加的key也会有被剔除的可能。基于此,Redis3.0进行了一些优化。它主要是维护一个候选池(大小为16),池中的数据根据对象的lru时钟字段进行排序。第一次随机选取的key都会放入池中,随后每次随机选取的key只有在访问时间小于池中最小的时间才会放入池中,直到候选池被放满。当放满后,如果有新的key需要放入,则将池中lru最大的移除去。

当需要进行内存淘汰的时候,则直接从池中选取最近访问时间最小(最久没被访问)的key淘汰掉就行。

下面完全按照Redis3之后的LRU思想来实现的。

from datetime import datetime
import random
import time


#相当于Redis的value结构体
class Value:

    def __init__(self, value):
        self.value = value
        self.lru = datetime.now().time()  # 获取当前时间戳


class RedisLRU:
    RANDOM_KEY_NUM = 10

    POOL_KEY_MAX_NUM = 6

    def __init__(self, size=10):
        self.size = size
        self.cache = {}
        self.deal_pool = {}

    def get(self, key):
        if key in self.cache:
            vue_obj = self.cache[key]
            self.cache[key].lru = datetime.now().time()
            if key in self.deal_pool:
                self.deal_pool[key].lru = self.cache[key].lru
            print(self.deal_pool[key].lru)
            return vue_obj.value if isinstance(vue_obj, Value) else None
        return None

    def set(self, key, value):
        if len(self.cache) >= self.size:
            self.recyle()
        vue_obj = Value(value)
        self.cache[key] = vue_obj

    def mset(self, dict_item):
        if not isinstance(dict_item, dict):
            return
        while len(self.cache) + len(dict_item) >= self.size:
            self.recyle()
        for key, value in dict_item.items():
            vue_obj = Value(value)
            self.cache[key] = vue_obj

    def recyle(self):
        """
        算法:每次随机选择5个key,放入到pool中,按序排列,放入的key一定是比当前最小的还小。如果放满了,直接剔除lru最大的那个
        :return:
        """
        random_keys = random.sample(self.cache.keys(), RedisLRU.RANDOM_KEY_NUM)
        for key in random_keys:
            if len(self.deal_pool) < RedisLRU.POOL_KEY_MAX_NUM:
                if key not in self.deal_pool:
                    self.deal_pool[key] = self.cache[key]
        if len(self.deal_pool) == RedisLRU.POOL_KEY_MAX_NUM:
            # 删除pool LRU最大的
            max_pool_key = max(self.deal_pool.keys(), key=(lambda k: self.deal_pool[k]))
            self.deal_pool.pop(max_pool_key)
        # 直接淘汰poll中LRU最小的
        key = min(self.deal_pool.keys(), key=(lambda k: self.deal_pool[k].lru))
        self.cache.pop(key)
        self.deal_pool.pop(key)


if __name__ == "__main__":
    cache = RedisLRU()
    num_list = range(1, 15)
    for x in num_list:
        cache.set(str(x), x)
        time.sleep(1)
    minke = str(min(map(lambda y: int(y), cache.cache.keys())))
    print(cache.get(minke))
    cache.set("30", 30)
    print(cache.cache.keys())

上面只是近似Redis的类LRU做法。真实的LRU就是一个双向链表。进入的会放在头部,淘汰从末尾淘汰。之所以Redis没选择这种方式,主要还是考虑到性能的问题。本身Redis就是一个大的dict。

其实上面的LRU是有问题的,因为新加入且未被访问的也有可能被淘汰。而这个key很有可能将来被访问到。

Redis4.0开始引入LFU机制。简单描述就是为每个对象维护一个计数器,然后同样是LRU的思想,在池子里淘汰计数器最小的。

看上面,乍一看是存在问题的。

1、如果一个key存放了很长时间,计数值很大了,但后续可能都不会再访问了,那就一直都不会被清除吗?

2、新加入的对象,还没有被访问,如果单纯按照访问次数,那进来就得被淘汰。

对于第一个问题,是这样,LFU算法同样还是用lru字段,只是这24bit的前16位表示最近一次计数器变化的时间,后8位表示计数器。对象每次被访问,计数会增加,但增加的速度是随着时间变小的,即不是匀速增加,类似初中物理,加速度是逐渐变小的,最多是255。此外,随着时间的推移,计数值会不断衰减的。如果长时间不访问,就算计数值很大,也是会慢慢衰减,最后被淘汰。

对于第二个问题,新生的key,value对象的计数值默认是5。

更新LFU的逻辑源码:

/* Update LFU when an object is accessed.
 * Firstly, decrement the counter if the decrement time is reached.
 * Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
    //先根据时间戳进行相应的减少
    unsigned long counter = LFUDecrAndReturn(val);
    //因为被访问,计数自增
    counter = LFULogIncr(counter);
    val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}




/* If the object decrement time is reached decrement the LFU counter but
 * do not update LFU fields of the object, we update the access time
 * and counter in an explicit way when the object is really accessed.
 * And we will times halve the counter according to the times of
 * elapsed time than server.lfu_decay_time.
 * Return the object frequency counter.
 *
 * This function is used in order to scan the dataset for the best object
 * to fit: as we check for the candidate, we incrementally decrement the
 * counter of the scanned objects if needed. */
unsigned long LFUDecrAndReturn(robj *o) {
   //获取低8位counter
    unsigned long ldt = o->lru >> 8;
    unsigned long counter = o->lru & 255;
    unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
    if (num_periods)
        counter = (num_periods > counter) ? 0 : counter - num_periods;
    return counter;
}


/* Logarithmically increment a counter. The greater is the current counter value
 * the less likely is that it gets really implemented. Saturate it at 255. */
uint8_t LFULogIncr(uint8_t counter) {
    if (counter == 255) return 255;
    double r = (double)rand()/RAND_MAX;
    double baseval = counter - LFU_INIT_VAL;
    if (baseval < 0) baseval = 0;
    double p = 1.0/(baseval*server.lfu_log_factor+1);
    if (r < p) counter++;
    return counter;
}

2、Redis缓存异常现象

目前比较有代表性的就是缓存穿透、缓存雪崩、缓存击穿等等。

缓存雪崩就是在几乎同一时间有大量的key都过期了,导致大量key的缓存无法查询到。

对于缓存雪崩问题,可以进行限流或者增加一级客户端缓存等手段来避免。此外,不同的Key的实效时间尽量不要分布在同一个时间段,至少都不要集中在并发量较高的时间段,比如大促,秒杀等等。

缓存击穿是某个key在某个时间过期了,而这个时间又突然有高并发的请求过来了,那么就会同时将大量请求打到DB上。

对于缓存击穿,当有多个请求过来时,利用分布式锁,先得到锁的线程如果读不到缓存,就读数据库,并写入到缓存。同时呢,其他线程如果此时拿不到锁,那么就等待一会儿,等拿到锁的写好缓存之后并释放锁之后,才可读取出数据。

缓存穿透的意思就是访问根本不存在的key,然后将请求都打到DB上。这个主要会出现的场景是有人恶意攻击。

对于缓存穿透,当我们从DB中查询的数据是空时,即一定是不存在的key,写同样写到DB中,只不过过期时间设置的短一下,或者当真正有这个数据存在的,再从DB写入到缓存中。

业界还有提到的方案是布隆过滤器,

。这过滤器相当于一个list,简单思想就是对于每一个Key,进行3次hash,分别得到三个值,然后对应位置写成1。如果不全是1的,就证明不存在,不存在的就不查缓存了。但我觉得因为他是通过Hash来计算位置的,是存在hash冲突的风险的,所以还是有一定的误伤的。比如碰巧对应key的三个位置都是1,但可能这三个位置的值不是真实它计算出来的,那么我们还认为他是应该存在的,还是会一遍遍的去查缓存。

3、缓存一致性

正常我们为了提高查询效率,不直接去触及Mysql那一层,而是建立缓存。建立缓存一般都会建立一级缓存和二级缓存。一级缓存就是本地缓存,二级缓存就是Redis等缓存。通常我们查询Redis后,业务客户端会在服务器本地写上一层缓存。对于这种情况,主要存在的一个问题是如果Mysql数据有变更,缓存如果不及时变化会导致数据不一致的问题。

对于上述问题,一般的做法是如果Mysql数据有变化,会直接删除Redis的缓存,待下次再查询时会再写一次Redis缓存。但是对于本地产生的缓存还是很难处理的。因为我们现在的系统都是分布式的,就算是本地缓存设置了失效时间,但每个节点的失效时间也是不同的,同样会存在脏读或者幻读的问题。

基于此,Redis6迎来了一个新的特性,学名叫Redis client side caching。它是用某种模式保证Redis缓存有变化时,可以及时通知客户端,从而保证数据的一致性。它主要是通过RESP3协议实现的。主要就是提供更丰富的语义,来达到通知客户端某种信息的目的。

简单描述实现方式就是服务器会保留每一个唯一的连接,任何发送过只读某个key的都会被记录。对应key如果被更新或删除,都会通知到对应的客户端,告诉本地缓存已经失效,本地缓存可以直接删除缓存。

目前Java的lettuce已经实现了客户端缓存。

它的一个基本思想就是:为每一个连接都建立一个listener,该listener主要处理Redis服务端push的消息。

如果缓存数据被更新过了,Redis服务端会主动Push消息到对应客户端.上面的listener就会进行处理。

看下代码:

class DefaultRedisCache<K, V> implements RedisCache<K, V> {

    private final StatefulRedisConnection<K, V> connection;

    private final RedisCodec<K, V> codec;

    public DefaultRedisCache(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec) {
        this.connection = connection;
        this.codec = codec;
    }

    @Override
    public V get(K key) {
        return connection.sync().get(key);
    }

    @Override
    public void put(K key, V value) {
        connection.sync().set(key, value);
    }

    @Override
    public void addInvalidationListener(java.util.function.Consumer<? super K> listener) {
        //如果push的消息是invalidate,就对传过来的listener进行处理。
        connection.addListener(message -> {
            if (message.getType().equals("invalidate")) {

                List<Object> content = message.getContent(codec::decodeKey);
                List<K> keys = (List<K>) content.get(1);
                keys.forEach(listener);
            }
        });
    }

    @Override
    public void close() {
        connection.close();
    }

}

可以看到,这是一个嵌套的listner,那具体处理消息的listener是谁呢?

当我们要使用客户端缓存时,需要调用ClientSideCaching.enable

 public static <K, V> CacheFrontend<K, V> enable(CacheAccessor<K, V> cacheAccessor, StatefulRedisConnection<K, V> connection,
            TrackingArgs tracking) {

        connection.sync().clientTracking(tracking);

        return create(cacheAccessor, connection);
    }


}

首先先同步发一个client tracking on的命令,开启客户端缓存。然后调用:

 private static <K, V> CacheFrontend<K, V> create(CacheAccessor<K, V> cacheAccessor, RedisCache<K, V> redisCache) {

        ClientSideCaching<K, V> caching = new ClientSideCaching<>(cacheAccessor, redisCache);
        //redis cache 是将遍历消息所有在该类加入的listener,内层的listener也是实际要执行的
        redisCache.addInvalidationListener(caching::notifyInvalidate);
        //添加一个Listner,最终执行的操作,即清除本地缓存。
        caching.addInvalidationListener(cacheAccessor::evict);

        return caching;
    }


    //RedisCache调用,真正执行的遍历listener
    private void notifyInvalidate(K key) {

        for (java.util.function.Consumer<K> invalidationListener : invalidationListeners) {
            invalidationListener.accept(key);
        }
    }

    @Override
    public void close() {
        redisCache.close();
    }

    public void addInvalidationListener(java.util.function.Consumer<K> invalidationListener) {
        invalidationListeners.add(invalidationListener);
    }

看到上面Lettuce实现的客户端缓存,主要有几个对象,一个当前连接,进程内缓存以及Redis缓存无效通知的消费者。

public class ClientSideCaching<K, V> implements CacheFrontend<K, V> {

    private final CacheAccessor<K, V> cacheAccessor;

    private final RedisCache<K, V> redisCache;

    private final List<Consumer<K>> invalidationListeners = new CopyOnWriteArrayList<>();

    private ClientSideCaching(CacheAccessor<K, V> cacheAccessor, RedisCache<K, V> redisCache) {
        this.cacheAccessor = cacheAccessor;
        this.redisCache = redisCache;
    }

下面是使用lettuce的一个简单例子:

    @Test
    public void testRedisClientSideCache() throws Exception{
        RedisURI redisURI = RedisURI.builder().withHost("127.0.0.1").withPort(6379).build();
        RedisClient redisClient = RedisClient.create(redisURI);
        StatefulRedisConnection<String,String> connection = redisClient.connect();
        Map<String,String> clientCache = new ConcurrentHashMap<>();
        CacheFrontend<String,Object> frontend = ClientSideCaching.enable(
                CacheAccessor.forMap(clientCache),connection, TrackingArgs.Builder.enabled());
        String key = "test";
        while (true) {
            String cachedValue = frontend.get(key);
            System.out.println("当前的值为:--->"+cachedValue);
            Thread.sleep(3000);}
    }

我草,我今天突然觉得Lettuce挺香的。

参考资料:

Redis官网

理解Redis的内存回收机制和过期淘汰策略

Redis 6.0 客户端缓存特性及实践

【Redis源码分析】Redis中的LRU算法实现

Redis的LFU算法

缓存穿透、缓存击穿、缓存雪崩,看这篇就够了

--------EOF---------
本文微信分享/扫码阅读