.NetCore实践——熔断与限流

·

2 min read

Polly策略类型

polly 的策略类型分为两类:

  1. 被动策略(异常处理、结果处理)

  2. 主动策略(超时处理、断路器、舱壁隔离、缓存)

熔断和限流通过下面主动策略来实现:

  1. 降级响应

  2. 失败重试

  3. 断路器

  4. 舱壁隔离

Policy 类型状态说明
CircuitBreaker(断路器)有状态共享失败率,以决定是否熔断
Bulkhead(舱壁隔离)有状态共享容量使用情况,以决定是否执行动作
Cache(缓存)有状态共享缓存的对象,以决定是否命中
其他策略无状态

熔断

熔断就是让我们的上游服务器一段时间内对下游服务器不进行调用。

这里解释一下上游服务器和下游服务器,比如说A调用B,那么A就是上游服务器,B就是下游服务器。

那么为什么要熔断呢?比如说A调用B,现在A调用B 10次有8次是错误的,那么这个时候就要想一件事,代码没有变过,那么肯定是量变成了质变。

这时候B之所以不可用,那么是因为请求太多了,处理不过来(比如内存升高了,io 99%了等)。

那么这个时候A就不进行调用了,隔一段时间后再进行调用。也就是A对B的这条线进行了熔断处理。

 /// <summary>
 /// 定义断路器策略(根据次数简单限制)
 /// </summary>
 /// <returns></returns>
 static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
 {
     return HttpPolicyExtensions
         .HandleTransientHttpError()
         /*
          如果在重试 Http 请求时出现 5 个连续故障,则会中断或断开线路。
           此时,电路将断开 30 秒:在此期间,断路器会立即中止呼叫,而不是拨打电话。 
          */
         .CircuitBreakerAsync(
            handledEventsAllowedBeforeBreaking: 5,
            durationOfBreak: TimeSpan.FromSeconds(30),
             onBreak: ((result, span) =>
             {
                 // 熔断的时候处理事件
             }),
             onReset: (() =>
             {
                 // 恢复的时候的处理
             }),
             onHalfOpen: (() =>
             {
                 // 恢复之前进行处理
             })
             );
 }

// 使用:Program.cs
services.AddHttpClient("breaker")
        .AddPolicyHandler(GetCircuitBreakerPolicy())

CircuitBreakerAsync 表示断路器,这个用来实现熔断的。

handledEventsAllowedBeforeBreaking 表示失败5次,进行熔断。

durationOfBreak熔断的时间

其实上面这种不常用,因为限制比较死,比如说5次就熔断。

一般使用百分比计算:

/// <summary>
/// 进阶断路器策略(根据百分比来计算)
/// </summary>
/// <returns></returns>
static IAsyncPolicy<HttpResponseMessage> GetAdvancedCircuitBreakerPolicy()
{
    return Policy<HttpResponseMessage>
        .Handle<HttpRequestException>()
        .AdvancedCircuitBreakerAsync(
            // failureThreshold 和 samplingDuration一般是组合起来用的,表示是10秒内失败次数要有80%就会熔断。
            failureThreshold: 0.8, // 失败的比例
            samplingDuration: TimeSpan.FromSeconds(10), // 失败的时间
            minimumThroughput: 100, // 10秒内必须有100个请求才会出根据其他的条件进行熔断判断。
            durationOfBreak: TimeSpan.FromSeconds(10),
            onBreak: (result, span) =>
            {
                // 熔断的时候处理事件
            },
            onReset: () =>
            {
                // 恢复的时候的处理
            },
            onHalfOpen: () =>
            {
                // 恢复之前进行处理
            }
        );
}

// 使用:Program.cs
services.AddHttpClient("breaker")
        .AddPolicyHandler(GetAdvancedCircuitBreakerPolicy())

failureThreshold 表示失败的比例、samplingDuration 表示失败的时间

failureThresholdsamplingDuration一般是组合起来用的,表示是10秒内失败次数要有80%就会熔断。

minimumThroughput 表示10秒内必须有100个请求才会出根据其他的条件进行熔断判断。

上面就是熔断了,那么什么是服务降级呢?

服务降级

服务降级是指 当服务器压力剧增的情况下,根据实际业务情况及流量,对一些服务和页面有策略的不处理或换种简单的方式处理,从而释放服务器资源以保证核心业务正常运作或高效运作。

