C#创建RabbitMq连接
public static class BasePublisher{public static ConnectionFactory CreateRabbitMqConnection(){// RabbitMQ连接工厂return new ConnectionFactory(){HostName = "localhost",// 用户名UserName = "guest",// 密码Password = "guest",// 网络故障自动恢复连接AutomaticRecoveryEnabled = true,// 心跳处理RequestedHeartbeat = new TimeSpan(5000)};}}
点对点
“ P”是生产者,“ C”是消费者。中间的框是一个队列-RabbitMQ代表保留的消息缓冲区

#define publishernamespace RabbitMQPublisher{using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;/// <summary>/// 点对点:最简单的工作模式/// </summary>internal static class PointToPointPublisher{readonly static string queueName = "test.pointToPoint.queue";#if publisherprivate static void Main(string[] args){while (true){Console.WriteLine("消息发布者:模式{点对点}=>输入消息内容");string message = Console.ReadLine();if (!string.IsNullOrEmpty(message)){// RabbitMQ连接工厂ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();// 建立连接using IConnection connection = factory.CreateConnection();// 创建信道using IModel channel = connection.CreateModel();// 声明队列channel.QueueDeclare(queueName, false, false, false, null);// 消息发送channel.BasicPublish(exchange: "",routingKey: queueName,basicProperties: null,body: Encoding.UTF8.GetBytes(message));}}}#elseprivate static void Main(string[] args){Console.WriteLine($"PointToPointConsumer");// RabbitMQ连接工厂ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();// 建立连接using IConnection connection = factory.CreateConnection();// 创建信道using IModel channel = connection.CreateModel();// 声明队列channel.QueueDeclare(queue: queueName,durable: false,exclusive: false,autoDelete: false,arguments: null);EventingBasicConsumer consumer =new EventingBasicConsumer(channel);// 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息channel.BasicQos(0, 1, false);// 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"Message:{message}");channel.BasicAck(deliveryTag: ea.DeliveryTag,// 是否一次性确认多条数据multiple: false);};channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.ReadLine();}#endif}}
Worker
worker模式是一对多的模式,但是这个一对多并不是像发布订阅那种,而是将消息顺序传输给每个接收者

生产者
while (true){Console.WriteLine("消息发布者:模式{Worker}=>输入消息内容");string message = Console.ReadLine();if (!string.IsNullOrEmpty(message)){// RabbitMQ连接工厂var factory = BasePublisher.CreateRabbitMqConnection();// 建立连接using var connection = factory.CreateConnection();// 创建信道using var channel = connection.CreateModel();// 声明队列string queueName = "test.worker.queue";channel.QueueDeclare(queueName, false, false, false, null);// 消息发送channel.BasicPublish(exchange: "",routingKey: queueName,basicProperties: null,body: Encoding.UTF8.GetBytes(message));}}
消费者
static void Main(string[] args){Console.WriteLine($"{nameof(WorkerConsumerClient1)}:");// RabbitMQ连接工厂var factory = BaseConsumer.CreateRabbitMqConnection();// 建立连接using var connection = factory.CreateConnection();// 创建信道using var channel = connection.CreateModel();string queueName = "test.worker.queue";// 申明队列channel.QueueDeclare(queue: queueName,durable: false,exclusive: false,autoDelete: false,arguments: null);EventingBasicConsumer consumer =new EventingBasicConsumer(channel);// 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息channel.BasicQos(0, 1, false);// 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"Message:{message}");channel.BasicAck(deliveryTag: ea.DeliveryTag,// 是否一次性确认多条数据multiple: false);};channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.ReadLine();}
这里定义了两个消费者来消费Queue,结果如下

ExchangesType
Exchange分发消息时根据类型的不同分发策略有区别:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型
fanout
发布订阅模式(fanout),消息发送到Exchange,所有订阅了当前Exchange的Queue都可以收到消息;每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的

生产者
while (true){Console.WriteLine("消息发布者:模式{fanout}=>输入消息内容");string message = Console.ReadLine();if (!string.IsNullOrEmpty(message)){var factory = BasePublisher.CreateRabbitMqConnection();using var connection = factory.CreateConnection();using var channel = connection.CreateModel();// 声明交换机string exchangeName = $"test.exchange.fanout";channel.ExchangeDeclare(exchange: exchangeName,type: "fanout");// 声明队列string queue1 = "test.fanout.queue1";channel.QueueDeclare(queue1, false, false, false, null);string queue2 = "test.fanout.queue2";channel.QueueDeclare(queue2, false, false, false, null);// 将队列与交换机进行绑定channel.QueueBind(queue: queue1,exchange: exchangeName,routingKey: "fanout");channel.QueueBind(queue: queue2,exchange: exchangeName,routingKey: "");channel.BasicPublish(exchange: exchangeName,routingKey: "",basicProperties: null,body: Encoding.UTF8.GetBytes(message));}}

这里绑定q1时指定了routingkey=”fanout”但是q1/q2都正常收到了消息
消费者1
Console.WriteLine($"{nameof(FanoutConsumerClient1)}:");// RabbitMQ连接工厂var factory = BaseConsumer.CreateRabbitMqConnection();// 建立连接using var connection = factory.CreateConnection();// 创建信道using var channel = connection.CreateModel();string exchangeName = $"test.exchange.fanout";//声明交换机并指定类型channel.ExchangeDeclare(exchange: exchangeName,type: "fanout");string queueName = $"test.fanout.queue1";//声明队列channel.QueueDeclare(queue: queueName,durable: false,exclusive: false,autoDelete: false,arguments: null);//将队列与交换机进行绑定channel.QueueBind(queue: queueName,exchange: exchangeName,routingKey: "fanout");EventingBasicConsumer consumer =new EventingBasicConsumer(channel);// 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息channel.BasicQos(0, 1, false);// 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"Message:{message}");channel.BasicAck(deliveryTag: ea.DeliveryTag,// 是否一次性确认多条数据multiple: false);};channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.ReadLine();
消费者2
Console.WriteLine($"{nameof(FanoutConsumerClient2)}:");// RabbitMQ连接工厂var factory = BaseConsumer.CreateRabbitMqConnection();// 建立连接using var connection = factory.CreateConnection();// 创建信道using var channel = connection.CreateModel();string exchangeName = $"test.rabbitMq.fanout";//声明交换机并指定类型channel.ExchangeDeclare(exchange: exchangeName,type: "fanout");string queueName = $"test.fanout.queue2";//声明队列channel.QueueDeclare(queue: queueName,durable: false,exclusive: false,autoDelete: false,arguments: null);//将队列与交换机进行绑定channel.QueueBind(queue: queueName,exchange: exchangeName,routingKey: "");EventingBasicConsumer consumer =new EventingBasicConsumer(channel);// 每次只能向消费者发送一条消息,在消费者未确认之前,不再向它发送消息channel.BasicQos(0, 1, false);// 绑定消息接收后的事件委托consumer.Received += (model, ea) =>{string message =Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"Message:{message}");channel.BasicAck(deliveryTag: ea.DeliveryTag,// 是否一次性确认多条数据multiple: false);};channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.ReadLine();

Consumer1消费时指定了routingKey,但是两个消费端都正常收到了消息,说明ExchangeType=”fanout”时,不受routingKey影响
direct
direct跟fanout的区别在于多了routekey,消息发送到Exchange,所有订阅了当前Exchange并且routingKey完全匹配的Queue才可以收到消息;消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式

static void Main(string[] args){while (true){Console.WriteLine("消息发布者:模式{direct}=>输入消息内容");string message = Console.ReadLine();if (!string.IsNullOrEmpty(message)){ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();using var connection = factory.CreateConnection();using var channel = connection.CreateModel();// 声明交换机string exchangeName = $"test.exchange.direct";channel.ExchangeDeclare(exchange: exchangeName,type: "direct");// 声明队列string queue1 = "test.direct.queue1";channel.QueueDeclare(queue1, false, false, false, null);string queue2 = "test.direct.queue2";channel.QueueDeclare(queue2, false, false, false, null);//将队列与交换机进行绑定channel.QueueBind(queue: queue1,exchange: exchangeName,routingKey: "fanout");channel.QueueBind(queue: queue2,exchange: exchangeName,routingKey: "");// 只有queue1可以收到消息,因为queue2的routingKey不匹配channel.BasicPublish(exchange: exchangeName,routingKey: "fanout",basicProperties: null,body: Encoding.UTF8.GetBytes(message));}}}
topic
topic符模式与路由模式一致,只不过通配符模式中的路由可以声明为模糊查询,RabbitMQ拥有两个通配符;topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词

- “#”:匹配0-n个字符语句
- “*”:匹配一个字符语句
注意:RabbitMQ中通配符并不像正则中的单个字符,而是一个以“.”分割的字符串,如 ”topic1.*“匹配的规则以topic1开始并且”.”后只有一段语句的路由。例:”topic1.aaa”,”topic1.bb”
namespace RabbitMQPublisher{using System;using System.Text;using RabbitMQ.Client;/// <summary>/// 路由模式(topic),消息会发送到exchange/// topic与direct模式区别在于routingKey可以声明为模糊查询,RabbitMQ拥有两个通配符/// #:匹配0-n个字符语句/// *:匹配一个字符语句/// </summary>static class TopicPublisher{static void Main(string[] args){while (true){Console.WriteLine("消息发布者:模式{topic}=>输入消息内容");string message = Console.ReadLine();if (!string.IsNullOrEmpty(message)){ConnectionFactory factory = BasePublisher.CreateRabbitMqConnection();using var connection = factory.CreateConnection();using var channel = connection.CreateModel();// 声明交换机string exchangeName = $"test.exchange.topic";channel.ExchangeDeclare(exchange: exchangeName,type: "topic");// 声明队列string queue1 = "test.topic.queue1";channel.QueueDeclare(queue1, false, false, false, null);string queue2 = "test.topic.queue2";channel.QueueDeclare(queue2, false, false, false, null);//将队列与交换机进行绑定channel.QueueBind(queue: queue1,exchange: exchangeName,routingKey: "topic.*");channel.QueueBind(queue: queue2,exchange: exchangeName,routingKey: "topic.#");#if debug// queue1和queue2都可以收到消息channel.BasicPublish(exchange: exchangeName,routingKey: "topic.test",basicProperties: null,body: Encoding.UTF8.GetBytes(message));#endif// 只有queue2可以收到消息,因为.#可以匹配一个或者多个字符语句而.*只能匹配单个channel.BasicPublish(exchange: exchangeName,routingKey: "topic.test.test",basicProperties: null,body: Encoding.UTF8.GetBytes(message));}}}}}
发布,路由,通配符这三种模式可以算为一种模式,区别仅仅是交互机类型不同.发送者将消息发送发送到交换机,接收者创建各自的消息队列绑定到交换机

