RabbitMQ 高级特性-- TTL 机制 & 死信队列

1. TTL 机制

​ 在京东下单,订单创建成功,等待支付,一般会给用户 30 分钟时间。如果在这段时间内用户没有支付,则默认订单取消。

该如何实现?

  • 定期轮询(数据库、后台服务等)

    • 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
    • 优点: 设计实现简单
    • 缺点: 需要对数据库进行大量的I0操作,效率低下。
  • RabbitMQ

    • 使用 TTL
  • ...

TTLTime to Live 的简称,即过期时间。

RabpitMQ 可以对消息和队列两个维度来设置TTL。 任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。

目前有两种方法可以设置消息的 TTL。

  1. 通过 Queue 属性设置,队列中所有消息都有相同的过期时间。

    // 设置队列属性
    var arguments = new Dictionary<string, object>
    {
       {"x-message-ttl", 30000}, // 设置队列的 TTL
        {"x-expires", 10000} // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,可以存活多久)
    };
    Channel.QueueDeclare(QueueName, true, false, false,arguments);
    
  2. 消息自身进行单独设置,每条消息的TTL可以不同。

    var properties = _channel.CreateBasicProperties();
    properties.DeliveryMode = 2;
    properties.Expiration = "10000"; // 设置单条消息过期时间
    var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
    _channel.BasicPublish(_exchangeName, routeKey, true, properties, body);
    

如果两种方法一起使用,则消息的TTL以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信(Dead Message),消费者默认就无法再收到该消息。当然,"死信”也是可以被取出来消费的,下一小节我们会讲解。

此外,还可以通过命令行方式设置全局 TTL:

rabbitmqctl set_policy [policy name] ".*" '{"message-ttl":30000, "expires":10000}' --apply-to queues

默认规则

  1. 如果不设置 TTL,则表示消息不会过期;
  2. 如果 TTL 设置为 0,表示除非此时可以直接将消息投递到消费者,否则该消息被立刻丢弃

注意理解 message-ttlx-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般 TTL 相关的参数单位都是毫秒(ms)

