跳到主要内容

21、SpringCloud Alibaba - Sentinel服务熔断降级DegradeSlot

一、DegradeRule

 	/**
     * 降级策略:慢调用比例、异常比例、异常数
     */
 	private int grade = RuleConstant.DEGRADE_GRADE_RT;

    /**
     * 阈值:最大RT、比例阈值、异常数
     */
    private double count;

    /**
     * 降级恢复时间,多久后结束降级
     */
    private int timeWindow;

    /**
     * 触达熔断的最小请求数量
     */
    private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

    /**
     * RT 模式的慢比例阈值
     */
    private double slowRatioThreshold = 1.0d;
 	/**
     * 统计时长
     */
    private int statIntervalMs = 1000;

 

一、DegradeSlot

1、 entry();

在entry方法中,对熔断规则的状态进行判断,CLOSE 状态通过规则,OPEN 状态时检验熔断时间是否已经过去,如果过去了则更新状态为 HALF_OPEN。

·1、、entry( )

	@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
   
     
        //执行熔断规则检查
        performChecking(context, resourceWrapper);
		//下一个节点
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

2、、performChecking( )

 void performChecking(Context context, ResourceWrapper r) throws BlockException {
   
     
 		//获取该资源的熔断规则
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
   
     
            return;
        }
        for (CircuitBreaker cb : circuitBreakers) {
   
     
        	//规则检验
            if (!cb.tryPass(context)) {
   
     
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }

3、、tryPass( )

 	@Override
    public boolean tryPass(Context context) {
   
     
        // Template implementation.
        //熔断处于关闭状态,返回成功
        if (currentState.get() == State.CLOSED) {
   
     
            return true;
        }
        //熔断处于打开状态
        if (currentState.get() == State.OPEN) {
   
     
            // For half-open state we allow a request for probing.
            //校验降级时间是否已经过了,从打开状态转换成半开状态
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }

4、、retryTimeoutArrived( )

	protected boolean retryTimeoutArrived() {
   
     
        return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
    }

5、、fromOpenToHalfOpen( )

 	protected boolean fromOpenToHalfOpen(Context context) {
   
     
 		//更新状态,由 OPEN 到 HALF_OPEN
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
   
     
        	//通知观察者状态变化
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            Entry entry = context.getCurEntry();
            //注册exitHandlers,在执行链 Slot chain 执行完退出后,会回调exitHandlers
            entry.whenTerminate(new BiConsumer<Context, Entry>() {
   
     
                @Override
                public void accept(Context context, Entry entry) {
   
     
                    // Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
                    // Without the hook, the circuit breaker won't recover from half-open state in some circumstances
                    // when the request is actually blocked by upcoming rules (not only degrade rules).
                    //如果出现了BlockException
                    if (entry.getBlockError() != null) {
   
     
                        // Fallback to OPEN due to detecting request is blocked
                        //将状态恢复,由 HALF_OPEN 到 OPEN
                        currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                        //通知监听者
                        notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                    }
                }
            });
            return true;
        }
        return false;
    }
    

2、 exit();

 	public void exit(Context context, ResourceWrapper r, int count, Object... args) {
   
     
        Entry curEntry = context.getCurEntry();
        //出现异常,直接执行下一个节点
        if (curEntry.getBlockError() != null) {
   
     
            fireExit(context, r, count, args);
            return;
        }
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        //没有熔断规则,直接执行下一个节点
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
   
     
            fireExit(context, r, count, args);
            return;
        }

        if (curEntry.getBlockError() == null) {
   
     
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
   
     
            	//调用每一个熔断规则的 onRequestComplete,分别有ResponseTimeCircuitBreaker、ExceptionCircuitBreaker
                circuitBreaker.onRequestComplete(context);
            }
        }
		//执行下一个节点
        fireExit(context, r, count, args);
    }

三、ResponseTimeCircuitBreaker

慢调用比例的处理逻辑,时间窗口使用的是SlowRequestLeapArray ,对应的bucket是 SlowRequestCounter

1、 SlowRequestLeapArray;

static class SlowRequestLeapArray extends LeapArray<SlowRequestCounter> {
   
     

