Skip to main content

Command Palette

Search for a command to run...

RabbitMQ 高级特性-- TTL 机制 & 死信队列

Updated
3 min read
E

Science is gold.

1. TTL 机制

​ 在京东下单,订单创建成功,等待支付,一般会给用户 30 分钟时间。如果在这段时间内用户没有支付,则默认订单取消。

该如何实现?

  • 定期轮询(数据库、后台服务等)

    • 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
    • 优点: 设计实现简单
    • 缺点: 需要对数据库进行大量的I0操作,效率低下。
  • RabbitMQ

    • 使用 TTL
  • ...

TTLTime to Live 的简称,即过期时间。

RabpitMQ 可以对消息和队列两个维度来设置TTL。 任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。

目前有两种方法可以设置消息的 TTL。

  1. 通过 Queue 属性设置,队列中所有消息都有相同的过期时间。

    // 设置队列属性
    var arguments = new Dictionary<string, object>
    {
       {"x-message-ttl", 30000}, // 设置队列的 TTL
        {"x-expires", 10000} // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,可以存活多久)
    };
    Channel.QueueDeclare(QueueName, true, false, false,arguments);
    
  2. 消息自身进行单独设置,每条消息的TTL可以不同。

    var properties = _channel.CreateBasicProperties();
    properties.DeliveryMode = 2;
    properties.Expiration = "10000"; // 设置单条消息过期时间
    var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
    _channel.BasicPublish(_exchangeName, routeKey, true, properties, body);
    

如果两种方法一起使用,则消息的TTL以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信(Dead Message),消费者默认就无法再收到该消息。当然,"死信”也是可以被取出来消费的,下一小节我们会讲解。

此外,还可以通过命令行方式设置全局 TTL:

rabbitmqctl set_policy [policy name] ".*" '{"message-ttl":30000, "expires":10000}' --apply-to queues

默认规则

  1. 如果不设置 TTL,则表示消息不会过期;
  2. 如果 TTL 设置为 0,表示除非此时可以直接将消息投递到消费者,否则该消息被立刻丢弃

注意理解 message-ttlx-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般 TTL 相关的参数单位都是毫秒(ms)

