Redis-cluster及predis客户端实现
Redis-Cluster集群未来会逐步取代Codis,因为它是Redis的亲生的,此外不需要其他任何的代理,更符合未来的趋势,就像目前如日中天的区块链,它也是去中心化的自治分布式系统。目前我们公司已经把所有的codis下线了,全部换成Redis-cluster集群。
Redis-cluster不再像之前一样可以选择database,它只支持slot,一共化分为16384个槽位。
槽位算法:
Cluster 默认会对 key 值使用 crc32 算法进行 hash 得到一个整数值,然后用这个整数
值对 16384 进行取模来得到具体槽位。
如果客户端访问了一个server,该节点收到命令后,会检查对应slot是不是在这,如果不是的话,还要分两个情况:
如果正在进行resharding过程,就返回ASK命令;
如果没有,就返回Moved命令,告诉具体的到底在哪个节点上。
对于客户端一般都是要实现获取根据key获取槽位的算法,从而准确地定位。
举个例子,Predis:
类:RedisCluster
public function getSlotByKey($key)
{
$key = $this->extractKeyTag($key);
//hash,使用的就是CRC32算法
$hash = $this->distributor->hash($key);
$slot = $this->distributor->getSlot($hash);
return $slot;
}
//获取槽位的方法
public function getSlot($hash)
{
$this->initialize();
$ringKeys = $this->ringKeys;
$upper = $this->ringKeysCount - 1;
$lower = 0;
while ($lower <= $upper) {
$index = ($lower + $upper) >> 1;
$item = $ringKeys[$index];
if ($item > $hash) {
$upper = $index - 1;
} elseif ($item < $hash) {
$lower = $index + 1;
} else {
return $item;
}
}
return $ringKeys[$this->wrapAroundStrategy($upper, $lower, $this->ringKeysCount)];
}
建立链接获取结点的方法:
public function getConnection(CommandInterface $command)
{
$slot = $this->strategy->getSlot($command);
if (!isset($slot)) {
throw new NotSupportedException(
"Cannot use '{$command->getId()}' over clusters of connections."
);
}
$node = $this->distributor->getBySlot($slot);
return $node;
}
对于客户端来讲,如果初始选择到错误的槽位,对应服务器会给他发送异常,可能值令是Moved或者Ask指令,信息中包括槽位编号和目标地址。
protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
{
$details = explode(' ', $error->getMessage(), 2);
switch ($details[0]) {
case 'MOVED':
return $this->onMovedResponse($command, $details[1]);
case 'ASK':
return $this->onAskResponse($command, $details[1]);
default:
return $error;
}
}
protected function onMovedResponse(CommandInterface $command, $details)
{
list($slot, $connectionID) = explode(' ', $details, 2);
if (!$connection = $this->getConnectionById($connectionID)) {
$connection = $this->createConnection($connectionID);
}
if ($this->useClusterSlots) {
//得到slot map ,使用的命令是cluster slots
$this->askSlotsMap($connection);
}
$this->move($connection, $slot);
$response = $this->executeCommand($command);
return $response;
}
当接到moved响应消息的时侯,要重新根据消息建立连接,获取到正确的结点,然后发送消息。
Redis-Cluster要禁止大Key,因为大key的迁移会导致系统稳定性下降,主要是集群在迁移的过程中时阻塞的。
最后说句无关的,就是Predis底层建立链接,发送消息的实现,就是建立socket链接:
/**
* {@inheritdoc}
*/
public function connect()
{
if (!$this->isConnected()) {
$this->resource = $this->createResource();
return true;
}
return false;
}
* {@inheritdoc}
*/
protected function createResource()
{
$parameters = $this->parameters;
if ($parameters->scheme === 'unix') {
$address = $parameters->path;
$domain = AF_UNIX;
$protocol = 0;
} else {
if (false === $address = self::getAddress($parameters)) {
$this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
}
$domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
$protocol = SOL_TCP;
}
$socket = @socket_create($domain, SOCK_STREAM, $protocol);
if (!is_resource($socket)) {
$this->emitSocketError();
}
$this->setSocketOptions($socket, $parameters);
$this->connectWithTimeout($socket, $address, $parameters);
return $socket;
}
其读和写都通过已经建立的资源:
/**
* {@inheritdoc}
*/
protected function write($buffer)
{
$socket = $this->getResource();
while (($length = strlen($buffer)) > 0) {
$written = socket_write($socket, $buffer, $length);
if ($length === $written) {
return;
}
if ($written === false) {
$this->onConnectionError('Error while writing bytes to the server.');
}
$buffer = substr($buffer, $written);
}
}
*/
public function read()
{
$socket = $this->getResource();
$reader = $this->reader;
while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '' || $buffer === null) {
$this->emitSocketError();
}
phpiredis_reader_feed($reader, $buffer);
}
if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
return phpiredis_reader_get_reply($reader);
} else {
$this->onProtocolError(phpiredis_reader_get_error($reader));
return;
}
}
看了一下源码,其实发现如果你想写一个Redis的客户端还是没问题的,只要了解Redis传输的协议文本,熟悉Redis,根据Redis是完全可以些出来的,记得刚学习golang的时侯,就自己写了一个,起名叫hredis,还上传了,虽然不怎么好,哈哈。不过,目前无论哪种语言,都已经有成熟的库了,没必要重复造轮子。
现在一般大公司,并不直接提供redis cluster给员工用,很多的选择是,首先将cluster用K8S部署,然后在前面加proxy,类似mysql的proxy,再加一层LVS。这样就是减小客户端的复杂度,让其可以像操作单机一样操作集群,其他工作都留给运维。比如我们公司使用的就是这种思想,虽然也有一部分直接操作redis-cluster,但前者使用方式已然是主流。当然,要说这样搭建方式没有直连的性能好。
架构类似下面:
上述架构的特点:
由K8S管理调度集群有更好的弹性伸缩能力和高可用特性,提供容器资源隔离,提高稳定性。通过LVS可以负载均衡请求;通过Proxy使得集群向上是透明的,简便的;通过容器资源隔离,提供更稳定的服务性能。此外,加一个monitor来监控整个redis架构,可使用grafna,类似下面:
微信分享/微信扫码阅读