RabbitMQ—Publisher Confirms (发布者确认)
channel 支持 publisher confirms
Publisher confirms 是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,因此不是默认支持的。 要使用channel中的 ConfirmSelect
方法开启发布者确认:
var channel = connection.CreateModel();
channel.ConfirmSelect();
几种方案
策略1: Publishing Messages Individually(单独发布消息)
发布确认最简单的方式就是发布一条消息然后同步等待确认。
while(TrereAreMessageToPublish())
{
byte[] body = ...;
IBasicProperties properties = ...;
channel.BasicPublish(exchange,queue,properties,body);
// 5s 超时
channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
}
上面的例子中,发布消息通过使用channel.WaitForConfirmsOrDie(TimeSpan)
方法等待消息确认。 消息一旦被确认方法将获得返回值。如果消息超时时间内未被确认或者消息被拒绝,该方法会抛出异常。
该策略有一个很大的弊端就是发布时间明显变慢,因为确认消息时会阻塞后续要发布的消息。该方案每秒能发布的消息数量不超过几百条,但对于某些应用来说也足够了。
策略2:Publishing Message in Batches(批量发布消息)
为了优化上一个方案,我们可以发布一批消息,然后等待这批消息被确认。
var batchSize =100;
var outstandingMessageCount = 0; // 未确认/解决的消息数量
while(ThereAreMessageToPublish())
{
byte[] body = ...;
IBasicProperties properties = ...;
channel.BasicPublish(exchange, queue, properties, body);
outstandingMessageCount ++ ;
if(outstandingMessageCount == batchSize)
{
channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
outstandingMessageCount = 0 ;
}
}
if(outstandingMessageCount>0)
{
channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
}
批量发布确认消息比上一个方案吞吐量有了明显提高(一个 Rabbit MQ 远程节点提升 20~30 倍)。
但缺点是我们不知道发生错误失败是具体原因,所以必须把整批数据放在内存里记录日志或者重发消息。并且这个方案仍然是同步的,还会阻塞正在发送的消息。
策略3: Handling Publisher Confirms Asynchronously(异步处理发布者确认)
通过 broker
异步确认消息,只需要在客户端注册一个回调事件用来通知消息的确认情况:
var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender,ea) =>
{
// 消息被确认的代码
};
channel.BasicNacks += (sender,ea) =>
{
// 消息否定应答时的代码
}
一共有两个回调:消息被确认
和消息无应答
(可以认为消息丢失)。2个回调都包含一个对应的 EventArg
参数(ea
),它包含:
delivery tag
:消息被确认或丢失的序列号。multiple
: 为 boolean 类型。 false 表示 只有一条消息被确认或者丢失。 True 表示所有具有较低或相等序列号的消息被确认/丢失
序列号可以在消息发布前通过 channel.NextPublishSeqNo
获得:
var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange,queue,properties,body);
有个简单的办法把消息和序列号关联起来:使用字典 Dictionary
。假设我们要发布 string
类型的消息:
var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange,queue,properties,Encoding.UTF8.GetBytes(body));
我们需要在消息确认时清空 Dictionary ,丢失时做一些日志记录
var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
var cleanOutstandingCnfirms(ulong sequenceNumber, bool multiple)
{
if(multiple)
{
var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
foreach(var entry in confirmed)
{
outstandingConfirms.TryRemove(entry.Key,out _);
}
}else
{
outstandingConfirms.TryRemove(sequenceNumber);
}
}
channel.BasicAcks += (sender,ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender,ea) =>
{
outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
Console.WriteLine($"Message with body {body} has been nack-ed.Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
}
// ... publishing code
如何跟踪未完成的确认?
我们的示例使用
ConcurrentDictionary
来跟踪未完成的确认。这种数据结构很方便有几个原因。它允许轻松地将序列号与消息相关联(无论消息数据是什么),并轻松地将条目清理到给定的序列 id(以处理多个确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应与发布线程保持不同。
综上所述,异步处理发布确认通常需要以下几步:
- 提供一种将发布序列号与消息相关联的方法。
- 在通道上注册确回调事件,以便在发布者确认/否定应答 以执行适当的操作时收到通知,例如记录或重新发布已确认的消息。在此步骤中,序列号与消息的关联机制也可能需要进行一些清理。
- 在发布消息之前跟踪发布序列号。
关于消息重发
从相应的回调中重新发布 nack-ed 消息可能很诱人,但应该避免这种情况,因为确认回调是 I/O 线程中分派的,channel 不应该执行操作的。更好的解决方案是将消息排入内存队列,该队列由发布线程轮询。像ConcurrentQueue这样的类 将是在确认回调和发布线程之间传输消息的好选择。
总结
在一些场景下,确认消息成功发送到 broker 是必须的。Publisher Confirms 本质上是异步的,但也可以同步处理它们。没有明确的方法来实现发布者确认,典型的技术是:
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,批量同步等待确认:简单,合理的吞吐量,但当出现问题时很难推理。
- 异步处理:最好的性能和资源使用,在错误的情况下很好的控制,但可以参与正确实现。
⚠️注意:
批量发布实现起来很简单,但在发布者拒绝确认的情况下,很难知道哪些消息无法发送到代理。
异步处理发布者确认更涉及实现,但提供更好的粒度和更好地控制在发布消息被 nack 时执行的操作。