RabbitMQ 高级特性--延时队列
延时队列
延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。
例如下面的业务场景:
在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果 15 分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?
- 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功;
- 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了;
- 还可以用延迟队列,锁座成功后会发送 1 条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态;
你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚 22 点准时参加会议。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟) 就会通知提醒参会人员做好参会准备,会议马上开始...
同样的,这也可以通过轮询"会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息, 而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。
可以使用死信队列完成吗?
如果队列中的消息 ttl 时间都不一样,用死信队列来完成延时队列的功能时会出现问题:假设第 1 条消息过期时间为 40s,第 2 条过期时间为 20s ,队列只检查队列头的消息,如果过期了就放入死信队列,不过期则不动,并不会检查后面的消息 ttl,当第 1 条消息 40s 后过期了,放入死信队列里。再判断第 2 条,但这个时候第 2 条消息已经过期了20s! 很明显不对
不过遗憾的是,在 AMQP 协议和 RabbitMO 中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列
"来变相的实现。
可以使用 rabbitmq_delayed_message_exchange
插件实现。
这里和 TTL 方式有个很大的不同就是 TTL 存放消息在死信队列里,而基于插件存放消息在延时交换器(x-delayed-message exchange
)里。
- 生产者将消息(msg)和路由键(routingKey)发送指定的延时交换机上
- 延时交换机存储消息等待消息到期根据路由键(routingKey)找到绑定自己的队列(queue)并把消息给它
- 队列(queue)再把消息发送给监听它的消费者(consumer)
如何使用
下载插件
安装插件
将插件拷贝到 rabbitmq-server 的安装路径: /usr/lib/rabbitmq/lib/rabbitmq_server-<你的rabbit版本>/plugins
安装后查看
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启 rabbitmq-servver
systemctl restart rabbitmq-server
交换器应用
使用延迟消息交换器需要声明一个 x-delayed-message
类型的交换器,示例如下:
var args = new Dictionary<string, object>
{
{"x-delayed-type", "direct"}
};
_channel.ExchangeDeclare(exchangeName, "x-delayed-message", true, false, args);
上面的示例当我们声明一个交换器时,我们提供了一个x-delayed-type
参数,值设置为 direct
。这是想告诉交换器希望它路由消息的行为、绑定等等像 direct
类型交换器一样;在上面示例中,我们的交换器就像direct
交换器一样。我们也可以传递 topic
、fanout
或者其它插件提供的自定义交换器类型。
发布延迟消息
用户必须使用名为x-delay
的特殊 header
发布延迟消息,该 header
需要一个整数,表示 RabbitMQ 应延迟消息的毫秒数
。值得注意的是,这里的延迟意味着消息延迟路由到队列或其它交换器。
exhange(交换器)没有消费者的概念。因此,一旦延迟过期,插件将尝试将消息路由到与 exchange 的路由规则匹配的队列。请注意,如果消息不能路由到任何队列,那么它将被丢弃。
以下是添加x-delay
头(header)到消息并且发布到 exchange 的示例代码:
var body = Encoding.UTF8.GetBytes(message);
var properties = _channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
{"x-delay", 5000} // 消息延迟 5000 毫秒
};
_channel.BasicPublish(_exchangeName,"",properties,body);
上面的示例中,消息在被插件路由之前将被延迟5秒钟。该示例假设你已经建立了到RabbitMQ的连接并获得了一个信道。
完整代码
1. 生产者 DelayedExchangeProducer.cs
public class DelayedExchangeProducer: IRabbitProducer
{
private RabbitOptions _rabbitOptions;
private IModel _channel;
private string _exchangeName;
private string _routingKey;
public DelayedExchangeProducer(IOptions<RabbitOptions> options)
{
_rabbitOptions = options.Value;
_channel = RabbitHelper.CreateConnection(_rabbitOptions).CreateModel();
}
public void Init(string exchangeName, string queueName, string routingKey)
{
_exchangeName = exchangeName;
var args = new Dictionary<string, object>
{
{"x-delayed-type", "direct"}
};
_channel.ExchangeDeclare(_exchangeName, "x-delayed-message", true, false, args);
}
public void PushMessage(string message)
{
}
public void PushMessage(string routeKey, object message)
{
_routingKey = routeKey;
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
var properties = _channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
{"x-delay", 20000} // 消息延迟 20000 毫秒
};
Console.WriteLine($"{DateTime.Now.ToString()}\n 发送消息:{JsonConvert.SerializeObject(message)}");
_channel.BasicPublish(_exchangeName, _routingKey,properties,body);
}
}
2. 消费者 DelayedConsumerService.cs
public class DelayedConsumerService : RabbitConsumerService
{
public DelayedConsumerService(IOptions<RabbitOptions> options) : base(options)
{
QueueName = "queue.delay";
ExchangeName = "ex.delay.test";
RouteKey = "delay.test";
}
public override void Register()
{
Channel.QueueDeclare(QueueName, true, false, false, null);
Channel.QueueBind(QueueName, ExchangeName, RouteKey);
Channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(Channel);
consumer.Received += (model, ea) =>
{
string message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(DateTime.Now.ToString());
Console.WriteLine(
$"Queue:{QueueName} \t Exchange: {ExchangeName} \t RoutingKey: {ea.RoutingKey} \t BindingKey:{RouteKey} \t Message: {message}");
if (Process(message))
Channel.BasicAck(ea.DeliveryTag, false);
else
Channel.BasicNack(ea.DeliveryTag, false, true);
};
Channel.BasicConsume(QueueName, false, consumer);
}
public override bool Process(string message)
{
return true;
}
}
3. Program.cs
// 注册后台服务
builder.Services.AddHostedService<DelayedConsumerService>();
// 依赖注入 IRabbitProducer
builder.Services.AddSingleton<IRabbitProducer, DelayedExchangeProducer>();
4. 测试
_rabbitProducer.Init("ex.delay.test","queue.delay","");
_rabbitProducer.PushMessage("delay.test",$"延迟消息测试!");
5. 结果:
测试生产者代码:
06/26/2022 20:55:00
发送消息:"延迟消息测试!"
// 消费者:
06/26/2022 20:55:20
Queue:queue.delay Exchange: ex.delay.test RoutingKey: delay.test BindingKey:delay.test Message: "延迟消息测试!"
查看已发送到exchange的延迟消息数量:
延迟消息插件优点
- 不需要为延迟消息单独创建单独的路由、交换器、队列
延迟消息插件的缺点
- 不支持对已发送消息进行管理,只能在Web管理页面查看发送的数量DM
- 集群中只有一个副本(保存在当前节点下的Mnesia表中),如果节点不可用或关闭插件会丢失消息
- 目前该插件只支持
disk
节点,不支持ram
节点 - 性能比原生的差一点(普通的Exchange收到消息后直接路由到队列,而延迟队列需要判断消息是否过期,未过期的需要保存在表中,时间到了再捞出来路由)