Skip to main content

Command Palette

Search for a command to run...

RabbitMQ—Publisher Confirms (发布者确认)

Updated
2 min read
E

Science is gold.

channel 支持 publisher confirms

Publisher confirms 是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,因此不是默认支持的。 要使用channel中的 ConfirmSelect方法开启发布者确认:

var channel = connection.CreateModel();
channel.ConfirmSelect();

几种方案

策略1: Publishing Messages Individually(单独发布消息)

发布确认最简单的方式就是发布一条消息然后同步等待确认

while(TrereAreMessageToPublish())
{
  byte[] body = ...;
  IBasicProperties properties = ...;
  channel.BasicPublish(exchange,queue,properties,body);
  // 5s 超时
  channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
}

上面的例子中,发布消息通过使用channel.WaitForConfirmsOrDie(TimeSpan)方法等待消息确认。 消息一旦被确认方法将获得返回值。如果消息超时时间内未被确认或者消息被拒绝,该方法会抛出异常。

该策略有一个很大的弊端就是发布时间明显变慢,因为确认消息时会阻塞后续要发布的消息。该方案每秒能发布的消息数量不超过几百条,但对于某些应用来说也足够了。

策略2:Publishing Message in Batches(批量发布消息)

为了优化上一个方案,我们可以发布一批消息,然后等待这批消息被确认。

var batchSize =100;
var outstandingMessageCount = 0; // 未确认/解决的消息数量
while(ThereAreMessageToPublish())
{
  byte[] body = ...;
  IBasicProperties properties = ...;
  channel.BasicPublish(exchange, queue, properties, body);
  outstandingMessageCount ++ ;
  if(outstandingMessageCount == batchSize)
  {
    channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
    outstandingMessageCount = 0 ;
  }
}
if(outstandingMessageCount>0)
{
  channel.WaitForConfirmsOrDie(new TimeSpan(0,0,5));
}

批量发布确认消息比上一个方案吞吐量有了明显提高(一个 Rabbit MQ 远程节点提升 20~30 倍)。

但缺点是我们不知道发生错误失败是具体原因,所以必须把整批数据放在内存里记录日志或者重发消息。并且这个方案仍然是同步的,还会阻塞正在发送的消息。

策略3: Handling Publisher Confirms Asynchronously(异步处理发布者确认)

通过 broker 异步确认消息,只需要在客户端注册一个回调事件用来通知消息的确认情况:

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender,ea) =>
{
  // 消息被确认的代码
};
channel.BasicNacks += (sender,ea) => 
{
  // 消息否定应答时的代码
}

一共有两个回调:消息被确认消息无应答(可以认为消息丢失)。2个回调都包含一个对应的 EventArg参数(ea),它包含:

  • delivery tag:消息被确认或丢失的序列号。
  • multiple: 为 boolean 类型。 false 表示 只有一条消息被确认或者丢失。 True 表示所有具有较低或相等序列号的消息被确认/丢失

序列号可以在消息发布前通过 channel.NextPublishSeqNo 获得:

var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange,queue,properties,body);

有个简单的办法把消息和序列号关联起来:使用字典 Dictionary。假设我们要发布 string 类型的消息:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange,queue,properties,Encoding.UTF8.GetBytes(body));

我们需要在消息确认时清空 Dictionary ,丢失时做一些日志记录

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

var cleanOutstandingCnfirms(ulong sequenceNumber, bool multiple)
{
  if(multiple)
  {
    var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
    foreach(var entry in confirmed)
    {
      outstandingConfirms.TryRemove(entry.Key,out _);
    }
  }else
  {
    outstandingConfirms.TryRemove(sequenceNumber);
  }
}

