RabbitMQ—Publisher Confirms (发布者确认)

channel 支持 publisher confirms

Publisher confirms 是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,因此不是默认支持的。 要使用channel中的 ConfirmSelect方法开启发布者确认:

var channel = connection.CreateModel();
channel.ConfirmSelect();

几种方案

策略1: Publishing Messages Individually(单独发布消息)

发布确认最简单的方式就是发布一条消息然后同步等待确认

while(TrereAreMessageToPublish())
{
  byte[] body = ...;
  IBasicProperties properties = ...;
  channel.BasicPublish(exchange,queue,properties,body);
  // 5s 超时
  channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
}

上面的例子中,发布消息通过使用channel.WaitForConfirmsOrDie(TimeSpan)方法等待消息确认。 消息一旦被确认方法将获得返回值。如果消息超时时间内未被确认或者消息被拒绝,该方法会抛出异常。

该策略有一个很大的弊端就是发布时间明显变慢,因为确认消息时会阻塞后续要发布的消息。该方案每秒能发布的消息数量不超过几百条,但对于某些应用来说也足够了。

策略2:Publishing Message in Batches(批量发布消息)

为了优化上一个方案,我们可以发布一批消息,然后等待这批消息被确认。

var batchSize =100;
var outstandingMessageCount = 0; // 未确认/解决的消息数量
while(ThereAreMessageToPublish())
{
  byte[] body = ...;
  IBasicProperties properties = ...;
  channel.BasicPublish(exchange, queue, properties, body);
  outstandingMessageCount ++ ;
  if(outstandingMessageCount == batchSize)
  {
    channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
    outstandingMessageCount = 0 ;
  }
}
if(outstandingMessageCount>0)
{
  channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
}

批量发布确认消息比上一个方案吞吐量有了明显提高(一个 Rabbit MQ 远程节点提升 20~30 倍)。

但缺点是我们不知道发生错误失败是具体原因,所以必须把整批数据放在内存里记录日志或者重发消息。并且这个方案仍然是同步的,还会阻塞正在发送的消息。

策略3: Handling Publisher Confirms Asynchronously(异步处理发布者确认)

通过 broker 异步确认消息,只需要在客户端注册一个回调事件用来通知消息的确认情况:

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender,ea) =>
{
  // 消息被确认的代码
};
channel.BasicNacks += (sender,ea) => 
{
  // 消息否定应答时的代码
}

一共有两个回调:消息被确认消息无应答(可以认为消息丢失)。2个回调都包含一个对应的 EventArg参数(ea),它包含:

  • delivery tag:消息被确认或丢失的序列号。
  • multiple: 为 boolean 类型。 false 表示 只有一条消息被确认或者丢失。 True 表示所有具有较低或相等序列号的消息被确认/丢失

序列号可以在消息发布前通过 channel.NextPublishSeqNo 获得:

var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange,queue,properties,body);

有个简单的办法把消息和序列号关联起来:使用字典 Dictionary。假设我们要发布 string 类型的消息:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange,queue,properties,Encoding.UTF8.GetBytes(body));

我们需要在消息确认时清空 Dictionary ,丢失时做一些日志记录

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

var cleanOutstandingCnfirms(ulong sequenceNumber, bool multiple)
{
  if(multiple)
  {
    var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
    foreach(var entry in confirmed)
    {
      outstandingConfirms.TryRemove(entry.Key,out _);
    }
  }else
  {
    outstandingConfirms.TryRemove(sequenceNumber);
  }
}

channel.BasicAcks += (sender,ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender,ea) => 
{
  outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
  Console.WriteLine($"Message with body {body} has been nack-ed.Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
  cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
}

// ... publishing code

如何跟踪未完成的确认?

我们的示例使用ConcurrentDictionary来跟踪未完成的确认。这种数据结构很方便有几个原因。它允许轻松地将序列号与消息相关联(无论消息数据是什么),并轻松地将条目清理到给定的序列 id(以处理多个确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应与发布线程保持不同。

综上所述,异步处理发布确认通常需要以下几步:

  • 提供一种将发布序列号与消息相关联的方法。
  • 在通道上注册确回调事件,以便在发布者确认/否定应答 以执行适当的操作时收到通知,例如记录或重新发布已确认的消息。在此步骤中,序列号与消息的关联机制也可能需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

关于消息重发

从相应的回调中重新发布 nack-ed 消息可能很诱人,但应该避免这种情况,因为确认回调是 I/O 线程中分派的,channel 不应该执行操作的。更好的解决方案是将消息排入内存队列,该队列由发布线程轮询。像ConcurrentQueue这样的类 将是在确认回调和发布线程之间传输消息的好选择。

总结

在一些场景下,确认消息成功发送到 broker 是必须的。Publisher Confirms 本质上是异步的,但也可以同步处理它们。没有明确的方法来实现发布者确认,典型的技术是:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,批量同步等待确认:简单,合理的吞吐量,但当出现问题时很难推理。
  • 异步处理:最好的性能和资源使用,在错误的情况下很好的控制,但可以参与正确实现。

⚠️注意:

批量发布实现起来很简单,但在发布者拒绝确认的情况下,很难知道哪些消息无法发送到代理。

异步处理发布者确认更涉及实现,但提供更好的粒度和更好地控制在发布消息被 nack 时执行的操作。