        public SlowRequestLeapArray(int sampleCount, int intervalInMs) {
   
     
            super(sampleCount, intervalInMs);
        }

        @Override
        public SlowRequestCounter newEmptyBucket(long timeMillis) {
   
     
            return new SlowRequestCounter();
        }

        @Override
        protected WindowWrap<SlowRequestCounter> resetWindowTo(WindowWrap<SlowRequestCounter> w, long startTime) {
   
     
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }

2、 SlowRequestCounter;

 static class SlowRequestCounter {
   
     
        private LongAdder slowCount;
        private LongAdder totalCount;

        public SlowRequestCounter() {
   
     
        	//慢请求数量
            this.slowCount = new LongAdder();
            //总请求数量
            this.totalCount = new LongAdder();
        }

        public LongAdder getSlowCount() {
   
     
            return slowCount;
        }

        public LongAdder getTotalCount() {
   
     
            return totalCount;
        }

        public SlowRequestCounter reset() {
   
     
            slowCount.reset();
            totalCount.reset();
            return this;
        }

        @Override
        public String toString() {
   
     
            return "SlowRequestCounter{" +
                "slowCount=" + slowCount +
                ", totalCount=" + totalCount +
                '}';
        }
    }

3、 onRequestComplete();

  public void onRequestComplete(Context context) {
   
     
  		//当前时间窗口的慢请求计数器
        SlowRequestCounter counter = slidingCounter.currentWindow().value();
        Entry entry = context.getCurEntry();
        if (entry == null) {
   
     
            return;
        }
        long completeTime = entry.getCompleteTimestamp();
        if (completeTime <= 0) {
   
     
            completeTime = TimeUtil.currentTimeMillis();
        }
        //计算响应时间
        long rt = completeTime - entry.getCreateTimestamp();
        //大于阈值,为慢请求
        if (rt > maxAllowedRt) {
   
     
            counter.slowCount.add(1);
        }
        //总数量也加1
        counter.totalCount.add(1);
		//处理熔断规则状态
        handleStateChangeWhenThresholdExceeded(rt);
    }

4、 handleStateChangeWhenThresholdExceeded();

 private void handleStateChangeWhenThresholdExceeded(long rt) {
   
     
 		//已经是打开状态
        if (currentState.get() == State.OPEN) {
   
     
            return;
        }
        //半开状态
        if (currentState.get() == State.HALF_OPEN) {
   
     
            // In detecting request
            // TODO: improve logic for half-open recovery
            if (rt > maxAllowedRt) {
   
     
            	//熔断时间过去后的下一次请求处于半开状态,此时又超时了,直接由半开变为打开
                fromHalfOpenToOpen(1.0d);
            } else {
   
     
            	//没有超时,由半开变为关闭
                fromHalfOpenToClose();
            }
            return;
        }
		//获取窗口计数器
        List<SlowRequestCounter> counters = slidingCounter.values();
        long slowCount = 0;
        long totalCount = 0;
        for (SlowRequestCounter counter : counters) {
   
     
        	//慢请求数量
            slowCount += counter.slowCount.sum();
            //总请求数量
            totalCount += counter.totalCount.sum();
        }
        //总数量 达不到 触达熔断的最小请求数量
        if (totalCount < minRequestAmount) {
   
     
            return;
        }
        //慢请求比例
        double currentRatio = slowCount * 1.0d / totalCount;
        if (currentRatio > maxSlowRequestRatio) {
   
     
        	//打开熔断
            transformToOpen(currentRatio);
        }
    }

5、 transformToOpen();

 	protected void transformToOpen(double triggerValue) {
   
     
        State cs = currentState.get();
        switch (cs) {
   
     
            case CLOSED:
                fromCloseToOpen(triggerValue);
                break;
            case HALF_OPEN:
                fromHalfOpenToOpen(triggerValue);
                break;
            default:
                break;
        }
    }

6、 fromCloseToOpen();

状态由Close 到 Open

 protected boolean fromCloseToOpen(double snapshotValue) {
   
     
        State prev = State.CLOSED;
        if (currentState.compareAndSet(prev, State.OPEN)) {
   
     
        	//更新熔断结束时间
            updateNextRetryTimestamp();
			//通知状态变化
            notifyObservers(prev, State.OPEN, snapshotValue);
            return true;
        }
        return false;
    }

