Learning Sentinel

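While writing a flash-sale (seckill) service recently, I came across Alibaba's Sentinel and found it really useful. To get flow control and circuit breaking/degradation you don't have to reinvent the wheel; a single annotation is enough. There are similar libraries out there, such as Hystrix, but I find Sentinel easier to use.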

This post has two parts: basic usage, and how Sentinel is implemented.

I. Using Sentinel

Tech stack: Spring Boot 2.2.4, Nacos, Sentinel 1.7.1, sentinel-dashboard

Nacos is used mainly to persist the rules; otherwise, rules maintained in the dashboard are lost after a restart.

1. Add the dependencies to the pom:

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
            <version>1.7.0</version>
        </dependency>

2. Spring Boot configuration

spring:
  cloud:
    nacos:
      discovery:
        server-addr: 10.38.163.218:80 # Nacos address
    sentinel:
      transport:
        dashboard: localhost:8080 # Sentinel dashboard address
        port: 8719

      datasource:
        ds1:
          nacos:
            server-addr: 10.38.163.218:80
            dataId: hbnnmall-sentinel-service
            groupId: DEFAULT_GROUP
            data-type: json
            rule-type: flow

        ds2:
          nacos:
            server-addr: 10.38.163.218:80
            dataId: hbnnmall-sentinel-degrade
            groupId: DEFAULT_GROUP
            data-type: json
            rule-type: degrade

3. Flow-control and degrade rule configuration

4. Using Sentinel in a service


    @Override
    @SentinelResource(value = "secKillRateLimit", blockHandlerClass = CustomSentinelException.class,blockHandler = "exceptionHandler")
    public SecKillResult secKill(SecKillRequest secKillRequest) throws Exception{
        log.info("Start to call secKill ,request:{}",secKillRequest);
        rateLimiteService.seckillRateLimit(secKillRequest);
        secKillRiskService.antiCheat(secKillRequest);
   
        ......
    }

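Once the service hits a threshold set in the rules, Sentinel intercepts the call automatically and throws a subclass of BlockException. To customize the handling, you write your own handler method. The key point: if the handler lives in another class (blockHandlerClass), it must be a static method, its return type must match the original method's, and its parameters must be the original method's parameters followed by a BlockException.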

public class CustomSentinelException {


    public static SecKillResult exceptionHandler(SecKillRequest secKillRequest, BlockException ex){
        log.error("blocked by rate limit rule. request:{},ex:{}",secKillRequest,ex);
        throw new BussinessException(BizExceptionEnum.SECKILL_SOMUCH_HOST.getCode(),BizExceptionEnum.SECKILL_SOMUCH_HOST.getMessage());
    }

    public static String testSentinel(BlockException ex){
        log.error("degrade by rule, ex:", ex);
        throw new BussinessException(500, "Sorry, the backend service has been degraded");
    }
}

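With this in place, start Nacos, the Spring Boot application, and sentinel-dashboard. You then need to call the annotated service at least once yourself before the resource shows up in the dashboard.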

That covers basic usage. Next, how Sentinel works under the hood.

II. How Sentinel is implemented

Note: these are just notes I wrote for myself and are not very detailed. For a thorough walkthrough, see the article sentinel-tutorial, which is excellent.

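The code above needs nothing more than the @SentinelResource annotation. The annotation is processed by the SentinelResourceAspect aspect, whose source is: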

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        // First, resolve the original method
        Method originMethod = resolveMethod(pjp);

        // Get the @SentinelResource annotation
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        // Resolve the resource name: the annotation's value, or the method name if value is empty
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            // Call SphU.entry before actually invoking the target method
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            Object result = pjp.proceed();
            return result;
            // Whenever Sentinel blocks a call, it throws a subclass of BlockException.
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }

            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

SphU.entry above is the entry point of Sentinel's control logic. Sentinel creates one processing chain per resource and uses it to run the various rule checks.

After a series of calls, the first access to a static field of Env triggers initialization:

/**
 * Sentinel Env. This class will trigger all initialization for Sentinel.
 *
 * <p>
 * NOTE: to prevent deadlocks, other classes' static code block or static field should
 * NEVER refer to this class.
 * </p>
 *
 * @author jialiang.linjl
 */
public class Env {

    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }

}

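The important part here is the initialization of the InitFunc implementations, which are loaded through the SPI mechanism. The implementation classes are registered under META-INF/services.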

Next, CtSph's entryWithType method runs, which eventually initializes the processing chain. Sentinel calls each element of the chain a slot.

The following method, which enters a resource with an optional priority flag, is the important one:

 private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        // First, get the current thread's context, which is held in a ThreadLocal. One context represents one entrance node.
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }


        if (context == null) {
            // Using default context. Context creation is guarded by a ReentrantLock.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // Build (or look up) the processing chain, a ProcessorSlotChain object
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
      
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // Run each slot's rule check in order
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

The ProcessorSlotChain is also created through the SPI mechanism. The default implementation is DefaultSlotChainBuilder:

 public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        // Resolve the slot chain builder SPI. Unless another builder is registered via SPI, this is DefaultSlotChainBuilder.
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
                + slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }

    private SlotChainProvider() {}
}
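
To make the "load the first SPI implementation, otherwise fall back to a default" idea concrete, here is a minimal sketch built on the JDK's own ServiceLoader. It is not Sentinel's SpiLoader: ChainBuilder, DefaultChainBuilder and loadFirstOrDefault are hypothetical names invented for this illustration.

```java
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.function.Supplier;

/** Hypothetical stand-in for a builder interface; not Sentinel's SlotChainBuilder. */
interface ChainBuilder {
    String build();
}

/** Hypothetical default, used when no provider is registered under META-INF/services. */
class DefaultChainBuilder implements ChainBuilder {
    @Override
    public String build() {
        return "default-chain";
    }
}

final class SpiSketch {
    private SpiSketch() {}

    /** Returns the first provider found by ServiceLoader, or the fallback if none is registered. */
    static <T> T loadFirstOrDefault(Class<T> service, Supplier<? extends T> fallback) {
        Iterator<T> providers = ServiceLoader.load(service).iterator();
        return providers.hasNext() ? providers.next() : fallback.get();
    }

    public static void main(String[] args) {
        // No META-INF/services/ChainBuilder file exists here, so the fallback is used.
        ChainBuilder builder = loadFirstOrDefault(ChainBuilder.class, DefaultChainBuilder::new);
        System.out.println(builder.build());
    }
}
```

Registering a custom implementation would only require listing its class name in a META-INF/services file named after the interface; the calling code stays unchanged.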

It defines the following slots:

 @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

This already gives a hint: a ProcessorSlotChain is essentially a linked list.

Its source confirms this:

public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }

    };
    AbstractLinkedProcessorSlot<?> end = first;

    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }

    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }

    /**
     * Same as {@link #addLast(AbstractLinkedProcessorSlot)}.
     *
     * @param next processor to be added.
     */
    @Override
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        addLast(next);
    }

    @Override
    public AbstractLinkedProcessorSlot<?> getNext() {
        return first.getNext();
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        first.exit(context, resourceWrapper, count, args);
    }

}
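
  • NodeSelectorSlot collects the call paths of resources and stores them as a tree, which is used for flow control and degradation by call path;
  • ClusterBuilderSlot stores a resource's statistics and caller information, such as its RT, QPS and thread count, which serve as the basis for multi-dimensional flow control and degradation;
  • StatisticSlot records and aggregates runtime metrics along different dimensions;
  • FlowSlot performs flow control based on the preset flow rules and the statistics gathered by the preceding slots;
  • AuthoritySlot applies blacklist/whitelist control based on the configured lists and the call origin;
  • DegradeSlot performs circuit breaking and degradation based on the statistics and the preset rules;
  • SystemSlot controls the total inbound traffic based on system state, such as load1.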
As the code above shows, the first slot to run is NodeSelectorSlot.

Once first.entry is called, if next is not null, the concrete slot implementations start running one after another. Here I mainly look at FlowSlot and DegradeSlot.
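
The hand-over from one slot to the next can be sketched with a stripped-down chain. This is only an illustration of the linked-list idea under simplified assumptions: SketchSlot, NamedSlot and SketchSlotChain are hypothetical classes, and the real slots carry a Context, a ResourceWrapper and a node instead of a plain trace list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified slot: does its own work, then hands over to the next slot. */
abstract class SketchSlot {
    private SketchSlot next;

    void setNext(SketchSlot next) {
        this.next = next;
    }

    /** Passes the call on to the next slot, if there is one. */
    protected void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }

    abstract void entry(String resource, List<String> trace);
}

/** Records its own name so the execution order becomes visible, then continues. */
class NamedSlot extends SketchSlot {
    private final String name;

    NamedSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        fireEntry(resource, trace);
    }
}

/** Singly linked chain with a no-op head node, mirroring the "first" and "end" fields. */
class SketchSlotChain {
    private final SketchSlot first = new SketchSlot() {
        @Override
        void entry(String resource, List<String> trace) {
            fireEntry(resource, trace);
        }
    };
    private SketchSlot end = first;

    void addLast(SketchSlot slot) {
        end.setNext(slot);
        end = slot;
    }

    void entry(String resource, List<String> trace) {
        first.entry(resource, trace);
    }

    public static void main(String[] args) {
        SketchSlotChain chain = new SketchSlotChain();
        chain.addLast(new NamedSlot("NodeSelectorSlot"));
        chain.addLast(new NamedSlot("StatisticSlot"));
        chain.addLast(new NamedSlot("FlowSlot"));
        List<String> trace = new ArrayList<>();
        chain.entry("secKillRateLimit", trace);
        System.out.println(trace); // slots appear in the order they were added
    }
}
```

A slot that wants to block a request simply throws instead of calling fireEntry, which is why the slots after it never run.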

FlowSlot's entry method:

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

Now checkFlow, which mainly delegates to FlowRuleChecker:

 public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        // Look up the flow rules for this resource name
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                // Check the request against each rule
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

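For rule configuration I use push mode, i.e. rules are pushed from Nacos. There are also a pull mode and the original API mode. Push mode is the usual choice in production.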

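/*
 * Rule persistence - push mode
 * The Sentinel dashboard no longer pushes rules by calling the client's API; instead it pushes them to Nacos or another remote config center.
 * The Sentinel client connects to Nacos to fetch the rules and listens for Nacos config changes; on a change it updates its local cache (so the cache always matches Nacos).
 * The Sentinel dashboard also listens for Nacos config changes and updates its local cache on a change (so the dashboard's cache always matches Nacos).
 */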
@Configuration
public class NacosDataSourceConfig {

    @Autowired
    private SentinelProperties sentinelProperties;

    @Bean
    public NacosDataSourceConfig init() throws Exception {

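        // Initialize the Nacos data sources and load the flow rules from Nacos.
        // Note: as written, every configured Nacos data source (including a degrade one such as ds2)
        // is parsed and registered as a flow-rule source.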
        sentinelProperties.getDatasource().entrySet().stream().filter(map -> {
            return map.getValue().getNacos() != null;
        }).forEach(map -> {
            NacosDataSourceProperties nacos = map.getValue().getNacos();
            ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new NacosDataSource<List<FlowRule>>(nacos.getServerAddr(),
                    nacos.getGroupId(), nacos.getDataId(), source -> JSON.parseObject(source,
                    new TypeReference<List<FlowRule>>() {}));
            FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
        });
        return new NacosDataSourceConfig();
    }

}

In addition, Sentinel can load configuration dynamically: when the configuration changes, it performs an updateValue operation.
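
The update-and-notify idea behind this can be sketched as a small observable property. This is a simplified illustration, not Sentinel's own property classes: SketchProperty and its methods are hypothetical names, and the listeners here are plain java.util.function.Consumer instances.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical observable property: holds the latest config and notifies listeners on change. */
class SketchProperty<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
    private volatile T value;

    /** Registers a listener, e.g. a rule manager that rebuilds its rule map. */
    void addListener(Consumer<T> listener) {
        listeners.add(listener);
    }

    /**
     * Stores the new value and notifies every listener.
     * Returns false, without notifying, when the value did not actually change.
     */
    boolean updateValue(T newValue) {
        if (Objects.equals(value, newValue)) {
            return false;
        }
        value = newValue;
        for (Consumer<T> listener : listeners) {
            listener.accept(newValue);
        }
        return true;
    }

    T get() {
        return value;
    }

    public static void main(String[] args) {
        SketchProperty<String> rules = new SketchProperty<>();
        rules.addListener(json -> System.out.println("reloading rules from: " + json));
        rules.updateValue("[{\"resource\":\"secKillRateLimit\",\"count\":20}]");
        // Pushing identical content again is ignored, so listeners are not re-triggered.
        rules.updateValue("[{\"resource\":\"secKillRateLimit\",\"count\":20}]");
    }
}
```

In this picture, a data source that watches the config center would call updateValue whenever it receives new content, and the rule manager would be one of the listeners.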

Next, the concrete flow-control strategy. It is decided at initialization time from our configuration:

In FlowRuleUtil:

  private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        // Flow control by QPS
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            // controlBehavior is set in our own rule configuration.
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        // This is the default: fast reject
        return new DefaultController(rule.getCount(), rule.getGrade());
    }

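What follows is the concrete rate-limiting strategy. I covered the individual algorithms in an earlier post on rate-limiting algorithms. Sentinel mainly relies on the sliding window algorithm.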
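
As a reminder of how a sliding window works, here is a minimal sketch of a bucketed sliding-window counter. It is not Sentinel's implementation: SlidingWindowLimiter is a hypothetical class, the caller passes the current time in explicitly so the behavior is easy to follow, and everything is guarded by one lock for simplicity.

```java
import java.util.Arrays;

/** Hypothetical sliding-window limiter: the window is split into fixed-size buckets. */
final class SlidingWindowLimiter {
    private final int bucketCount;
    private final long bucketLengthMs;
    private final int maxPerWindow;
    private final long[] bucketStart; // start time of the interval each bucket currently holds
    private final int[] bucketHits;   // number of accepted requests in that interval

    SlidingWindowLimiter(int bucketCount, long windowLengthMs, int maxPerWindow) {
        this.bucketCount = bucketCount;
        this.bucketLengthMs = windowLengthMs / bucketCount;
        this.maxPerWindow = maxPerWindow;
        this.bucketStart = new long[bucketCount];
        this.bucketHits = new int[bucketCount];
        Arrays.fill(bucketStart, Long.MIN_VALUE); // marks every bucket as "never used"
    }

    /** Returns true and counts the request if the window still has room; nowMs must be >= 0. */
    synchronized boolean tryAcquire(long nowMs) {
        long currentStart = nowMs - nowMs % bucketLengthMs;
        int idx = (int) ((nowMs / bucketLengthMs) % bucketCount);
        if (bucketStart[idx] != currentStart) {
            // The bucket still holds an older interval: recycle it for the current one.
            bucketStart[idx] = currentStart;
            bucketHits[idx] = 0;
        }
        // Only buckets whose interval lies inside the current window are counted.
        long oldestValidStart = currentStart - (bucketCount - 1) * bucketLengthMs;
        int total = 0;
        for (int i = 0; i < bucketCount; i++) {
            if (bucketStart[i] >= oldestValidStart) {
                total += bucketHits[i];
            }
        }
        if (total >= maxPerWindow) {
            return false;
        }
        bucketHits[idx]++;
        return true;
    }

    public static void main(String[] args) {
        // Two 500 ms buckets form a 1 s window that admits at most 3 requests.
        SlidingWindowLimiter limiter = new SlidingWindowLimiter(2, 1000, 3);
        long[] arrivals = {0, 100, 600, 700, 1000};
        for (long t : arrivals) {
            System.out.println(t + " ms -> " + limiter.tryAcquire(t));
        }
    }
}
```

Compared with a single fixed window, splitting the window into buckets lets old traffic expire gradually, so a burst straddling a window boundary cannot pass twice the limit.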

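The degrade slot intercepts calls according to the rule type: RT, exception ratio, or exception count. These are set in the degrade rule configuration.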

The DegradeSlot code is not very complicated. The check itself is the passCheck method of DegradeRule:

 @Override
    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
        if (cut.get()) {
            return false;
        }

        ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
        if (clusterNode == null) {
            return true;
        }

        if (grade == RuleConstant.DEGRADE_GRADE_RT) {
            double rt = clusterNode.avgRt();
            if (rt < this.count) {
                passCount.set(0);
                return true;
            }

            // Sentinel will degrade the service only if count exceeds.
            if (passCount.incrementAndGet() < rtSlowRequestAmount) {
                return true;
            }
        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
            double exception = clusterNode.exceptionQps();
            double success = clusterNode.successQps();
            double total = clusterNode.totalQps();
            // If total amount is less than minRequestAmount, the request will pass.
            if (total < minRequestAmount) {
                return true;
            }

            // In the same aligned statistic time window,
            // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
            double realSuccess = success - exception;
            if (realSuccess <= 0 && exception < minRequestAmount) {
                return true;
            }

            if (exception / success < count) {
                return true;
            }
        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
            double exception = clusterNode.totalException();
            if (exception < count) {
                return true;
            }
        }

        if (cut.compareAndSet(false, true)) {
            ResetTask resetTask = new ResetTask(this);
            pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
        }

        return false;
    }
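
The exception-ratio branch and the "cut, then reset after timeWindow" behavior can be condensed into a small sketch. It makes two simplifying assumptions that differ from the code above: ExceptionRatioBreaker is a hypothetical class that counts calls cumulatively rather than per second, and it replaces the scheduled ResetTask with a timestamp comparison, with the current time passed in by the caller.

```java
/** Hypothetical, simplified exception-ratio breaker; not Sentinel's DegradeRule. */
final class ExceptionRatioBreaker {
    private final double ratioThreshold; // trip when exceptions / total reaches this value
    private final int minRequestAmount;  // with fewer recorded calls than this, always pass
    private final long timeWindowMs;     // how long the breaker stays open once tripped
    private long total;
    private long exceptions;
    private long openUntilMs = Long.MIN_VALUE;

    ExceptionRatioBreaker(double ratioThreshold, int minRequestAmount, long timeWindowMs) {
        this.ratioThreshold = ratioThreshold;
        this.minRequestAmount = minRequestAmount;
        this.timeWindowMs = timeWindowMs;
    }

    /** Records the outcome of one completed call. */
    synchronized void record(boolean failed) {
        total++;
        if (failed) {
            exceptions++;
        }
    }

    /** Returns true if a new call may proceed at time nowMs. */
    synchronized boolean passCheck(long nowMs) {
        if (nowMs < openUntilMs) {
            return false; // still "cut": reject without looking at the statistics
        }
        if (total == 0 || total < minRequestAmount) {
            return true; // not enough data to judge
        }
        if ((double) exceptions / total < ratioThreshold) {
            return true;
        }
        // Trip the breaker and start over with fresh statistics once it closes again.
        openUntilMs = nowMs + timeWindowMs;
        total = 0;
        exceptions = 0;
        return false;
    }

    public static void main(String[] args) {
        ExceptionRatioBreaker breaker = new ExceptionRatioBreaker(0.5, 4, 1000);
        breaker.record(true);
        breaker.record(true);
        breaker.record(true);
        breaker.record(false);
        System.out.println(breaker.passCheck(10));   // ratio is 0.75, so the breaker trips
        System.out.println(breaker.passCheck(500));  // still inside the open window
        System.out.println(breaker.passCheck(1010)); // window elapsed, calls pass again
    }
}
```

Using a timestamp instead of a scheduled task keeps the sketch free of threads; the trade-off is that the breaker only notices the end of the window when the next request arrives.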

Next, I plan to look into cluster mode.

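References:

Sentinel official site: How Sentinel is implemented

Sentinel source code analysis

Understanding rate-limiting algorithms through Sentinel

Design and implementation of Spring Cloud Alibaba Sentinel's Feign integration

Sentinel core source code analysis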
