RabbitMQ 高级特性--延时队列

延时队列

延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

例如下面的业务场景:

在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果 15 分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?

  1. 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功;
  2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了;
  3. 还可以用延迟队列,锁座成功后会发送 1 条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态;

你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚 22 点准时参加会议。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟) 就会通知提醒参会人员做好参会准备,会议马上开始...

同样的,这也可以通过轮询"会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息, 而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。

可以使用死信队列完成吗?

如果队列中的消息 ttl 时间都不一样,用死信队列来完成延时队列的功能时会出现问题:假设第 1 条消息过期时间为 40s,第 2 条过期时间为 20s ,队列只检查队列头的消息,如果过期了就放入死信队列,不过期则不动,并不会检查后面的消息 ttl,当第 1 条消息 40s 后过期了,放入死信队列里。再判断第 2 条,但这个时候第 2 条消息已经过期了20s! 很明显不对

不过遗憾的是,在 AMQP 协议和 RabbitMO 中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列"来变相的实现。

可以使用 rabbitmq_delayed_message_exchange 插件实现。

这里和 TTL 方式有个很大的不同就是 TTL 存放消息在死信队列里,而基于插件存放消息在延时交换器x-delayed-message exchange)里。

image-20220626195207712.png

  1. 生产者将消息(msg)和路由键(routingKey)发送指定的延时交换机上
  2. 延时交换机存储消息等待消息到期根据路由键(routingKey)找到绑定自己的队列(queue)并把消息给它
  3. 队列(queue)再把消息发送给监听它的消费者(consumer)

如何使用

下载插件

下载地址:

image-20220626200206166.png

安装插件

将插件拷贝到 rabbitmq-server 的安装路径: /usr/lib/rabbitmq/lib/rabbitmq_server-<你的rabbit版本>/plugins

安装后查看

image-20220626201254235.png

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image-20220626201512604.png image-20220626201512604

重启 rabbitmq-servver

systemctl restart rabbitmq-server

交换器应用

使用延迟消息交换器需要声明一个 x-delayed-message 类型的交换器,示例如下:

var args = new Dictionary<string, object>
{
   {"x-delayed-type", "direct"}
};
_channel.ExchangeDeclare(exchangeName, "x-delayed-message", true, false, args);

上面的示例当我们声明一个交换器时,我们提供了一个x-delayed-type 参数,值设置为 direct。这是想告诉交换器希望它路由消息的行为、绑定等等像 direct 类型交换器一样;在上面示例中,我们的交换器就像direct 交换器一样。我们也可以传递 topicfanout 或者其它插件提供的自定义交换器类型。

发布延迟消息

用户必须使用名为x-delay的特殊 header 发布延迟消息,该 header 需要一个整数,表示 RabbitMQ 应延迟消息的毫秒数。值得注意的是,这里的延迟意味着消息延迟路由到队列或其它交换器。

exhange(交换器)没有消费者的概念。因此,一旦延迟过期,插件将尝试将消息路由到与 exchange 的路由规则匹配的队列。请注意,如果消息不能路由到任何队列,那么它将被丢弃。

以下是添加x-delay 头(header)到消息并且发布到 exchange 的示例代码:

var body = Encoding.UTF8.GetBytes(message);
var properties = _channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
   {"x-delay", 5000} // 消息延迟 5000 毫秒
};
_channel.BasicPublish(_exchangeName,"",properties,body);

上面的示例中,消息在被插件路由之前将被延迟5秒钟。该示例假设你已经建立了到RabbitMQ的连接并获得了一个信道。

完整代码

1. 生产者 DelayedExchangeProducer.cs

   public class DelayedExchangeProducer: IRabbitProducer
   {
       private RabbitOptions _rabbitOptions;
       private IModel _channel;
       private string _exchangeName;
       private string _routingKey;

       public DelayedExchangeProducer(IOptions<RabbitOptions> options)
       {
           _rabbitOptions = options.Value;
           _channel = RabbitHelper.CreateConnection(_rabbitOptions).CreateModel();
       }

       public void Init(string exchangeName, string queueName, string routingKey)
       {
           _exchangeName = exchangeName;
           var args = new Dictionary<string, object>
           {
               {"x-delayed-type", "direct"}
           };
           _channel.ExchangeDeclare(_exchangeName, "x-delayed-message", true, false, args);
       }

       public void PushMessage(string message)
       {

       }

       public void PushMessage(string routeKey, object message)
       {
           _routingKey = routeKey;
           var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
           var properties = _channel.CreateBasicProperties();
           properties.Headers = new Dictionary<string, object>
           {
               {"x-delay", 20000} // 消息延迟 20000 毫秒
           };
           Console.WriteLine($"{DateTime.Now.ToString()}\n 发送消息:{JsonConvert.SerializeObject(message)}");
           _channel.BasicPublish(_exchangeName, _routingKey,properties,body);
       }
   }

2. 消费者 DelayedConsumerService.cs

   public class DelayedConsumerService : RabbitConsumerService
   {
       public DelayedConsumerService(IOptions<RabbitOptions> options) : base(options)
       {
           QueueName = "queue.delay";
           ExchangeName = "ex.delay.test";
           RouteKey = "delay.test";
       }

       public override void Register()
       {
           Channel.QueueDeclare(QueueName, true, false, false, null);
           Channel.QueueBind(QueueName, ExchangeName, RouteKey);
           Channel.BasicQos(0, 1, false);
           var consumer = new EventingBasicConsumer(Channel);
           consumer.Received += (model, ea) =>
           {
               string message = Encoding.UTF8.GetString(ea.Body.ToArray());
               Console.WriteLine(DateTime.Now.ToString());
               Console.WriteLine(
                   $"Queue:{QueueName} \t Exchange: {ExchangeName} \t RoutingKey: {ea.RoutingKey} \t BindingKey:{RouteKey} \t Message: {message}");
               if (Process(message))
                   Channel.BasicAck(ea.DeliveryTag, false);
               else
                   Channel.BasicNack(ea.DeliveryTag, false, true);
           };
           Channel.BasicConsume(QueueName, false, consumer);
       }

       public override bool Process(string message)
       {
           return true;
       }
   }

3. Program.cs

   // 注册后台服务
   builder.Services.AddHostedService<DelayedConsumerService>();
   // 依赖注入 IRabbitProducer
   builder.Services.AddSingleton<IRabbitProducer, DelayedExchangeProducer>();

4. 测试

   _rabbitProducer.Init("ex.delay.test","queue.delay","");
   _rabbitProducer.PushMessage("delay.test",$"延迟消息测试!");

5. 结果:

   测试生产者代码:
    06/26/2022 20:55:00
    发送消息:"延迟消息测试!"

   // 消费者:
   06/26/2022 20:55:20
   Queue:queue.delay       Exchange: ex.delay.test     RoutingKey: delay.test      BindingKey:delay.test   Message: "延迟消息测试!"
查看已发送到exchange的延迟消息数量:

image-20220626210354884.png

image-20220626210443550.png

延迟消息插件优点

  • 不需要为延迟消息单独创建单独的路由、交换器、队列

延迟消息插件的缺点

  • 不支持对已发送消息进行管理,只能在Web管理页面查看发送的数量DM
  • 集群中只有一个副本(保存在当前节点下的Mnesia表中),如果节点不可用或关闭插件会丢失消息
  • 目前该插件只支持 disk 节点,不支持ram 节点
  • 性能比原生的差一点(普通的Exchange收到消息后直接路由到队列,而延迟队列需要判断消息是否过期,未过期的需要保存在表中,时间到了再捞出来路由)