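Learning Sentinel
Recently, while writing a flash-sale (seckill) service, I came across Alibaba's Sentinel and found it to be a really good tool. If you need flow control and circuit breaking with degradation, there is no need to reinvent the wheel: a single annotation is enough. There are plenty of similar libraries around, such as Hystrix, but I find none of them as easy to use as Sentinel.
This article has two parts: basic usage, and how Sentinel is implemented.
I. Using Sentinel
Tech stack: Spring Boot 2.2.4, Nacos, Sentinel 1.7.1, sentinel-dashboard
Nacos is there mainly to persist the rules; rules that are only maintained in the dashboard are gone after a restart.
1. Add the dependencies to the pom: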
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
    <version>1.7.0</version>
</dependency>
2. Spring Boot configuration
spring:
  cloud:
    nacos:
      discovery:
        server-addr: 10.38.163.218:80 # Nacos address
    sentinel:
      transport:
        dashboard: localhost:8080 # sentinel-dashboard address
        port: 8719
      datasource:
        ds1:
          nacos:
            server-addr: 10.38.163.218:80
            dataId: hbnnmall-sentinel-service
            groupId: DEFAULT_GROUP
            data-type: json
            rule-type: flow
        ds2:
          nacos:
            server-addr: 10.38.163.218:80
            dataId: hbnnmall-sentinel-degrade
            groupId: DEFAULT_GROUP
            data-type: json
            rule-type: degrade
3. Flow control and degrade rule configuration
4. Using Sentinel in the service
@Override
@SentinelResource(value = "secKillRateLimit", blockHandlerClass = CustomSentinelException.class, blockHandler = "exceptionHandler")
public SecKillResult secKill(SecKillRequest secKillRequest) throws Exception {
    log.info("Start to call secKill, request:{}", secKillRequest);
    rateLimiteService.seckillRateLimit(secKillRequest);
    secKillRiskService.antiCheat(secKillRequest);
    ......
}
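When a call reaches the threshold set in a rule, Sentinel intercepts it automatically and throws a subclass of BlockException. If you want custom handling, you have to write the handler function yourself. If that function lives in another class, it must be a static method, and its parameters and return type must match those of the original method (with one extra BlockException parameter at the end). This is the key point.
public class CustomSentinelException {
    public static SecKillResult exceptionHandler(SecKillRequest secKillRequest, BlockException ex) {
        log.error("blocked by rate limit rule. request:{},ex:{}", secKillRequest, ex);
        throw new BussinessException(BizExceptionEnum.SECKILL_SOMUCH_HOST.getCode(), BizExceptionEnum.SECKILL_SOMUCH_HOST.getMessage());
    }
    public static String testSentinel(BlockException ex) {
        log.error("degrade by rule,er:", ex);
        throw new BussinessException(500, "Sorry, the backend service has been degraded");
    }
}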
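To make the signature rule concrete, here is a minimal sketch of how a static handler in another class could be located by reflection. It is not Sentinel's actual lookup code: BlockedException, OrderHandlers and findBlockHandler are hypothetical stand-ins, and the guarded method is assumed to be `String placeOrder(String orderId)`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

// Hypothetical stand-in for Sentinel's BlockException.
class BlockedException extends Exception {
    BlockedException(String message) {
        super(message);
    }
}

// Hypothetical handler class, playing the role of blockHandlerClass.
// The guarded method is assumed to be: String placeOrder(String orderId).
class OrderHandlers {
    // Qualifies: static, same return type, original parameters plus the exception.
    public static String onBlocked(String orderId, BlockedException ex) {
        return "blocked:" + orderId;
    }

    // Does not qualify: instance method.
    public String notStatic(String orderId, BlockedException ex) {
        return "unused";
    }
}

class BlockHandlerLookup {
    // Returns the matching handler, or null when the rules are not met.
    static Method findBlockHandler(Class<?> handlerClass, String name,
                                   Class<?> returnType, Class<?>... originalParams) {
        // Expected parameter list: the original parameters followed by the exception.
        Class<?>[] expected = Arrays.copyOf(originalParams, originalParams.length + 1);
        expected[expected.length - 1] = BlockedException.class;
        try {
            Method candidate = handlerClass.getMethod(name, expected);
            boolean isStatic = Modifier.isStatic(candidate.getModifiers());
            boolean returnMatches = returnType.isAssignableFrom(candidate.getReturnType());
            return (isStatic && returnMatches) ? candidate : null;
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        Method handler = findBlockHandler(OrderHandlers.class, "onBlocked", String.class, String.class);
        // A static method is invoked with a null receiver.
        Object result = handler.invoke(null, "42", new BlockedException("too many requests"));
        System.out.println(result); // blocked:42
    }
}
```

A handler that is not static, or whose parameter list differs from the original one plus the exception, is simply not found in this sketch, which is the kind of mistake the rule above warns about.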
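After adding this, start Nacos, the Spring Boot application, and sentinel-dashboard. You then need to call the @SentinelResource-annotated service once yourself; only after that does the resource show up in the dashboard.
That covers basic usage of Sentinel. Next, how Sentinel works.
II. How Sentinel works
Note: these are just notes I wrote for myself and they are not very detailed. For a thorough walkthrough, see this article: sentinel-tutorial. It explains everything in great detail and is well worth reading!
The code above works through the @SentinelResource annotation alone. The annotation is processed by the SentinelResourceAspect, whose source is: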
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        // First resolve the original method.
        Method originMethod = resolveMethod(pjp);
        // Get the @SentinelResource annotation.
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        // Get the resource name: the annotation's value, or the method name if value is empty.
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            // Call SphU.entry before actually invoking the target method.
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            Object result = pjp.proceed();
            return result;
        // Whenever Sentinel blocks a call, it throws a subclass of BlockException.
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }
            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}
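The control flow of this aspect (enter, run the business method, route a rejection to the block handler, route a business exception to the fallback, always exit) can be imitated without Sentinel. The following is only a sketch under a strong simplification: a plain Semaphore plays the role of the single "rule", and GuardedCall with its parameters is a hypothetical helper, not Sentinel API.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: at most maxConcurrent calls may run at the same time.
class GuardedCall {
    private final Semaphore permits;

    GuardedCall(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    <T> T call(Supplier<T> body, Supplier<T> blockHandler, Function<RuntimeException, T> fallback) {
        // "Entry": rejected calls never reach the business code.
        if (!permits.tryAcquire()) {
            return blockHandler.get();
        }
        try {
            return body.get();
        } catch (RuntimeException ex) {
            // Business failure: handled by the fallback, not by the block handler.
            return fallback.apply(ex);
        } finally {
            // "Exit": always give the permit back.
            permits.release();
        }
    }

    public static void main(String[] args) {
        GuardedCall guard = new GuardedCall(1);
        String ok = guard.<String>call(() -> "done", () -> "blocked", ex -> "fallback");
        // The inner call runs while the outer one still holds the only permit.
        String nested = guard.<String>call(
                () -> guard.<String>call(() -> "inner", () -> "blocked", ex -> "fallback"),
                () -> "blocked", ex -> "fallback");
        System.out.println(ok + " " + nested); // done blocked
    }
}
```

The point of the try/finally shape is the same as in the aspect: whatever happens inside the business method, the exit step runs, so the bookkeeping never leaks.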
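SphU.entry above is the entry point of Sentinel's control logic. Sentinel creates a slot chain for each resource and uses it to run the various rule checks.
After a series of calls, the first access to a static field of Env triggers initialization:
/**
 * Sentinel Env. This class will trigger all initialization for Sentinel.
 *
 * <p>
 * NOTE: to prevent deadlocks, other classes' static code block or static field should
 * NEVER refer to this class.
 * </p>
 *
 * @author jialiang.linjl
 */
public class Env {
    public static final Sph sph = new CtSph();
    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }
}
The important part here is the initialization of the InitFunc implementations, which are loaded through the SPI mechanism. The implementation classes are registered under META-INF/services.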
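For readers unfamiliar with SPI, the idea can be sketched with the JDK's own java.util.ServiceLoader. This is an illustration only, not Sentinel's loader: Greeter, DefaultGreeter and loadFirstOrDefault are hypothetical names, and the sketch assumes that no provider file for Greeter exists on the classpath.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical extension point, standing in for an interface such as InitFunc.
interface Greeter {
    String greet(String name);
}

// Built-in implementation used when no provider is registered.
class DefaultGreeter implements Greeter {
    @Override
    public String greet(String name) {
        return "hello, " + name;
    }
}

class SpiSketch {
    // Returns the first provider listed in META-INF/services/<interface name>,
    // or the given fallback when none is registered.
    static <T> T loadFirstOrDefault(Class<T> service, T fallback) {
        Iterator<T> providers = ServiceLoader.load(service).iterator();
        return providers.hasNext() ? providers.next() : fallback;
    }

    public static void main(String[] args) {
        Greeter greeter = loadFirstOrDefault(Greeter.class, new DefaultGreeter());
        // With no provider file on the classpath, the default implementation answers.
        System.out.println(greeter.greet("sentinel"));
    }
}
```

To plug in a different implementation, one would add a file named after the interface under META-INF/services containing the implementation's class name; the calling code stays unchanged.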
Next, CtSph's entryWithType method runs, which eventually initializes the processing chain. Sentinel calls each element of that chain a slot.
The following method is the important one:
Entry with priority:
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    // First get the context of the current thread, which is held in a ThreadLocal. One context represents one entrance node.
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }
    if (context == null) {
        // Using default context. The context is created under a ReentrantLock.
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }
    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }
    // Create the slot chain, a ProcessorSlotChain object.
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // Run the rule checks of each slot in turn.
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
The ProcessorSlotChain is also created through the SPI mechanism. The implementation class is DefaultSlotChainBuilder.
public final class SlotChainProvider {
    private static volatile SlotChainBuilder slotChainBuilder = null;
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }
        // Resolve the slot chain builder SPI. The implementation is DefaultSlotChainBuilder, loaded through SPI.
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
                + slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }
    private SlotChainProvider() {}
}
It defines the following slots:
@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());
    return chain;
}
As the code above hints, a ProcessorSlotChain is essentially a linked list.
Here is its source:
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };
    AbstractLinkedProcessorSlot<?> end = first;
    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
    /**
     * Same as {@link #addLast(AbstractLinkedProcessorSlot)}.
     *
     * @param next processor to be added.
     */
    @Override
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        addLast(next);
    }
    @Override
    public AbstractLinkedProcessorSlot<?> getNext() {
        return first.getNext();
    }
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }
    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        first.exit(context, resourceWrapper, count, args);
    }
}
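- NodeSelectorSlot: collects the call paths of resources and stores them in a tree structure, so that flow control and degradation can be applied per call path;
- ClusterBuilderSlot: stores the resource's statistics and caller information, such as RT, QPS and thread count, which serve as the basis for multi-dimensional flow control and degradation;
- StatisticSlot: records and aggregates runtime metrics across different dimensions;
- FlowSlot: performs flow control based on the preset flow rules and the state collected by the preceding slots;
- AuthoritySlot: applies blacklist/whitelist control based on the configured lists and the call origin;
- DegradeSlot: performs circuit breaking and degradation based on the statistics and the preset rules;
- SystemSlot: controls the total inbound traffic based on system state such as load1.
As the code above also shows, the first slot to run is NodeSelectorSlot.
Once first.entry is entered, if next is not null, the concrete slot implementations start being called one after another. Here I mainly look at FlowSlot and DegradeSlot.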
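The linked-list mechanics described above (a forwarding head node, addLast appending at the tail, each slot calling the next one only if it exists, and a check that can stop the chain by throwing) can be reproduced in a few lines. This is a simplified sketch with hypothetical names (Slot, NamedSlot, RejectingSlot, SlotChain), not Sentinel's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a rule violation.
class RejectedException extends RuntimeException {
    RejectedException(String message) {
        super(message);
    }
}

// One node of the chain: does its own work, then hands over to the next node.
abstract class Slot {
    private Slot next;

    void setNext(Slot next) {
        this.next = next;
    }

    abstract void entry(String resource, List<String> trace);

    // Forwards the call only when there is a next node.
    protected void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

// Records its name and passes the call on.
class NamedSlot extends Slot {
    private final String name;

    NamedSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        fireEntry(resource, trace);
    }
}

// Rejects one specific resource, so later slots never run for it.
class RejectingSlot extends Slot {
    private final String blockedResource;

    RejectingSlot(String blockedResource) {
        this.blockedResource = blockedResource;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add("check");
        if (resource.equals(blockedResource)) {
            throw new RejectedException("blocked: " + resource);
        }
        fireEntry(resource, trace);
    }
}

class SlotChain {
    // Head node that only forwards, like the anonymous "first" slot.
    private final Slot first = new Slot() {
        @Override
        void entry(String resource, List<String> trace) {
            fireEntry(resource, trace);
        }
    };
    private Slot end = first;

    void addLast(Slot slot) {
        end.setNext(slot);
        end = slot;
    }

    // Runs the chain and returns the names of the slots that were visited.
    List<String> entry(String resource) {
        List<String> trace = new ArrayList<>();
        first.entry(resource, trace);
        return trace;
    }

    public static void main(String[] args) {
        SlotChain chain = new SlotChain();
        chain.addLast(new NamedSlot("select"));
        chain.addLast(new NamedSlot("stats"));
        chain.addLast(new RejectingSlot("seckill"));
        chain.addLast(new NamedSlot("after"));
        System.out.println(chain.entry("query")); // [select, stats, check, after]
    }
}
```

Calling `chain.entry("seckill")` on the same chain throws RejectedException from the third slot, and the "after" slot is never reached, which mirrors how a rule-checking slot cuts the chain short.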
FlowSlot's entry method:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
checkFlow mainly delegates to the corresponding method of FlowRuleChecker:
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    // Look up the flow rules for this resource name.
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            // Check the request against each rule.
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
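For rule configuration I use push mode, that is, rules are pushed through Nacos. There are also a pull mode and the raw API mode. Push mode is what production setups generally use.
/*
 * Rule persistence - push mode
 * The Sentinel dashboard no longer pushes rule data by calling the client's API; instead it pushes the rules to Nacos or another remote configuration center.
 * The Sentinel client connects to Nacos to fetch the rules and listens for Nacos configuration changes; when something changes, it updates its local cache (so the local cache always matches Nacos).
 * The Sentinel dashboard also listens for Nacos configuration changes and updates its own local cache on change (so the dashboard's cache always matches Nacos as well).
 */
@Configuration
public class NacosDataSourceConfig {
    @Autowired
    private SentinelProperties sentinelProperties;
    @Bean
    public NacosDataSourceConfig init() throws Exception {
        // Initialize the Nacos data sources and load the rules from Nacos.
        // Note: every Nacos data source, including ds2 (rule-type: degrade), is parsed as List<FlowRule> here.
        sentinelProperties.getDatasource().entrySet().stream().filter(map -> {
            return map.getValue().getNacos() != null;
        }).forEach(map -> {
            NacosDataSourceProperties nacos = map.getValue().getNacos();
            ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new NacosDataSource<List<FlowRule>>(nacos.getServerAddr(),
                nacos.getGroupId(), nacos.getDataId(), source -> JSON.parseObject(source,
                new TypeReference<List<FlowRule>>() {}));
            FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
        });
        return new NacosDataSourceConfig();
    }
}
In addition, Sentinel loads configuration dynamically. When the configuration in Nacos changes, the data source calls updateValue on its property, which notifies the registered listeners so that the rules in memory are refreshed.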
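The update mechanism boils down to an observable value: listeners register once, and each change is pushed to them. Below is a minimal sketch of that idea; DynamicProperty is a hypothetical class written for this illustration and only loosely modelled on the behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Holds the current configuration value and notifies listeners when it changes.
class DynamicProperty<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T value;

    synchronized void addListener(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Returns false and notifies nobody when the value did not change.
    synchronized boolean updateValue(T newValue) {
        if (Objects.equals(value, newValue)) {
            return false;
        }
        value = newValue;
        for (Consumer<T> listener : listeners) {
            listener.accept(newValue);
        }
        return true;
    }
}

class DynamicPropertyDemo {
    public static void main(String[] args) {
        DynamicProperty<Integer> qpsLimit = new DynamicProperty<>();
        qpsLimit.addListener(v -> System.out.println("limit is now " + v));
        qpsLimit.updateValue(100); // notifies the listener
        qpsLimit.updateValue(100); // unchanged, no notification
        qpsLimit.updateValue(200); // notifies the listener again
    }
}
```

In the real setup the role of the listener is played by the rule manager, and the caller of updateValue is the data source reacting to a configuration change.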
Next, the concrete rate limiting strategy. It is actually decided at initialization time, based on our configuration:
In FlowRuleUtil:
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    // If the rule limits by QPS
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        // controlBehavior comes from our own rule configuration (see above).
        switch (rule.getControlBehavior()) {
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    // This is the default.
    return new DefaultController(rule.getCount(), rule.getGrade());
}
From here on, the chosen strategy does the actual limiting. I described the individual rate limiting algorithms in an earlier article on that topic. Sentinel mainly relies on a sliding window algorithm.
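As a reminder of what a sliding window looks like in code, here is a deliberately small sketch. It is not Sentinel's implementation (which lives in its LeapArray class and is more elaborate); SlidingWindowLimiter is a hypothetical class, and the caller passes in the current time so that the behaviour is deterministic.

```java
import java.util.Arrays;

// Counts requests in a ring of fixed-size time buckets and rejects once the
// total over the whole window reaches the limit.
class SlidingWindowLimiter {
    private final int limit;
    private final long bucketMs;
    private final long[] bucketStart;
    private final int[] bucketCount;

    SlidingWindowLimiter(int limit, int buckets, long bucketMs) {
        this.limit = limit;
        this.bucketMs = bucketMs;
        this.bucketStart = new long[buckets];
        this.bucketCount = new int[buckets];
        Arrays.fill(bucketStart, Long.MIN_VALUE); // marks every bucket as unused
    }

    // nowMs is a non-negative timestamp in milliseconds supplied by the caller.
    synchronized boolean tryAcquire(long nowMs) {
        long currentStart = nowMs - nowMs % bucketMs;
        int index = (int) ((nowMs / bucketMs) % bucketStart.length);
        if (bucketStart[index] != currentStart) {
            // The slot still holds data from an earlier lap of the ring: reuse it.
            bucketStart[index] = currentStart;
            bucketCount[index] = 0;
        }
        // Only buckets that start inside the current window are counted.
        long windowStart = currentStart - bucketMs * (bucketStart.length - 1);
        int total = 0;
        for (int i = 0; i < bucketStart.length; i++) {
            if (bucketStart[i] >= windowStart) {
                total += bucketCount[i];
            }
        }
        if (total >= limit) {
            return false;
        }
        bucketCount[index]++;
        return true;
    }

    public static void main(String[] args) {
        // At most 3 requests per second, tracked in two 500 ms buckets.
        SlidingWindowLimiter limiter = new SlidingWindowLimiter(3, 2, 500);
        long[] arrivals = {0, 100, 200, 300, 600, 1000};
        for (long t : arrivals) {
            System.out.println(t + " ms -> " + limiter.tryAcquire(t));
        }
    }
}
```

In the demo the requests at 0, 100 and 200 ms pass, those at 300 and 600 ms are rejected because the first bucket is still inside the window, and the one at 1000 ms passes again once that bucket has slid out.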
The degrade slot intercepts calls based on rule types such as RT, exception ratio and exception count; the screenshot above shows the relevant configuration.
The code behind DegradeSlot is not very complicated. The core check is the passCheck method of DegradeRule:
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
    if (cut.get()) {
        return false;
    }
    ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
    if (clusterNode == null) {
        return true;
    }
    if (grade == RuleConstant.DEGRADE_GRADE_RT) {
        double rt = clusterNode.avgRt();
        if (rt < this.count) {
            passCount.set(0);
            return true;
        }
        // Sentinel will degrade the service only if count exceeds.
        if (passCount.incrementAndGet() < rtSlowRequestAmount) {
            return true;
        }
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
        double exception = clusterNode.exceptionQps();
        double success = clusterNode.successQps();
        double total = clusterNode.totalQps();
        // If total amount is less than minRequestAmount, the request will pass.
        if (total < minRequestAmount) {
            return true;
        }
        // In the same aligned statistic time window,
        // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
        double realSuccess = success - exception;
        if (realSuccess <= 0 && exception < minRequestAmount) {
            return true;
        }
        if (exception / success < count) {
            return true;
        }
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        double exception = clusterNode.totalException();
        if (exception < count) {
            return true;
        }
    }
    if (cut.compareAndSet(false, true)) {
        ResetTask resetTask = new ResetTask(this);
        pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
    }
    return false;
}
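The exception-count branch above follows a simple pattern: let calls pass while the count is below the threshold, trip the breaker once it is reached, and close it again after the time window. The sketch below imitates that pattern with two deliberate simplifications that are my own assumptions, not Sentinel's behaviour: the scheduled ResetTask is replaced by a timestamp comparison, and the exception counter is reset when the breaker closes. ExceptionCountBreaker is a hypothetical class.

```java
// Trips after `threshold` recorded exceptions and stays open for `timeWindowMs`.
class ExceptionCountBreaker {
    private final int threshold;
    private final long timeWindowMs;
    private int exceptions;
    private boolean cut;
    private long cutAtMs;

    ExceptionCountBreaker(int threshold, long timeWindowMs) {
        this.threshold = threshold;
        this.timeWindowMs = timeWindowMs;
    }

    synchronized void recordException() {
        exceptions++;
    }

    // nowMs is supplied by the caller so the behaviour is deterministic.
    synchronized boolean passCheck(long nowMs) {
        if (cut) {
            if (nowMs - cutAtMs < timeWindowMs) {
                return false; // still inside the degrade window
            }
            // The window has elapsed: close the breaker and start counting afresh.
            cut = false;
            exceptions = 0;
        }
        if (exceptions < threshold) {
            return true;
        }
        // Threshold reached: open the breaker and remember when it happened.
        cut = true;
        cutAtMs = nowMs;
        return false;
    }

    public static void main(String[] args) {
        ExceptionCountBreaker breaker = new ExceptionCountBreaker(2, 1000);
        System.out.println(breaker.passCheck(0));    // true
        breaker.recordException();
        breaker.recordException();
        System.out.println(breaker.passCheck(10));   // false, breaker trips
        System.out.println(breaker.passCheck(500));  // false, still open
        System.out.println(breaker.passCheck(1010)); // true, window elapsed
    }
}
```

Using a timestamp instead of a scheduler keeps the sketch free of background threads; the original code uses compareAndSet on the cut flag so that only one thread schedules the reset task.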
Next, I plan to look into using Sentinel in cluster mode.
References:
Design and implementation of the Spring Cloud Alibaba Sentinel integration with Feign