RabbitMQ 消息模型及在.Net6下实现
1. RabbitMQ介绍、概念、基本架构
1.1 RabbitMQ介绍
RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一-款开源消息中间件,不管是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。
- 高可靠性、易扩展、高可用、功能丰富等
支持大多数(甚至冷门)的编程语言客户端。
RabbitMQ 遵循 AMQP 协议,自身采用 Erlang (一种由爱立信开发的通用面向并发编程的语言)编写。
- RabbitMQ 也支持 MQTT 等其他协议。
- RabbitMQ 具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择: rabbitmg.com/community:plugins.html
1.2 RabbitMQ整体逻辑架构
1.3 RabbitMQ Exchange
RabbitMQ 常用的交换器类型有: fanout
、 direct
、 topic
、 headers
四种。
Fanout
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,如图:
Direct
direct类型的交换器路由规则很简单,它会把消息路由到那些 BindingKey
和 RoutingKey
完全匹配的队列中,
如下图:
Topic
topic 类型的交换器在 direct 匹配规则上进行了扩展,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,这里的匹配规则稍微不同,它约定:
BindingKey 和 RoutingKey 一样都是由".
"分隔的字符串;
BindingKey 中可以存在两种特殊字符”*
“和”#
", 用于模糊匹配,其中"*
"用于匹配一个单词
,"#
" 用于匹配多个单词(可以是0个)
Headers
headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对, 当发送的消息到交换器时,RabbitMQ会获取到该消息的 headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果匹配,消息就会路由到该队列。
headers类型的交换器性能很差, 不实用。
1.4 RabbitMQ 数据存储
存储机制
RabbitMQ 消息有两种类型:
持久化消息
和非持久化消息
,这两种消息都会被写入磁盘。
2. RabbitMQ常用操作命令
# 前台启动 Erlang VM 和 RabbitMQ
rabbitmq-server
# 后台启动
rabbitmq-server -detached
# 停止 RabbitMQ
rabbitmqctl stop
# 查看所有队列
rabbitmqctl list_queues
# 查看所有虚拟主机
rabbitmqctl list_vhosts
# 在 Erlang VM 运行的情况下启动和关闭 RabbitMQ 应用
rabbitmqctl start_app
rabbitmqctl stop_app
# 查看节点状态
rabbitmqctl status
# 查看所有可用的插件
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用户
rabbitmqctl add_user username password
# 列出所有用户
rabbitmqctl list_users
# 删除用户
rabbitmqctl delete_user username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 修改密码
rabbitmqctl change_password username newpassword
# 设置用户权限
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 设置用户标签
rabbitmqctl set_user_tags username tagname
# 创建虚拟主机
rabbitmqctl add_vhost vhostpath
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset
3. RabbitMQ 工作流程详解
###3.1 生产者发送消息的过程
- 生产者连接 RabbitMQ,建立 TCP 连接(Connection),开启信道(Channel)
- 生产者声明一个 Exchange (交换器),并设置相关属性,比如交换器类型、是否持久化等
- 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过
routingKey
(路由Key) 将交换器和队列绑定(binding
) 起来 - 生产者发送消息至 RabbitMQ Broker,其中包含
routingkey
(路由键)、交换器等信息 - 相应的交换器根据接收到的
routingKey
查找相匹配的队列。 - 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
3.2 消费者接收消息的过程
- 消费者连接到 RabbitMQ Broker,建立一个连接(Connection) ,开启一一个信道(Channel) 。
- 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
- 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
消费者确认(ack)接收到的消息。
RabbitMQ从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
3.3 Connection 和 Channel 关系
生产者和消费者,需要与 RabbitMQ Broker 建立TCP连接,也就是 Connection。 一旦TCP连接建立起来,客户端紧接着创建一个AMQP 信道(Channel) ,每个信道都会被指派一
个唯一 的ID。 信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条AMQP指令都是通过信道完成的。
为什么不直接使用TCP连接,而是使用信道?
RabbitMQ 采用类似 NIO(java) 、异步Socket(C#) 的做法,复用TCP连接,减少性能开销,便于管理。
当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省TCP连接资源。
当信道本身的流量很大时,一个 Connection 就会产生性能瓶颈,流量被限制。需要建立多个 Connection,分摊信道。具体的调优看业务需要。
信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。
channel.ExchangeDeclare
channel.QueueDeclare
channel.BasicPublish
channel.BasicConsume
...
4. RabbitMQ 工作模式
官网地址:https://www.rabbitmq.com/#getstarted
RabbitMQ 支持 7 种模式:HelloWorld
、Work queues
、Publish/Subccribe
、Routing
、Topics
、RPC
、Publisher Confirms
下面的工作模式代码实现中,我们在 .NET6 中完成:
因为消费者要一直保持接收消息,我们使用后台服务RabbitConsumerService.cs
来实现;
生产者实现自定义接口IRabbitProducer.cs
// RabbitConcumerService.cs
public abstract class RabbitConsumerService : IHostedService
{
private readonly IConnection _connection;
protected readonly IModel _channel;
protected string RouteKey;
protected string QueueName;
public RabbitConsumerService(IOptions<RabbitOptions> options)
{
try
{
ConnectionFactory factory = new ConnectionFactory
{
HostName = options.Value.HostName,
Port = options.Value.Port,
UserName = options.Value.UserName,
Password = options.Value.Password
// VirtualHost = "/"
};
// factory.Uri=new Uri("amqp://admin:admin@localhost:5672/%2f");
this._connection = factory.CreateConnection();
this._channel = _connection.CreateModel();
}
catch (Exception e)
{
Console.WriteLine($"RabbitConsumerService init error, ex:{e.Message}");
}
}
public abstract void Register();
public virtual bool Process(string message)
{
Console.WriteLine($"接受到的消息为:{message}");
return true;
}
public Task StartAsync(CancellationToken cancellationToken)
{
Register();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
this._channel.Dispose();
this._connection.Close();
return Task.CompletedTask;
}
}
IRabbitProducer.cs
:
public interface IRabbitProducer
{
// 初始化 RabbitMQ
public void Init(string exchangeName, string queueName, string routingKey);
public void PushMessage(string message);
public void PushMessage(string routeKey, object message);
}
rabbitMQ配置类:RabbitOptions.cs
// RabbitMQ 信息配置
public class RabbitOptions
{
public string HostName { get; set; }
public int Port { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
}
Program.cs
// 选项框架配置
builder.Services.Configure<RabbitOptions>(builder.Configuration.GetSection("RabbitMQ"));
4.1 Hello World(简单模式)
发送单一消息。
代码
生产者
// 生产者 消息模型:HelloWorld public class RabbitProducer : IRabbitProducer { private IModel _channel; private string QueueName; private string ExchangeName; private string RoutingKey; private readonly RabbitOptions _rabbitOptions; public RabbitProducer(IOptions<RabbitOptions> options) { Console.WriteLine("初始化 RabbitProducer !"); _rabbitOptions = options.Value; } public void Init(string exchangeName, string queueName, string routingKey) { try { ExchangeName = exchangeName; QueueName = queueName; RoutingKey = routingKey; var factory = new ConnectionFactory { HostName = _rabbitOptions.HostName, Port = _rabbitOptions.Port, UserName = _rabbitOptions.UserName, Password = _rabbitOptions.Password }; var connection = factory.CreateConnection(); this._channel = connection.CreateModel(); _channel.QueueDeclare( queue: QueueName, // 消息队列名称 durable: false, // 是否持久化,true 持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息 exclusive: false, // 是否排他,true 排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false, // 是否自动删除,true 是自动删除。自动删除的前提:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除 arguments: null // 设置队列的一些其他参数 ); } catch (Exception e) { Console.WriteLine($"RabbitProducer init fail: {e.Message}"); } } public void PushMessage(string message) { Console.WriteLine($"ExchangeName: {ExchangeName}, QueueName:{QueueName}"); Console.WriteLine($"生产者发送内容:{message}"); // 消息内容 byte[] body = Encoding.UTF8.GetBytes(message); // 发送消息 _channel.BasicPublish(ExchangeName, RoutingKey, null, body); } public void PushMessage(string routeKey, object message) { } }
消费者
public class HelloWorldRabbitService : RabbitConsumerService { public HelloWorldRabbitService(IOptions<RabbitOptions> options) : base(options) { base.QueueName = "hello"; } public override void Register() { // 声明一个队列 base._channel.QueueDeclare( queue: QueueName, // 消息队列名称 durable: false, // 是否持久化,true 持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息 exclusive: false, // 是否排他,true 排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false, // 是否自动删除,true 是自动删除。自动删除的前提:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除 arguments: null // 设置队列的一些其他参数 ); // 创建消费者 var consumer = new EventingBasicConsumer(base._channel); consumer.Received += (model, ea) => { Thread.Sleep(10000); byte[] message = ea.Body.ToArray(); bool result = Process(Encoding.UTF8.GetString(message)); }; // 消费者开启监听 _channel.BasicConsume(QueueName, true, consumer); } }
Program.cs
// 注册后台服务 builder.Services.AddHostedService<HelloWorldRabbitService>(); // 依赖注入 IRabbitProducer builder.Services.AddSingleton<IRabbitProducer, RabbitProducer>();
测试:
Console.WriteLine("测试生产者代码:"); var service = app.Services.GetService<IRabbitProducer>(); if (service == null) Console.WriteLine("RabbitProducer 为空"); else { service.Init("", "hello", "hello"); service.PushMessage("在吗?"); service.PushMessage("在干嘛?"); service.PushMessage("哈哈哈 我在学习 rabbitmq ~"); }
结果:
// 生产者: 初始化 RabbitProducer ! ExchangeName: , QueueName:hello 生产者发送内容:在吗? ExchangeName: , QueueName:hello 生产者发送内容:在干嘛? ExchangeName: , QueueName:hello 生产者发送内容:哈哈哈 我在学习 rabbitmq ~ // 消费者: 接受到的消息为:在吗? 接受到的消息为:在干嘛? 接受到的消息为:哈哈哈 我在学习 rabbitmq ~
4.2 Work Queue(工作队列)
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反,我们将任务安排在以后完成。我们将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多 worker 时,任务将在他们之间共享。
循环调度(Round-Robin dispatching)
默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。不是每个消息 所有worker 都可以接收到。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
如上图中,
第 1 条消息会发送给 C1,
第 2 条消息会发送给 C2,
第 3 条消息会发送给 C1,
第 4 条消息会发送给 C2
...
以此类推
消息确认(Message acknowledgment)
完成一项任务可能需要几秒钟。您可能想知道如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。
使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。
在这种情况下,如果你结束一个 worker,我们将丢失它刚刚处理的消息。我们还将丢失所有发送给该特定 worker 但尚未处理的消息。但是我们不想丢失任何任务。如果一个 worker 终止,我们希望将任务交付给另一个 worker。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认
(message acknowledgment)。一个 ack 由消费者发回,告诉 RabbitMQ 一个特定的消息已经被接收、处理并且 RabbitMQ 可以自由地删除它。
如果消费者在没有发送 ack 的情况下异常(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将重新排队。如果同时有其他消费者在线,它会迅速将其重新发送给另一个消费者。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
// Note: it is possible to access the channel via
// ((EventingBasicConsumer)sender).Model here
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
消息持久性(Message durability)
使用手动消息确认,可以保证即使消费者宕机,消息也不会丢失。但是如果 RabbitMQ 服务器停止,任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。确保消息不会丢失需要做两件事:我们需要将 Queue
和 Message
都标记为持久的。
首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为可持久化的:
channel.QueueDeclare(queue: "task_queue",
durable: true, // durable 为 true,代表可持久化
exclusive: false,
autoDelete: false,
arguments: null);
即使 RabbitMQ 重启, task_queue队列也不会丢失
现在我们需要将我们的消息标记为持久--通过将IBasicProperties.Persistent
设置为 true
// producer 设置消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
关于消息持久性的注意事项
将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉 RabbitMQ 将消息保存到磁盘,但是当 RabbitMQ 接受消息并且还没有保存它时,仍然有很短的时间窗口。此外,RabbitMQ 不会对每条消息都执行
fsync(2)
——它可能只是保存到缓存中而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经绰绰有余了。如果您需要更强的保证,那么您可以使用 Publisher confirms。
公平调度(Fair dispatch)
Work queue 默认是轮询分发消息给 worker,会有以下场景问题:例如,在有两个工人的情况下,当所有奇数消息很重而偶数消息都很轻时,一个工人会一直很忙,而另一个工人几乎不会做任何工作。RabbitMQ 对此一无所知,仍然会均匀地发送消息。
发生这种情况是因为 RabbitMQ 只是在消息进入队列时分派消息。它不查看消费者未确认消息的数量。它只是盲目地将第 n 个消息发送给第 n 个消费者。
为了解决这个问题,我们可以调用BasicQos
方法,将prefetchCount
= 1
。
这告诉 RabbitMQ 一次不要给一个 worker 多条消息。换句话说,在 worker 处理并确认之前的消息之前,不要向工作人员发送新消息。相反,它将把它分派给下一个不忙的 worker。
// 消费者设置 Qos
channel.BasicQos(0, 1, false);
关于队列大小的注意事项
如果所有工作人员都很忙,您的队列可能会被填满。你会想要关注这一点,可能会增加更多的工人,或者有一些其他的策略。
使用消息确认和 BasicQos ,可以设置工作队列即使 RabbitMQ 重新启动,持久性选项也能让任务继续存在。
代码
生产者
WorkerQueueProducer.cs
:public class WorkerQueueProducer: IRabbitProducer { private IModel _channel; private string QueueName; private string ExchangeName; private string RoutingKey; private readonly RabbitOptions _rabbitOptions; public WorkerQueueProducer(IOptions<RabbitOptions> options) { Console.WriteLine("初始化 WorkerQueueProducer!"); _rabbitOptions = options.Value; } public void Init(string exchangeName, string queueName, string routingKey) { try { ExchangeName = exchangeName; QueueName = queueName; RoutingKey = routingKey; var factory = new ConnectionFactory { HostName = _rabbitOptions.HostName, Port = _rabbitOptions.Port, UserName = _rabbitOptions.UserName, Password = _rabbitOptions.Password }; var connection = factory.CreateConnection(); this._channel = connection.CreateModel(); // 设置消息持久化 var properties=_channel.CreateBasicProperties(); properties.Persistent = true; _channel.QueueDeclare( queue: QueueName, // 消息队列名称 durable: true, // 是否持久化,true 持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息 exclusive: false, // 是否排他,true 排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false, // 是否自动删除,true 是自动删除。自动删除的前提:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除 arguments: null // 设置队列的一些其他参数 ); } catch (Exception e) { Console.WriteLine($"RabbitProducer init fail: {e.Message}"); } } public void PushMessage(string message) { Console.WriteLine($"ExchangeName: {ExchangeName}, QueueName:{QueueName}"); Console.WriteLine($"生产者发送内容:{message}"); var body = Encoding.UTF8.GetBytes(message); _channel.BasicPublish(ExchangeName,RoutingKey,null,body); } public void PushMessage(string routeKey, object message) { throw new NotImplementedException(); } }
消费者
我们创建2个消费者服务
WorkerConsumerService.cs
和WorkerConsumerService2.cs
public class WorkerConsumerService: RabbitConsumerService { public WorkerConsumerService(IOptions<RabbitOptions> options) : base(options) { base.QueueName = "task_queue"; } public override void Register() { _channel.QueueDeclare( QueueName, true, // 是否持久化,true 持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息 false, false, null ); // 消费者设置 Qos,告诉 Rabbit 每次只能向消费者发送 1 条消息,消费者未确认之前,不再向他发送信息 _channel.BasicQos(0,1,false); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { Thread.Sleep(10000); var message = Encoding.UTF8.GetString(ea.Body.ToArray()); if(Process(message)) _channel.BasicAck(ea.DeliveryTag,false); // 消息处理完后,手动 ACK 确认 }; _channel.BasicConsume(QueueName,false,consumer); } public override bool Process(string message) { Console.WriteLine($"{nameof(WorkerConsumerService)} 收到消息:{message}"); return true; } }
// WorkerConsumerService2.cs 里只把Qos prefetchCount 改为 2,其他和 WorkerConsumerService 一致 // 消费者设置 Qos,告诉 Rabbit 每次只能向消费者发送 1 条消息,消费者未确认之前,不再向他发送信息 _channel.BasicQos(0,1,false);
Program.cs
// 注册后台服务 builder.Services.AddHostedService<WorkerConsumerService>(); builder.Services.AddHostedService<WorkerConsumerService2>(); // 依赖注入 IRabbitProducer builder.Services.AddSingleton<IRabbitProducer, WorkerQueueProducer>();
测试:
Console.WriteLine("测试生产者代码:"); Console.WriteLine("测试生产者代码:"); var service = app.Services.GetService<IRabbitProducer>(); if (service == null) Console.WriteLine("RabbitProducer 为空"); else { service.Init("","task_queue","task_queue"); service.PushMessage("message1"); service.PushMessage("message2"); service.PushMessage("message3"); service.PushMessage("message4"); service.PushMessage("message5"); }
输出结果
// 生产者 测试生产者代码: 初始化 WorkerQueueProducer! ExchangeName: , QueueName:task_queue 生产者发送内容:message1 ExchangeName: , QueueName:task_queue 生产者发送内容:message2 ExchangeName: , QueueName:task_queue 生产者发送内容:message3 ExchangeName: , QueueName:task_queue 生产者发送内容:message4 ExchangeName: , QueueName:task_queue 生产者发送内容:message5 // 消费者 WorkerConsumerService2 收到消息:message2 WorkerConsumerService 收到消息:message1 WorkerConsumerService2 收到消息:message3 WorkerConsumerService 收到消息:message5 WorkerConsumerService2 收到消息:message4 // 可以看到,worker2 处理了3条消息,worker1 处理了2条消息 // 符合我们设置的 Qos(能者多劳)
4.3 Publish/Subscribe(发布/订阅)
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
相反,生产者只能向交换器(exchange)发送消息。
这里我们使用fanout
类型交换器,routingKey 忽略。 每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
fanout 交换器广播它接受的所有消息给素有绑定的队列
默认交换器
之前的 HelloWorld 和 WorkQueue 模式并没声明交换器仍然可以向队列发送消息。
这是因为使用了默认交换器
""
(定义为空交换器)再次看下之前发送消息时的代码:
_channel.BasicPublish( exchange:"", routingKey:"hello", null, body );
第一个参数是 exchange 的名称。空字符串表示默认:消息被路由到具有
routingKey
名称的队列(如果队列存在)。
临时队列(Temporary queues)
您可能还记得之前我们使用具有特定名称的队列(还记得hello
和task_queue
吗?)。能够命名队列对我们来说至关重要——我们需要将 worker 指向同一个队列。当您想在生产者和消费者之间共享队列时,为队列命名很重要。
但 发布/订阅 模式并非如此,我们希望了解所有消息,而不仅仅是其中的一部分。我们也只对当前流动的消息感兴趣,而不是对旧消息感兴趣。为了解决这个问题,我们需要两件事。
- 每当我们连接到 Rabbit 时,我们都需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,甚至最好让服务器为我们选择一个随机队列名称。
- 一旦我们断开消费者的连接,队列应该会被自动删除。
在 .NET 客户端中,当我们不向QueueDeclare()
提供任何参数时, 我们会创建一个具有生成名称的非持久、独占、自动删除队列:
var queueName = channel.QueueDeclare().QueueName;
此时queueName
包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
绑定(Bindings)
我们已经创建了一个 fanout 交换器和一个队列。现在我们需要告诉交换机向我们的队列发送消息。交换和队列之间的这种关系称为绑定
。
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
现在logs
交换会将消息附加到我们的队列中。
Listing bindings
可以使用下面命令查看绑定情况
rabbitmqctl list_bindings
生产者发出消息和之前的看起来没有太大差别。最大的区别是将消息发布到logs
交换器而不是默认交换器""
。
我们需要在发送时提供routingKey
,但它的值在fanout
交换器时会被忽略。
代码
生产者
// 发布/订阅 模式,生产者声明 fanout类型的交换器后,发送消息会丢失。因为此时消费者还未将队列与交换器进行绑定,绑定成功后的消息正常发送无丢失 public class PublishDescribeProducer : IRabbitProducer { private IModel _channel; private string ExchangeName; private readonly RabbitOptions _options; public PublishDescribeProducer(IOptions<RabbitOptions> options) { Console.WriteLine("初始化 PublishDescribeProducer!"); _options = options.Value; } public void Init(string exchangeName, string queueName, string routingKey) { try { ExchangeName = exchangeName; var factory = new ConnectionFactory { HostName = _options.HostName, Port = _options.Port, UserName = _options.UserName, Password = _options.Password }; var connection = factory.CreateConnection(); _channel = connection.CreateModel(); _channel.ExchangeDeclare(ExchangeName, ExchangeType.Fanout, false, false, null); } catch (Exception e) { Console.WriteLine($"RabbitProducer init fail: {e.Message}"); } } public void PushMessage(string message) { Console.WriteLine($"ExchangeName: {ExchangeName}"); Console.WriteLine($"生产者发送内容:{message}"); var body = Encoding.UTF8.GetBytes(message); _channel.BasicPublish(ExchangeName, "", null, body); } public void PushMessage(string routeKey, object message) { throw new NotImplementedException(); } }
代码与上面没有什么差异,只是由上面的消息队列声明变成了交换机声明(交换机类型为fanout),也就说发送者发送消息从原来的直接发送消息队列变成了发送到交换机
消费者
我们创建2个消费者服务
PublishDescribeConsumerService.cs
和PublishDescribeConsumerService2.cs
public class PublishDescribeConsumerService: RabbitConsumerService { public PublishDescribeConsumerService(IOptions<RabbitOptions> options) : base(options) { ExchangeName = "logs"; } public override void Register() { _channel.ExchangeDeclare(ExchangeName,ExchangeType.Fanout,false,false,null); QueueName = _channel.QueueDeclare().QueueName; _channel.QueueBind(QueueName,ExchangeName,""); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { Thread.Sleep(10000); Console.WriteLine($"{QueueName} 接受到消息"); var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Process(message); }; _channel.BasicConsume(QueueName, true, consumer); } }
PublishDescribeConsumerService2.cs 代码内容与上面相同
- 可以看到消费者代码与上面有些差异
- 首先是声明交换机(同上面一样,为了防止异常)
- 然后声明消息队列并对交换机进行绑定,在这里使用了默认声明队列的方式,目的是声明不重复的消息队列,如果是同一个消息队列,则就变成worker模式,也就是说对于发布订阅模式有多少接收者就有多少个消息队列,而这些消息队列共同从一个交换机中获取数据
Program.cs
// 注册后台服务 builder.Services.AddHostedService<PublishDescribeConsumerService>(); builder.Services.AddHostedService<PublishDescribeConsumerService2>(); // 依赖注入 IRabbitProducer builder.Services.AddSingleton<IRabbitProducer, PublishDescribeProducer>();
测试
Console.WriteLine("测试生产者代码:"); var service = app.Services.GetService<IRabbitProducer>(); if (service == null) Console.WriteLine("RabbitProducer 为空"); else { service.Init("logs", "", ""); service.PushMessage("Demon"); service.PushMessage("Enemy"); service.PushMessage("Bones"); service.PushMessage("Natural"); service.PushMessage("Warriors"); }
运行结果
// 生产者 测试生产者代码: 初始化 PublishDescribeProducer! ExchangeName: logs 生产者发送内容:Demon ExchangeName: logs 生产者发送内容:Enemy ExchangeName: logs 生产者发送内容:Bones ExchangeName: logs 生产者发送内容:Natural ExchangeName: logs 生产者发送内容:Warriors // 消费者 amq.gen-3aHKCw3OGPh3exVQwyIiyA 接受到消息 接受到的消息为:Demon amq.gen-RyydsJGJkWW7jlpz9ycznA 接受到消息 接受到的消息为:Demon amq.gen-RyydsJGJkWW7jlpz9ycznA 接受到消息 接受到的消息为:Enemy amq.gen-3aHKCw3OGPh3exVQwyIiyA 接受到消息 接受到的消息为:Enemy amq.gen-3aHKCw3OGPh3exVQwyIiyA 接受到消息 接受到的消息为:Bones amq.gen-RyydsJGJkWW7jlpz9ycznA 接受到消息 接受到的消息为:Bones amq.gen-RyydsJGJkWW7jlpz9ycznA 接受到消息 接受到的消息为:Natural amq.gen-3aHKCw3OGPh3exVQwyIiyA 接受到消息 接受到的消息为:Natural amq.gen-RyydsJGJkWW7jlpz9ycznA 接受到消息 接受到的消息为:Warriors amq.gen-3aHKCw3OGPh3exVQwyIiyA 接受到消息 接受到的消息为:Warriors
可以看到来自
logs
交换器的数据进入两个具有服务器分配名称的队列。符合我们的预期
4.4 Routing(路由模式)
使用direct
类型的Exchange, 发 N 条消息并使用不同的 routingKey
。
消费者定义队列并将队列、routingkey
、Exchange绑定。 此时使用 direct
模式 Exchange 必须要 routingKey
完全匹配的情况下消息才会转发到对应的队列中被消费。
上一个模式中,可以将消息广播到很多接收者。现在我们想让接收者只接收部分消息,如我们通过直接模式的交换器将关键的错误信息记录到 log 文件,同时在控制台正常打印所有的
日志信息。
绑定
上一模式中,交换器的使用方式:
channel.QueueBind(queneName, exchangeName, "");
绑定语句中还有第三个参数: routingkey
。为避免与BasicPublish
参数混淆,我们将其称为 绑定键
。
channel.QueueBind(queueName, exchangeName, "black");
bindingkey
的作用与具体使用的交换器类型有关。绑定键的含义取决于交换类型。对于 fanout
类型的交换器,此参数设置无效,系统直接忽略。
Direct exchange
上一个模式中日志系统将所有消息广播给所有消费者。我们希望扩展它以允许根据消息的严重性过滤消息。例如,我们可能希望将日志消息写入磁盘的脚本只接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。
我们使用的是 fanout
exchange,它没有给我们太多的灵活性——它只能进行无意识的广播。
我们将改为使用 direct
exchange。直接交换背后的路由算法很简单——消息进入 绑定键与消息的路由键
完全匹配的队列。
上图中可以看到绑定了两个队列的 direct 类型的交换器 X。Q1 使用绑定键 orange
进行绑定,Q2 有两个绑定,一个使用绑定键 black
另一个使用 green
。
使用路由键 orange
发布到交换器的消息 将被路由到队列 Q1。带有 black
或 green
路由键的消息将发送到Q2。所有其他消息将被丢弃。
Multiple bindings
使用相同的绑定键绑定多个队列是完全合法的。如上图中使用
black
路由键发布到 X 交换器的消息,将被路由给 Q1 和 Q2,此时 direct 交换器的行为类似于 fanout 并将消息广播到所有匹配的队列。
使用
生产者创建/确认 Exchange 存在后带着 Routing Key 直接发送消息即可。
交换器和队列的绑定,交给消费者去做。
首先先创建一个 exchange
_channel.ExchangeDeclare("direct_logs",ExchangeType.Direct);
准备发送消息
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
_channel.BasicPublish("direct_logs",RoutingKey,null,body);
接收消息和上个模式一样,只是要进行多个绑定。
QueueName = _channel.QueueDeclare().QueueName;
foreach (var severity in severityArr)
{
_channel.QueueBind(QueueName,"direct_logs",severity,null);
}
代码
生产者
RoutingProducer.cs
public class RoutingProducer: IRabbitProducer { private IModel _channel; public string ExchangeName; public string RoutingKey; private readonly RabbitOptions _options; public RoutingProducer(IOptions<RabbitOptions> options) { Console.WriteLine("初始化 RoutingProducer!"); _options = options.Value; } public void Init(string exchangeName, string queueName, string routingKey) { try { ExchangeName = exchangeName; RoutingKey = routingKey; var factory = new ConnectionFactory { HostName = _options.HostName, Port = _options.Port, UserName = _options.UserName, Password = _options.Password }; var connection = factory.CreateConnection(); _channel = connection.CreateModel(); _channel.ExchangeDeclare(ExchangeName,ExchangeType.Direct); } catch (Exception e) { Console.WriteLine($"RabbitProducer init fail: {e.Message}"); } } public void PushMessage(string message) { } public void PushMessage(string routeKey, object message) { if (!string.IsNullOrWhiteSpace(routeKey)) RoutingKey = routeKey; Console.WriteLine($"ExchangeName: {ExchangeName}、RoutingKey:{routeKey}"); Console.WriteLine($"生产者发送内容:{message}"); var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); _channel.BasicPublish(ExchangeName,RoutingKey,null,body); } }
消费者 2个后台服务
RoutingConsumerService.cs
、RoutingConsumerService.cs
public class RoutingConsumerService: RabbitConsumerService { private static readonly string[] severityArr = new []{"error","info","warning"}; public RoutingConsumerService(IOptions<RabbitOptions> options) : base(options) { ExchangeName = "direct_logs"; } public override void Register() { _channel.ExchangeDeclare(ExchangeName,ExchangeType.Direct); QueueName = _channel.QueueDeclare().QueueName; foreach (var severity in severityArr) { _channel.QueueBind(QueueName,ExchangeName,severity,null); } var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { string message = Encoding.UTF8.GetString(ea.Body.ToArray()); Process($"Queue: {QueueName},RoutingKey:{ea.RoutingKey}收到的消息为{message}"); }; _channel.BasicConsume(QueueName, true, consumer); } public override bool Process(string message) { Console.WriteLine(message); return true; } }
RoutingConsumerService2
代码基本和RoutingConsumerService
一样,只是日志等级只有errorprivate static readonly string[] severityArr = new []{"error"};
Program.cs
// 注册后台服务 builder.Services.AddHostedService<RoutingConsumerService>(); builder.Services.AddHostedService<RoutingConsumerService2>(); // 依赖注入 IRabbitProducer builder.Services.AddSingleton<IRabbitProducer, RoutingProducer>();
测试
public void Routing() { var args = new string[] { "error","info","warning"}; _rabbitProducer.Init("direct_logs","",""); foreach (var t in args) { _rabbitProducer.PushMessage(t,t+" 消息"); } } Console.WriteLine("测试生产者代码:"); Routing();
测试结果
// 生产者 ExchangeName: direct_logs、RoutingKey:error 生产者发送内容:error 消息 ExchangeName: direct_logs、RoutingKey:info 生产者发送内容:info 消息 ExchangeName: direct_logs、RoutingKey:warning 生产者发送内容:warning 消息 // 消费者 Queue: amq.gen-BvwBW60UxgQagrECYJmaVQ,RoutingKey:error收到的消息为"error 消息" Queue: amq.gen-1jLXZkMDY5Z4uvMukVNKqw,RoutingKey:error收到的消息为"error 消息" Queue: amq.gen-BvwBW60UxgQagrECYJmaVQ,RoutingKey:info收到的消息为"info 消息" Queue: amq.gen-BvwBW60UxgQagrECYJmaVQ,RoutingKey:warning收到的消息为"warning 消息"
可以看到 C1 只接受 error 消息, C2 接受 error、info 和 warning 类型消息。
4.5 Topic(主题模式)
使用 topic
类型的交换器,队列绑定到交换器、bindingKey
时使用通配符
,交换器将消息路由转发到具体队列时会根据消息 routingkey
模糊匹配,比较灵活。
使用主题模式 routing_key
的格式是有要求的,不能随便命名 —— 必须是由点.
分割的单词列表。具体内容没有限制,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse
”、“ nyse.vmw
”、“ quick.orange.rabbit
”。路由键中可以有任意多的单词,最多为 255 个字节。
绑定键binding_key
也必须采用相同的格式。
Topic
交换器类似于 Direct
交换器, 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。但是,绑定键有两个重要的特殊情况:
*
(星号)可以只替换一个单词。#
(hash) 可以代替零个或多个单词。
下面的这个列子可以很直观的解释:
这个示例中我们想要发送描述动物的消息。消息的路由键routing_key
由3个部分组成(还有2个.
)。格式为 <speed>.<color>.<species>
,第一个词描述动物的运动速度,第二个词是颜色,第三个词是动物种类。
图中有3种绑定关系:
- Q1 通过
binding_key
(绑定键)*.orange.*
和交换器 X 绑定 - Q2 通过
binding_key
(绑定键)*.*.rabbit
、lazy.#
和交换器 X 绑定
也就是说, Q1 只对所有颜色为橙色的动物感兴趣,Q2 对 兔子的所有信息以及所有跑得慢的动物感兴趣。
比如:
P
发送一条路由键为quick.orange.rabbit
的消息,C1 和 C2 都将收到这条消息P
发送一条路由键为lazy.orange.elephant
的消息,C1 和 C2 也都收到这条消息P
发送一条路由键为quick.orange.fox
的消息,则只有 C1 收到消息P
发送一条路由键为lazy.brown.fox
的消息,则只有 C2 收到消息lazy.pink.rabbit
虽然匹配两个binding_key
但是 C2 只会收到一次消息- 没有匹配
binding_key
的消息(如quick.brown.fox
)将被丢弃
Topic 交换器
Topic
交换器功能更强大,也可以像其他类型交换器那样工作。当队列使用 含有
#
的binding_key
进行绑定时—— 队列会接受所有消息,无论routing_key
是什么,就像fanout
类型交换器一样。当队列不使用含有
*
但没有使用#
的binding_key
进行绑定时 —— 则和direct
模式一样
代码
生产者
TopicProducer
public class TopicProducer : IRabbitProducer { private IModel _channel; private string _exchangeName; private string _routeKey; private readonly RabbitOptions _rabbitOptions; public TopicProducer(IOptions<RabbitOptions> options) { Console.WriteLine("初始化 TopicProducer!"); _rabbitOptions = options.Value; } public void Init(string exchangeName, string queueName, string routingKey) { try { _exchangeName = exchangeName; var factory = new ConnectionFactory { HostName = _rabbitOptions.HostName, Port = _rabbitOptions.Port, UserName = _rabbitOptions.UserName, Password = _rabbitOptions.Password }; _channel = factory.CreateConnection().CreateModel(); _channel.ExchangeDeclare(_exchangeName,ExchangeType.Topic); } catch (Exception e) { Console.WriteLine($"RabbitProducer init fail: {e.Message}"); } } public void PushMessage(string message) { } public void PushMessage(string routeKey, object message) { string msg = JsonConvert.SerializeObject(message); Console.WriteLine($"向交换器 {_exchangeName} ,routing key:{routeKey} 发送消息:{msg}"); var body = Encoding.UTF8.GetBytes(msg); _channel.BasicPublish(_exchangeName,routeKey,null,body); } }
消费者
TopicConsumerService.cs
和TopicConsumerService2.cs
public class TopicConsumerService: RabbitConsumerService { public TopicConsumerService(IOptions<RabbitOptions> options) : base(options) { ExchangeName = "topic_logs"; RouteKey = "#"; QueueName = "logs.all"; } public override void Register() { _channel.ExchangeDeclare(ExchangeName,ExchangeType.Topic); _channel.QueueDeclare(QueueName, false, false, true); _channel.QueueBind(QueueName,ExchangeName,RouteKey); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { string message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"Queue:{QueueName} \t Exchange: {ExchangeName} \t RoutingKey: {ea.RoutingKey} \t BindingKey:{RouteKey} \t Message: {message}"); }; _channel.BasicConsume(QueueName, true, consumer); } public override bool Process(string message) { return true; } }
public class TopicConsumerService2: RabbitConsumerService
{
public TopicConsumerService2(IOptions<RabbitOptions> options) : base(options)
{
ExchangeName = "topic_logs";
RouteKey = "kern.*";
QueueName = "logs.kern";
}
public override void Register()
{
_channel.ExchangeDeclare(ExchangeName,ExchangeType.Topic);
_channel.QueueDeclare(QueueName, false, false, true);
_channel.QueueBind(QueueName,ExchangeName,RouteKey);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
string message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"Queue:{QueueName}\tExchange: {ExchangeName}\tRoutingKey: {ea.RoutingKey}\tBindingKey:{RouteKey}\t Message: {message}");
};
_channel.BasicConsume(QueueName, true, consumer);
}
public override bool Process(string message)
{
return true;
}
}
TopicConsumerService.cs
绑定键为 #
它将接受 topic_logs
交换器所有的消息
TopicConsumerService2.cs
绑定键为 kern.*
它将接收 topic_logs
交换器种路由键为 kern.
打头的消息
Program.cs
// 注册后台服务 builder.Services.AddHostedService<TopicConsumerService>(); builder.Services.AddHostedService<TopicConsumerService2>(); // 依赖注入 IRabbitProducer builder.Services.AddSingleton<IRabbitProducer, TopicProducer>();
测试
public void Topic() { var args = new string[] { "kern.critical","opps","ken.warn"}; _rabbitProducer.Init("topic_logs","",""); foreach (var rk in args) { _rabbitProducer.PushMessage(rk,$"routing key: {rk}, Hi."); } } Console.WriteLine("测试生产者代码:"); Topic();
结果
// 生产者 测试生产者代码: 初始化 TopicProducer! 向交换器 topic_logs ,routing key:kern.critical 发送消息:"routing key: kern.critical, Hi." 向交换器 topic_logs ,routing key:opps 发送消息:"routing key: opps, Hi." 向交换器 topic_logs ,routing key:ken.warn 发送消息:"routing key: ken.warn, Hi." // 消费者 Queue:logs.all Exchange: topic_logs RoutingKey: kern.critical BindingKey:# Message: "routing key: kern.critical, Hi." Queue:logs.kern Exchange: topic_logs RoutingKey: kern.critical BindingKey:kern.* Message: "routing key: kern.critical, Hi." Queue:logs.all Exchange: topic_logs RoutingKey: opps BindingKey:# Message: "routing key: opps, Hi." Queue:logs.all Exchange: topic_logs RoutingKey: ken.warn BindingKey:# Message: "routing key: ken.warn, Hi."
可以看到队列
logs.all
接收了生产者向topic_logs
交换器发送的所有日志消息。队列
logs.kern
只接收生产者向topic_logs
交换器发送的 kern 设备的日志消息。