代码

  1. 生产者 TtlProducer.cs

    public class TtlProducer : IRabbitProducer
    {
        private RabbitOptions _rabbitOptions;
        private IModel _channel;
        private string _exchangeName;
    
        public TtlProducer(IOptions<RabbitOptions> options)
        {
            _rabbitOptions = options.Value;
            _channel = RabbitHelper.CreateConnection(_rabbitOptions).CreateModel();
        }
    
        public void Init(string exchangeName, string queueName, string routingKey)
        {
            try
            {
                _exchangeName = exchangeName;
                _channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true);
            }
            catch (Exception e)
            {
                Console.WriteLine($"初始化 {nameof(TtlProducer)} 失败:{e.Message}");
            }
        }
    
        public void PushMessage(string message)
        {
            throw new NotImplementedException();
        }
    
        public void PushMessage(string routeKey, object message)
        {
            var properties = _channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.DeliveryMode = 2;
            // properties.Expiration = "10000"; // 设置单条消息过期时间
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
            Console.WriteLine($"向交换器 {_exchangeName} ,routing key:{routeKey} 发送消息:{JsonConvert.SerializeObject(message)}");
            _channel.BasicPublish(_exchangeName, routeKey, true, properties, body);
        }
    }
    
  1. 消费者 TtlConsumerService.cs

    public class TtlConsumerService : RabbitConsumerService
    {
        public TtlConsumerService(IOptions<RabbitOptions> options) : base(options)
        {
            ExchangeName = "ex.ttl";
            QueueName = "queue.ttl";
            RouteKey = "#";
        }
    
        public override void Register()
        {
            try
            {
                Channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true);
                // 设置队列属性
                var arguments = new Dictionary<string, object>
                {
                    {"x-message-ttl", 30000}, // 设置队列的 TTL
                    {"x-expires", 10000} // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,可以存活多久)
                };
                Channel.QueueDeclare(QueueName, true, false, false, arguments);
                Channel.QueueBind(QueueName, ExchangeName, RouteKey);
                Channel.BasicQos(0,1,false);
                var consumer = new EventingBasicConsumer(Channel);
                consumer.Received += (model, ea) =>
                {
                    Thread.Sleep(10000);
                    string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine(
                        $"Queue:{QueueName} \t Exchange: {ExchangeName} \t RoutingKey: {ea.RoutingKey} \t BindingKey:{RouteKey} \t Message: {message}");
                    if (Process(message))
                        Channel.BasicAck(ea.DeliveryTag, false);
                    else
                        Channel.BasicNack(ea.DeliveryTag, false, true);
                };
                Channel.BasicConsume(QueueName, false, consumer);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    
        public override bool Process(string message)
        {
            return true;
        }
    }
    

2 死信队列

死信交换器(DLX),全称为 Dead-Letter-Exchange。消息在一个队列中编程死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时绑定 DLX 的队列就称为“死信队列”。

比如:

用户下单,调用订单服务,然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统采用 MQ 异步通讯。

在定义业务队列时可以考虑指定一个死信交换器,并绑定一个死信队列。当消息标称死信时,该消息就会被发送到死信队列上,这样方便我们查看消息失败的原因。

以下几种情况导致消息变为死信

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false
  2. 消息过期
  3. 队列达到最大长度

对于 RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

通过给声明正常的业务队列设置 arguments 指定当前队列的死信交换器及死信 routingKey:

var arguments = new Dictionary<string, object>
            {
                {"x-message-ttl", 10000}, // 设置队列 TTL
                {"x-dead-letter-exchange", _deadLetterExchange}, // 设置该队列所关联的死信交换器(当队列消息 ttl 到期后仍然没有消费,则加入死信队列
                {"x-dead-letter-routing-key",_deadLetterRoutingKey} // 设置该队列所关联的死信交换器的 routingKey,如果没有特殊指定,使用原队列的 routingKey
            };
Channel.QueueDeclare(QueueName, true, false, false, arguments);

代码

  1. 生产者继续用 TtlProducer.cs

  2. 消费者:DeadLetterConsumerService.cs

    public class DeadLetterConsumerService : RabbitConsumerService
    {
        private string _deadLetterExchange = "ex.dlx";
        private string _deadLetterQueue = "queue.dlx";
        private string _deadLetterRoutingKey = "routing.key.dlx.teset";
    
        public DeadLetterConsumerService(IOptions<RabbitOptions> options) : base(options)
        {
            QueueName = "queue.biz";
            ExchangeName = "ex.biz";
            RouteKey = "#";
        }
    
        public override void Register()
        {
            try
            {
                // 定义一个正常的业务交换器
                Channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
                // 定义一个死信交换器(也是一个普通交换器)
                Channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Direct, true);
                Channel.BasicQos(0,1,false);
                var arguments = new Dictionary<string, object>
                {
                    {"x-message-ttl", 10000}, // 设置队列 TTL
                    {"x-dead-letter-exchange", _deadLetterExchange}, // 设置该队列所关联的死信交换器(当队列消息 ttl 到期后仍然没有消费,则加入死信队列
                    {"x-dead-letter-routing-key",_deadLetterRoutingKey} // 设置该队列所关联的死信交换器的 routingKey,如果没有特殊指定,使用原队列的 routingKey
                };
                Channel.QueueDeclare(QueueName, true, false, false, arguments);
                Channel.QueueBind(QueueName,ExchangeName,RouteKey);
                // 声明死信队列
                Channel.QueueDeclare(_deadLetterQueue, true, false, false, null);
                // 绑定死信队列和死信交换器
                Channel.QueueBind(_deadLetterQueue,_deadLetterExchange,_deadLetterRoutingKey);
    
                var consumer = new EventingBasicConsumer(Channel);
                consumer.Received += (model, ea) =>
                {
                    Thread.Sleep(5000);
                    string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine(
                        $"Queue:{QueueName} \t Exchange: {ExchangeName} \t RoutingKey: {ea.RoutingKey} \t BindingKey:{RouteKey} \t Message: {message}");
                    if(Process(message))
                        Channel.BasicAck(ea.DeliveryTag,false);
                    else
                        Channel.BasicNack(ea.DeliveryTag,false,true);
                };
                Channel.BasicConsume(QueueName, false, consumer);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    
        public override bool Process(string message)
        {
            return true;
        }
    }
    
    1. Program.cs

      // 注册后台服务
      builder.Services.AddHostedService<DeadLetterConsumerService>();
      // 依赖注入 IRabbitProducer
      builder.Services.AddSingleton<IRabbitProducer, TtlProducer>();
      
    2. 测试:生产者向队列发送 100 条消息:

image-20220626194117933.png 可以看到,队列中的消息过期后全部进入死信队列 queue.dlx 中。