1 概念
什么是消息队列?
基于生产者消费者模型,即生产一个消费一个,无生产则无消费,有消费无生产则等待;
采用队列数据结构,先进先出,和生活中排队一致;
为什么消息要形成队列?
计算机处理单条消息的事件始终受限,微观上来看本就逐条处理,但宏观上同时到达的任务会导致并发困难以至于积压任务;
消息队列就是对任务形成一种逻辑处理顺序,不再是无需的任务处理请求,按照先来先出来的原则,逐次处理任务;
由于微观上的逐次处理和多任务导致宏观上的并发丧失,消息队列提供了最坏的时间长度和一般的逻辑处理,计算机不再随机或者说遭遇堵塞;
对于许多任务并非要求实时性,比如大V的微博发布,对于在线用户可以直接推,但对于离线用户可以存储在消息队列,等用户上线后再推送,
对于服务器而言,减少了并发处理的任务量,对于用户而言,基本感觉不到消息的延迟,而对于推荐而言,公共区域的推送直接节省了多个用户推送的数据量,展示更加直观;
代码模板
# -*- coding: UTF-8 -*-"""@author:41999@file:队列.py@time:2021/10/22"""import queueq = queue.Queue(maxsize=100)q.put(123)q.put(456)q.put(789)print(q.get())print(q.get())print(q.get(block=False))# block用于关闭阻塞时读取操作的挂起
为什么使用消息队列?
简介
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。
应用场景—外卖系统
外卖订单的数据模型:用户,时间,店铺,金额
处理流程:用户完成付款,订单系统负责提供信息,分为四个对象,
- 对用户而言,告知交易成功;
- 对骑手系统而言,告知新的任务已产生,等待抢单,
- 对于产品存储系统而言,告知其库存减量,
- 对于商家系统而言,扣除前端库存,并且告知交易产生
通知的方式:链式处理,骑手系统 —> 产品存储系统 —> 商家系统 —> 用户[完整走完四个部件,数据却可能丢失]
传统业务一对一,结算系统开启一个接口,逐次通知四个系统;消息队列的方式:开放四个接口,被四个系统监听,并且形成一个计数器,成功读取四次后自动销毁数据,等待下一个订单的生产;5. 消息队列充当了一个中间层,左侧是订单系统,右侧分别是骑手系统,产品库存系统,商家系统和用户,右侧四者分别订阅消息,左侧订单的变动会实时推送给右侧四个主体;
评价方式[内聚耦合]
低内聚高耦合,模块相对独立;
RabbitMQ
简介:
分为简单模式和交换机模式两种交换机模式分为:发布订阅;关键字匹配模式;模糊匹配模式简单模式生产者:链接服务器;创建队列;将消息插入队列;消费者:链接服务器;创建消息队列;
生产者代码
# -*- coding: UTF-8 -*-"""@author:41999@file:01producer.py@time:2021/10/27"""import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')# 简单模式,交换机参数为空,第二个参数是指定队列channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print("[x] Sent 'Hello world!'")
消费者
# -*- coding: UTF-8 -*-"""@author:41999@file:02consumer.py@time:2021/10/27"""import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 创建消息队列channel.queue_declare(queue='hello')# 确定回调函数def callback(ch, mnethod, properties, body):print("[x] Received %r" % body)# 监听队列 auto_ack 默认应答channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)print("[*] Waiting for messages. To exit press CTRL + C")# 正式开始监听channel.start_consuming()
最终效果

参数使用
应答参数
应用场景
1. 应答参数所属的函数以及消息接收阶段1. 属于消费者模块的函数channel.basic_consume;属于最后的确定监听队列阶段,在确定回调函数之后2. 存在的意义1. 默认应答则在生产一个消息后消费一个消息2. 当生产者生产一个消息,消费者创建连接--->创建队列--->创建回调函数时,程序接收但意外终止四个步骤并没有完全走完,那么数据就会丢失[原子操作]3. 解决策略:将应答参数改为False,手动应答
代码
===========================producer==================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')# 简单模式,交换机参数为空,第二个参数是指定队列channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print("[x] Sent 'Hello world!'")===========================consumer=================================================# 确定回调函数def callback(ch, mnethod, properties, body):print("[x] Received %r" % body)# 监听队列 auto_ack 默认应答channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)print("[*] Waiting for messages. To exit press CTRL + C")# 正式开始监听channel.start_consuming()
最终效果