channel.BasicAcks += (sender,ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender,ea) => 
{
  outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
  Console.WriteLine($"Message with body {body} has been nack-ed.Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
  cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
}

// ... publishing code

如何跟踪未完成的确认?

我们的示例使用ConcurrentDictionary来跟踪未完成的确认。这种数据结构很方便有几个原因。它允许轻松地将序列号与消息相关联(无论消息数据是什么),并轻松地将条目清理到给定的序列 id(以处理多个确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,该线程应与发布线程保持不同。

综上所述,异步处理发布确认通常需要以下几步:

  • 提供一种将发布序列号与消息相关联的方法。
  • 在通道上注册确回调事件,以便在发布者确认/否定应答 以执行适当的操作时收到通知,例如记录或重新发布已确认的消息。在此步骤中,序列号与消息的关联机制也可能需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

关于消息重发

从相应的回调中重新发布 nack-ed 消息可能很诱人,但应该避免这种情况,因为确认回调是 I/O 线程中分派的,channel 不应该执行操作的。更好的解决方案是将消息排入内存队列,该队列由发布线程轮询。像ConcurrentQueue这样的类 将是在确认回调和发布线程之间传输消息的好选择。

总结

在一些场景下,确认消息成功发送到 broker 是必须的。Publisher Confirms 本质上是异步的,但也可以同步处理它们。没有明确的方法来实现发布者确认,典型的技术是:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,批量同步等待确认:简单,合理的吞吐量,但当出现问题时很难推理。
  • 异步处理:最好的性能和资源使用,在错误的情况下很好的控制,但可以参与正确实现。

⚠️注意:

批量发布实现起来很简单,但在发布者拒绝确认的情况下,很难知道哪些消息无法发送到代理。

异步处理发布者确认更涉及实现,但提供更好的粒度和更好地控制在发布消息被 nack 时执行的操作。

More from this blog

C# 标准性能测试高级用法(Benchmark)

在 C# 标准性能测试 已经告诉大家如何使用 BenchmarkDotNet 测试性能,本文会告诉大家高级的用法。 建议是创建一个控制台项目用来做性能测试,这个项目要求是 dotnet framework 4.6 以上,建议是 dotnet 7 的版本。使用这个项目引用需要测试的项目,然后在里面写测试的代码。 例如被测试项目有一个类 Foo 里面有一个叫 Lindexidb 的方法,接下来的任务是需要测试这个 Lindexidb 方法的性能 最简单的测试的代码 public class FooP...

Jan 9, 20247 min read

.NetCore 实践——HttpClientFactory[一]

HttpClientFactory介绍 HttpClientFactory 主要有下面的功能: 管理内部HttpMessageHandler 的生命周期,灵活应对资源问题和DNS刷新问题 支持命名话、类型化配置,集中管理配置,避免冲突。 灵活的出站请求管道配置,轻松管理请求生命周期 内置管道最外层和最内层日志记录器,有information 和 Trace 输出 核心对象: HttpClient HttpMessageHandler SocketsHttpHandler De...

Jan 9, 20245 min read

认识 MSBuild - 1

前言 很多人一谈到 MSBuild,脑子里就会出现 “XML”、“只能用 VS 的属性框图形界面操作”、“可定制性和扩展性差” 和 “性能低” 等印象,但实际上这些除了 “XML” 之外完全都是刻板印象:这些人用着 Visual Studio 提供的图形界面,就完全不愿意花个几分钟时间翻翻文档去理解 MSBuild 及其构建过程。 另外,再加上 vcxproj (Visual C++ 项目)的默认 MSBuild 构建文件写得确实谈不上好(默认只能项目粒度并行编译,想要源码级并行编译你得加钱),...

Jan 9, 20245 min read

.NetCore实战——工作单元模式(UnitOfWork):管理好你的事务

工作单元模式有如下几个特性: 1、使用同一上下文 2、跟踪实体的状态 3、保障事务一致性 我们对实体的操作,最终的状态都是应该如实保存到我们的存储中,进行持久化 接下来看一下代码 为了实现工作单元模式,这里定义了一个工作单元的接口 public interface IUnitOfWork : IDisposable { Task<int> SaveChangesAsync(CancellationToken cancellationToken = default); Task<b...

Jan 9, 20242 min read
E

Edward Chu's blog

41 posts