代码

  1. 生产者 TtlProducer.cs

    public class TtlProducer : IRabbitProducer
    {
        private RabbitOptions _rabbitOptions;
        private IModel _channel;
        private string _exchangeName;
    
        public TtlProducer(IOptions<RabbitOptions> options)
        {
            _rabbitOptions = options.Value;
            _channel = RabbitHelper.CreateConnection(_rabbitOptions).CreateModel();
        }
    
        public void Init(string exchangeName, string queueName, string routingKey)
        {
            try
            {
                _exchangeName = exchangeName;
                _channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true);
            }
            catch (Exception e)
            {
                Console.WriteLine($"初始化 {nameof(TtlProducer)} 失败:{e.Message}");
            }
        }
    
        public void PushMessage(string message)
        {
            throw new NotImplementedException();
        }
    
        public void PushMessage(string routeKey, object message)
        {
            var properties = _channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.DeliveryMode = 2;
            // properties.Expiration = "10000"; // 设置单条消息过期时间
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
            Console.WriteLine($"向交换器 {_exchangeName} ,routing key:{routeKey} 发送消息:{JsonConvert.SerializeObject(message)}");
            _channel.BasicPublish(_exchangeName, routeKey, true, properties, body);
        }
    }
    
  1. 消费者 TtlConsumerService.cs

    public class TtlConsumerService : RabbitConsumerService
    {
        public TtlConsumerService(IOptions<RabbitOptions> options) : base(options)
        {
            ExchangeName = "ex.ttl";
            QueueName = "queue.ttl";
            RouteKey = "#";
        }
    
        public override void Register()
        {
            try
            {
                Channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true);
                // 设置队列属性
                var arguments = new Dictionary<string, object>
                {
                    {"x-message-ttl", 30000}, // 设置队列的 TTL
                    {"x-expires", 10000} // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,可以存活多久)
                };
                Channel.QueueDeclare(QueueName, true, false, false, arguments);
                Channel.QueueBind(QueueName, ExchangeName, RouteKey);
                Channel.BasicQos(0,1,false);
                var consumer = new EventingBasicConsumer(Channel);
                consumer.Received += (model, ea) =>
                {
                    Thread.Sleep(10000);
                    string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine(
                        $"Queue:{QueueName} \t Exchange: {ExchangeName} \t RoutingKey: {ea.RoutingKey} \t BindingKey:{RouteKey} \t Message: {message}");
                    if (Process(message))
                        Channel.BasicAck(ea.DeliveryTag, false);
                    else
                        Channel.BasicNack(ea.DeliveryTag, false, true);
                };
                Channel.BasicConsume(QueueName, false, consumer);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    
        public override bool Process(string message)
        {
            return true;
        }
    }
    

2 死信队列

死信交换器(DLX),全称为 Dead-Letter-Exchange。消息在一个队列中编程死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时绑定 DLX 的队列就称为“死信队列”。

比如:

用户下单,调用订单服务,然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统采用 MQ 异步通讯。

在定义业务队列时可以考虑指定一个死信交换器,并绑定一个死信队列。当消息标称死信时,该消息就会被发送到死信队列上,这样方便我们查看消息失败的原因。

以下几种情况导致消息变为死信

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false
  2. 消息过期
  3. 队列达到最大长度

对于 RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

通过给声明正常的业务队列设置 arguments 指定当前队列的死信交换器及死信 routingKey:

var arguments = new Dictionary<string, object>
            {
                {"x-message-ttl", 10000}, // 设置队列 TTL
                {"x-dead-letter-exchange", _deadLetterExchange}, // 设置该队列所关联的死信交换器(当队列消息 ttl 到期后仍然没有消费,则加入死信队列
                {"x-dead-letter-routing-key",_deadLetterRoutingKey} // 设置该队列所关联的死信交换器的 routingKey,如果没有特殊指定,使用原队列的 routingKey
            };
Channel.QueueDeclare(QueueName, true, false, false, arguments);

代码

  1. 生产者继续用 TtlProducer.cs

  2. 消费者:DeadLetterConsumerService.cs

    public class DeadLetterConsumerService : RabbitConsumerService
    {
        private string _deadLetterExchange = "ex.dlx";
        private string _deadLetterQueue = "queue.dlx";
        private string _deadLetterRoutingKey = "routing.key.dlx.teset";
    
        public DeadLetterConsumerService(IOptions<RabbitOptions> options) : base(options)
        {
            QueueName = "queue.biz";
            ExchangeName = "ex.biz";
            RouteKey = "#";
        }
    
        public override void Register()
        {
            try
            {
                // 定义一个正常的业务交换器
                Channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, true, false, null);
                // 定义一个死信交换器(也是一个普通交换器)
                Channel.ExchangeDeclare(_deadLetterExchange, ExchangeType.Direct, true);
                Channel.BasicQos(0,1,false);
                var arguments = new Dictionary<string, object>
                {
                    {"x-message-ttl", 10000}, // 设置队列 TTL
                    {"x-dead-letter-exchange", _deadLetterExchange}, // 设置该队列所关联的死信交换器(当队列消息 ttl 到期后仍然没有消费,则加入死信队列
                    {"x-dead-letter-routing-key",_deadLetterRoutingKey} // 设置该队列所关联的死信交换器的 routingKey,如果没有特殊指定,使用原队列的 routingKey
                };
                Channel.QueueDeclare(QueueName, true, false, false, arguments);
                Channel.QueueBind(QueueName,ExchangeName,RouteKey);
                // 声明死信队列
                Channel.QueueDeclare(_deadLetterQueue, true, false, false, null);
                // 绑定死信队列和死信交换器
                Channel.QueueBind(_deadLetterQueue,_deadLetterExchange,_deadLetterRoutingKey);
    
                var consumer = new EventingBasicConsumer(Channel);
                consumer.Received += (model, ea) =>
                {
                    Thread.Sleep(5000);
                    string message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine(
                        $"Queue:{QueueName} \t Exchange: {ExchangeName} \t RoutingKey: {ea.RoutingKey} \t BindingKey:{RouteKey} \t Message: {message}");
                    if(Process(message))
                        Channel.BasicAck(ea.DeliveryTag,false);
                    else
                        Channel.BasicNack(ea.DeliveryTag,false,true);
                };
                Channel.BasicConsume(QueueName, false, consumer);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    
        public override bool Process(string message)
        {
            return true;
        }
    }
    
    1. Program.cs

      // 注册后台服务
      builder.Services.AddHostedService<DeadLetterConsumerService>();
      // 依赖注入 IRabbitProducer
      builder.Services.AddSingleton<IRabbitProducer, TtlProducer>();
      
    2. 测试:生产者向队列发送 100 条消息:

image-20220626194117933.png 可以看到,队列中的消息过期后全部进入死信队列 queue.dlx 中。

More from this blog

C# 标准性能测试高级用法(Benchmark)

在 C# 标准性能测试 已经告诉大家如何使用 BenchmarkDotNet 测试性能,本文会告诉大家高级的用法。 建议是创建一个控制台项目用来做性能测试,这个项目要求是 dotnet framework 4.6 以上,建议是 dotnet 7 的版本。使用这个项目引用需要测试的项目,然后在里面写测试的代码。 例如被测试项目有一个类 Foo 里面有一个叫 Lindexidb 的方法,接下来的任务是需要测试这个 Lindexidb 方法的性能 最简单的测试的代码 public class FooP...

Jan 9, 20247 min read

.NetCore 实践——HttpClientFactory[一]

HttpClientFactory介绍 HttpClientFactory 主要有下面的功能: 管理内部HttpMessageHandler 的生命周期,灵活应对资源问题和DNS刷新问题 支持命名话、类型化配置,集中管理配置,避免冲突。 灵活的出站请求管道配置,轻松管理请求生命周期 内置管道最外层和最内层日志记录器,有information 和 Trace 输出 核心对象: HttpClient HttpMessageHandler SocketsHttpHandler De...

Jan 9, 20245 min read

认识 MSBuild - 1

前言 很多人一谈到 MSBuild,脑子里就会出现 “XML”、“只能用 VS 的属性框图形界面操作”、“可定制性和扩展性差” 和 “性能低” 等印象,但实际上这些除了 “XML” 之外完全都是刻板印象:这些人用着 Visual Studio 提供的图形界面,就完全不愿意花个几分钟时间翻翻文档去理解 MSBuild 及其构建过程。 另外,再加上 vcxproj (Visual C++ 项目)的默认 MSBuild 构建文件写得确实谈不上好(默认只能项目粒度并行编译,想要源码级并行编译你得加钱),...

Jan 9, 20245 min read

.NetCore实战——工作单元模式(UnitOfWork):管理好你的事务

工作单元模式有如下几个特性: 1、使用同一上下文 2、跟踪实体的状态 3、保障事务一致性 我们对实体的操作,最终的状态都是应该如实保存到我们的存储中,进行持久化 接下来看一下代码 为了实现工作单元模式,这里定义了一个工作单元的接口 public interface IUnitOfWork : IDisposable { Task<int> SaveChangesAsync(CancellationToken cancellationToken = default); Task<b...

Jan 9, 20242 min read
E

Edward Chu's blog

41 posts