持久化
应用场景
为什么设置这个参数?
RabbitMQ可能会意外停止,而持久化可以保证即使程序重启,消费者队列依然可以接收到生产者的产品
代码
=====================================producer.py=====================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello', durable=True, passive=True)# 简单模式,交换机参数为空,第二个参数是指定队列channel.basic_publish(exchange='',routing_key='hello',properties=pika.BasicProperties(delivery_mode=2),body='Hello World!')print("[x] Sent 'Hello world!'")=====================================consumer.py=====================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 创建消息队列channel.queue_declare(queue='hello', durable=True, passive=True)# 确定回调函数def callback(ch, mnethod, properties, body):print("[x] Received %r" % body)# ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消费者取得数据后,消费者在队列中删除产品# 监听队列 auto_ack 默认应答channel.basic_consume(queue='hello',auto_ack=True,on_message_callback=callback)print("[*] Waiting for messages. To exit press CTRL + C")# 正式开始监听channel.start_consuming()
报错:pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false'")
解决
在队列建立阶段中中新增passive参数,并且设置为True。生产者与消费者均需要设置,且均为持久化参考文档:https://www.cnblogs.com/gangzi4321/p/11001497.html
订阅者模式
场景
生产者通过交换机发布消息,多个消费者建立自己的队列从交换机读取同一份信息,类似于村里的广播
代码
=======================================producer.py====================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为long的,交换机类型为fanout的交换机message = "Info:20211027"channel.basic_publish(exchange='logs',exchange_type='fanout',routing_key='',body=message)print("[x] Sent % r" % message)connection.close()=======================================consumer.py====================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='fanout')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()
公平分发
简介
1. 缺陷我们提供多个消费者,目的就是为了提高系统的性能,提升系统处理任务的速度;如果将消息平均的分发给每个消费者,那么处理消息快的服务是不是会空闲下来;而处理慢的服务可能会阻塞等待处理,这样的场景是我们不愿意看到的;所以有了今天要说的分发模式,公平分发。2. 能者多劳所谓的公平分发,其实用能者多劳描述更为贴切,根据名字就可以知道,谁有能力处理更多的任务,那么就交给谁处理,防止消息的挤压。3. 前提将自动应答改为手动应答
代码
===================================================producer.py===================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='hello')# 简单模式,交换机参数为空,第二个参数是指定队列channel.basic_publish(exchange='',routing_key='hello',body='222')print("[x] Sent 'Hello world!'")===================================================consumer.py===================================================import pika,timeconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()# 创建消息队列channel.queue_declare(queue='hello')# 确定回调函数def callback(ch, mnethod, properties, body):time.sleep(5)print("[x] Received %r" % body)# ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消费者取得数据后,消费者在队列中删除产品# 公平分发---能者多劳channel.basic_qos(prefetch_count=1)# 监听队列 auto_ack 默认应答channel.basic_consume(queue='hello',auto_ack=False,on_message_callback=callback)print("[*] Waiting for messages. To exit press CTRL + C")# 正式开始监听channel.start_consuming()
问题
1. 出现的问题1. 本次新增的参数为:channel.basic_qos(prefetch_count=1);2. 想要实现公平分配,即工作效率高,快速回复的对象接收最多的消息2. 实验方法1. 消费者分三次发送消息,间隔看手速,三个消费者持续监听同一个队列;2. 三个消费者的睡眠时间分别为5,10,15;3. 预期实现的效果:睡眠时间最短的消费者收到最多的消息,也就是三条;4. 实际运行的效果:三个消费者收到的消息平摊,每人一条。
交换机模式之关键字模式
简介
发布订阅相当于群发,而关键字模式类似于私发,前者一对多,后者一对一
测试方法
对于生产者而言,只需要更改参数消息发布中的routine_key和exchange_type=’direct’ ,对于消费者,需要修改exchange_type 和 routine_key前者修改模式,后者指定关键字
一个生产者对应三个消费者
===================================================producer.py====================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为long的,交换机类型为fanout的交换机message = "Info:20211029"channel.basic_publish(exchange='logs',exchange_type='direct',routing_key='info',body=message)print("[x] Sent % r" % message)connection.close()===================================================consumer1.py===================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='direct')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name,routing_key='error',)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()===================================================consumer2.py===================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='direct')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name,routing_key='warning',)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()===================================================consumer3.py===================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='direct')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name,routing_key='info',)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()
交换机之通配符
简介
订阅者模式是一对多,关键字模式是一对一,通配符类似于群组分发,只要满足某个共同规则就可以
场景
一个生产者,四个消费者需要修改的参数:对于生产者,在声明消息队列的时候,需要指定交换机类型为topic对于消费者,在声明消息队列的时候,修改交换机类型为topic,修改关键字为通配符;
================================================producer.py=======================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',exchange_type='topic')message = "info:gunpowder"channel.basic_publish(exchange='logs',routing_key='USA.news',body=message)print(" [x] Sent %r" % message)connection.close()================================================consumer1.py======================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='topic')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name,routing_key='#.news',)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()================================================consumer2.py======================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='topic')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name,routing_key='#.weather',)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()================================================consumer3.py======================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='topic')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name,routing_key='USA.#',)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()================================================consumer4.py======================================================import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个名为log类型为fanout的交换机channel.exchange_declare(exchange='logs',exchange_type='topic')# 创建队列result = channel.queue_declare("", exclusive=True)queue_name = result.method.queueprint(queue_name)# 将队列和交换机绑定channel.queue_bind(exchange='logs',queue=queue_name,routing_key='EU.#',)print('[*] Waiting for logs. To exit press CTRL + C')def callback(ch, method, properties, body):print("[X] % r" % body)# 将消费者的队列与交换机绑定channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)channel.start_consuming()
引用与参考
RabbitMQ常用命令:
# 管理命令rabbitmq-service.bat install 或 rabbitmq-service installrabbitmq-service.bat stop 或 rabbitmq-service stoprabbitmq-service.bat start 或 rabbitmq-server start# 查看状态rabbitmqctl status# 启动网页管理界面# 接口: localhost:15672# 命令:rabbitmq-plugins enable rabbitmq_management
link of downloading
http://www.erlang.org/downloadshttps://www.rabbitmq.com/download.html# 消息延迟插件https://www.rabbitmq.com/download.html# 插件应用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange# 安装教程链接https://www.cnblogs.com/yyee/p/14281111.html