 protected void updateNextRetryTimestamp() {
   
     
 		//当前时间加上熔断恢复时间
        this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
    }

7、 fromHalfOpenToOpen();

状态由HALF_OPEN 到 Open

	 protected boolean fromHalfOpenToOpen(double snapshotValue) {
   
     
        if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
   
     
            updateNextRetryTimestamp();
            notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
            return true;
        }
        return false;
    }

8、 fromHalfOpenToClose();

状态由HALF_OPEN 到 Close

 protected boolean fromHalfOpenToClose() {
   
     
        if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
   
     
        	//重置统计指标
            resetStat();
            notifyObservers(State.HALF_OPEN, State.CLOSED, null);
            return true;
        }
        return false;
    }

 public void resetStat() {
   
     
        // Reset current bucket (bucket count = 1).
        slidingCounter.currentWindow().value().reset();
    }


        public SlowRequestCounter reset() {
   
     
            slowCount.reset();
            totalCount.reset();
            return this;
        }

四、ExceptionCircuitBreaker

异常比例、异常数 熔断规则的处理逻辑,时间窗口使用的是SimpleErrorCounterLeapArray,对应的bucket是 SimpleErrorCounter

1、 SimpleErrorCounterLeapArray;

static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {
   
     

        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
   
     
            super(sampleCount, intervalInMs);
        }

        @Override
        public SimpleErrorCounter newEmptyBucket(long timeMillis) {
   
     
            return new SimpleErrorCounter();
        }

        @Override
        protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
   
     
            // Update the start time and reset value.
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }

2、 SimpleErrorCounter;

 static class SimpleErrorCounter {
   
     
 		//异常数量
        private LongAdder errorCount;
        //总数量
        private LongAdder totalCount;

        public SimpleErrorCounter() {
   
     
            this.errorCount = new LongAdder();
            this.totalCount = new LongAdder();
        }

        public LongAdder getErrorCount() {
   
     
            return errorCount;
        }

        public LongAdder getTotalCount() {
   
     
            return totalCount;
        }

        public SimpleErrorCounter reset() {
   
     
            errorCount.reset();
            totalCount.reset();
            return this;
        }

        @Override
        public String toString() {
   
     
            return "SimpleErrorCounter{" +
                "errorCount=" + errorCount +
                ", totalCount=" + totalCount +
                '}';
        }
    }

3、 onRequestComplete();

 public void onRequestComplete(Context context) {
   
     
        Entry entry = context.getCurEntry();
        if (entry == null) {
   
     
            return;
        }
        Throwable error = entry.getError();
        //当前时间窗口的慢请求计数器
        SimpleErrorCounter counter = stat.currentWindow().value();
        //有异常时,异常数量加1
        if (error != null) {
   
     
            counter.getErrorCount().add(1);
        }
        //总数量加1
        counter.getTotalCount().add(1);
		//处理熔断状态变化
        handleStateChangeWhenThresholdExceeded(error);
    }

4、 handleStateChangeWhenThresholdExceeded();

 private void handleStateChangeWhenThresholdExceeded(Throwable error) {
   
     
 		//打开状态
        if (currentState.get() == State.OPEN) {
   
     
            return;
        }
        //半开状态
        if (currentState.get() == State.HALF_OPEN) {
   
     
            // In detecting request
            if (error == null) {
   
     
            	//没有异常,由半开变为关闭
                fromHalfOpenToClose();
            } else {
   
     
            	//有异常,由半开变为打开
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        //关闭状态
        //获取时间窗口异常计数器
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
   
     
        	//异常总数
            errCount += counter.errorCount.sum();
            //请求总数
            totalCount += counter.totalCount.sum();
        }
        //总数小于触达熔断的最小请求数量
        if (totalCount < minRequestAmount) {
   
     
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
   
     
            // Use errorRatio
            //异常比例
            curCount = errCount * 1.0d / totalCount;
        }
        //比较异常比例或者比较异常数
        if (curCount > threshold) {
   
     
        	//打开熔断,跟ResponseTimeCircuitBreaker一样调用父类AbstractCircuitBreaker的方法
            transformToOpen(curCount);
        }
    }