Redis-cluster及predis客户端实现

Redis-Cluster集群未来会逐步取代Codis,因为它是Redis的亲生的,此外不需要其他任何的代理,更符合未来的趋势,就像目前如日中天的区块链,它也是去中心化的自治分布式系统。目前我们公司已经把所有的codis下线了,全部换成Redis-cluster集群。

Redis-cluster不再像之前一样可以选择database,它只支持slot,一共化分为16384个槽位。

槽位算法:

Cluster 默认会对 key 值使用 crc32 算法进行 hash 得到一个整数值,然后用这个整数
值对 16384 进行取模来得到具体槽位。

如果客户端访问了一个server,该节点收到命令后,会检查对应slot是不是在这,如果不是的话,还要分两个情况:

如果正在进行resharding过程,就返回ASK命令;

如果没有,就返回Moved命令,告诉具体的到底在哪个节点上。

对于客户端一般都是要实现获取根据key获取槽位的算法,从而准确地定位。

举个例子,Predis:

类:RedisCluster

 public function getSlotByKey($key)
    {
        $key = $this->extractKeyTag($key);
         //hash,使用的就是CRC32算法
        $hash = $this->distributor->hash($key);
        $slot = $this->distributor->getSlot($hash);

        return $slot;
    }



 //获取槽位的方法
 public function getSlot($hash)
    {
        $this->initialize();

        $ringKeys = $this->ringKeys;
        $upper = $this->ringKeysCount - 1;
        $lower = 0;

        while ($lower <= $upper) {
            $index = ($lower + $upper) >> 1;
            $item = $ringKeys[$index];

            if ($item > $hash) {
                $upper = $index - 1;
            } elseif ($item < $hash) {
                $lower = $index + 1;
            } else {
                return $item;
            }
        }

        return $ringKeys[$this->wrapAroundStrategy($upper, $lower, $this->ringKeysCount)];
    }

建立链接获取结点的方法:

 public function getConnection(CommandInterface $command)
    {
        $slot = $this->strategy->getSlot($command);

        if (!isset($slot)) {
            throw new NotSupportedException(
                "Cannot use '{$command->getId()}' over clusters of connections."
            );
        }

        $node = $this->distributor->getBySlot($slot);

        return $node;
    }

对于客户端来讲,如果初始选择到错误的槽位,对应服务器会给他发送异常,可能值令是Moved或者Ask指令,信息中包括槽位编号和目标地址。

   protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
    {
        $details = explode(' ', $error->getMessage(), 2);

        switch ($details[0]) {
            case 'MOVED':
                return $this->onMovedResponse($command, $details[1]);

            case 'ASK':
                return $this->onAskResponse($command, $details[1]);

            default:
                return $error;
        }
    }
  protected function onMovedResponse(CommandInterface $command, $details)
    {
        list($slot, $connectionID) = explode(' ', $details, 2);

        if (!$connection = $this->getConnectionById($connectionID)) {
            $connection = $this->createConnection($connectionID);
        }

        if ($this->useClusterSlots) {
           //得到slot map ,使用的命令是cluster slots
            $this->askSlotsMap($connection);
        }

        $this->move($connection, $slot);
        $response = $this->executeCommand($command);

        return $response;
    }

当接到moved响应消息的时侯,要重新根据消息建立连接,获取到正确的结点,然后发送消息。

Redis-Cluster要禁止大Key,因为大key的迁移会导致系统稳定性下降,主要是集群在迁移的过程中时阻塞的。

最后说句无关的,就是Predis底层建立链接,发送消息的实现,就是建立socket链接:

  /**
     * {@inheritdoc}
     */
    public function connect()
    {
        if (!$this->isConnected()) {
            $this->resource = $this->createResource();

            return true;
        }

        return false;
    }





   * {@inheritdoc}
     */
    protected function createResource()
    {
        $parameters = $this->parameters;

        if ($parameters->scheme === 'unix') {
            $address = $parameters->path;
            $domain = AF_UNIX;
            $protocol = 0;
        } else {
            if (false === $address = self::getAddress($parameters)) {
                $this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
            }

            $domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
            $protocol = SOL_TCP;
        }

        $socket = @socket_create($domain, SOCK_STREAM, $protocol);

        if (!is_resource($socket)) {
            $this->emitSocketError();
        }

        $this->setSocketOptions($socket, $parameters);
        $this->connectWithTimeout($socket, $address, $parameters);

        return $socket;
    }

其读和写都通过已经建立的资源:
/**
     * {@inheritdoc}
     */
    protected function write($buffer)
    {
        $socket = $this->getResource();

        while (($length = strlen($buffer)) > 0) {
            $written = socket_write($socket, $buffer, $length);

            if ($length === $written) {
                return;
            }

            if ($written === false) {
                $this->onConnectionError('Error while writing bytes to the server.');
            }

            $buffer = substr($buffer, $written);
        }
    }


  */
    public function read()
    {
        $socket = $this->getResource();
        $reader = $this->reader;

        while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
            if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '' || $buffer === null) {
                $this->emitSocketError();
            }

            phpiredis_reader_feed($reader, $buffer);
        }

        if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
            return phpiredis_reader_get_reply($reader);
        } else {
            $this->onProtocolError(phpiredis_reader_get_error($reader));

            return;
        }
    }


看了一下源码,其实发现如果你想写一个Redis的客户端还是没问题的,只要了解Redis传输的协议文本,熟悉Redis,根据Redis是完全可以些出来的,记得刚学习golang的时侯,就自己写了一个,起名叫hredis,还上传了,虽然不怎么好,哈哈。不过,目前无论哪种语言,都已经有成熟的库了,没必要重复造轮子。

现在一般大公司,并不直接提供redis cluster给员工用,很多的选择是,首先将cluster用K8S部署,然后在前面加proxy,类似mysql的proxy,再加一层LVS。这样就是减小客户端的复杂度,让其可以像操作单机一样操作集群,其他工作都留给运维。比如我们公司使用的就是这种思想,虽然也有一部分直接操作redis-cluster,但前者使用方式已然是主流。当然,要说这样搭建方式没有直连的性能好。

架构类似下面:

上述架构的特点:

由K8S管理调度集群有更好的弹性伸缩能力和高可用特性,提供容器资源隔离,提高稳定性。通过LVS可以负载均衡请求;通过Proxy使得集群向上是透明的,简便的;通过容器资源隔离,提供更稳定的服务性能。此外,加一个monitor来监控整个redis架构,可使用grafna,类似下面:

--------EOF---------
微信分享/微信扫码阅读