个人理解是服务降级是指降低了原有的服务体验,都属于服务降级。

熔断其实也是一种服务降级,但是单纯的熔断就降级的有点厉害了。

比如我去买东西,然后店直接关门了,服务体验下降了,体验降得比较厉害。

但是如果去买东西,店没有关闭,而是有一排服务员,告诉你现在因为供销商没到货买不到了,这体验是不是好点,这也是服务降级。

那么就看下第二种情况的服务降级怎么实现吧。

/// <summary>
/// 服务降级(fallback)
/// </summary>
/// <returns></returns>
static IAsyncPolicy<HttpResponseMessage> GetAdvancedCircuitBreakerPolicy2()
{
    var breakerPolicy = Policy<HttpResponseMessage>
        .Handle<HttpRequestException>()
        .AdvancedCircuitBreakerAsync(
            failureThreshold: 0.8,
            samplingDuration: TimeSpan.FromSeconds(10),
            minimumThroughput: 100,
            durationOfBreak: TimeSpan.FromMinutes(10),
            onBreak: (result, span) =>
            {
                // 熔断的时候处理事件
            },
            onReset: () =>
            {
                // 恢复的时候的处理
            },
            onHalfOpen: () =>
            {
                // 恢复之前进行处理
            }
            );
    var message = new HttpResponseMessage() { Content = new StringContent("不要慌,老板没有跑路,只是和老婆的妹妹出去了,要等一下!") };

    // 当熔断后,过来的请求会有 BrokenCircuitException 异常,
    var fallback = Policy<HttpResponseMessage>
        .Handle<BrokenCircuitException>()
        .FallbackAsync(message);

    // 将几个policy组合在一起,增加该组合策略
    var fallbackBreak = Policy.WrapAsync(fallback, breakerPolicy);
    return fallbackBreak;
}

// 使用:Program.cs
services.AddHttpClient("breaker")
        .AddPolicyHandler(GetAdvancedCircuitBreakerPolicy2())

上面代码就是当熔断后,过来的请求会有BrokenCircuitException异常,那么捕获到BrokenCircuitException异常,那么就告诉用户店没有倒闭,等一下就好。

这种就是比较优雅的降级。

上面这种var fallbackBreak = Policy.WrapAsync(fallback, breakPolicy); 就是将几个policy组合在一起,然后给某个或者某些HttpClient 增加该组合策略,例如Policy.WrapAsync(fallback, breakPolicy,xxx,yyy)等。

限流

为什么要限流呢? 如果没有限流其实熔断的意义是不大的。

为什么这么说呢? 比如说公司有1百万请求,然后这个时候熔断了,但是这100w请求还在。恢复服务,服务器又要进行熔断,一下子就瞬间爆炸。

        /// <summary>
        /// 限流(舱壁隔离)
        /// </summary>
        /// <returns></returns>
        static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicyAsync()
        {
            var bulk = Policy.BulkheadAsync<HttpResponseMessage>(
                // 可以并发处理30个请求
                maxParallelization: 30,
                // 如果并发处于30,那么会加入到队列中,队列中最大能存20个
                maxQueuingActions: 20,
                onBulkheadRejectedAsync: context => Task.CompletedTask
            );

            var message = new HttpResponseMessage() { Content = new StringContent("你没抢到号码,下次再来。") };

            // 如果队列中也存不下,那么就会抛出BulkheadRejectedException这种拒绝异常,一但出现异常,第二个策略就捕获到了,然后给出一些友好的提示。
            var fallbackBulk = Policy<HttpResponseMessage>
                .Handle<BulkheadRejectedException>()
                .FallbackAsync(message);

            var fallbackBreak = Policy.WrapAsync(bulk, fallbackBulk);
            return fallbackBreak;
        }

// 使用:Program.cs
services.AddHttpClient("breaker")
        .AddPolicyHandler(GetBulkheadPolicyAsync());

上面这个就是限流了。

maxParallelization 表示可以并发处理30个请求

maxQueuingActions表示如果并发处于30,那么会加入到队列中,队列中最大能存20个。

如果队列中也存不下,那么就会抛出BulkheadRejectedException这种拒绝异常,一但出现异常,第二个策略就捕获到了,然后给出一些友好的提示。