前置
为什么要有消息队列
生产者与消费者
数据通信
rest api http协议发送的json格式数据
webservice http协议发送的xml格式数据
rpc 基于socket并使用自己封装的协议进行数据传输
rabbitMQ
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, (MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
rabbitMQ安装与运行
服务端: yum install rabbitmq-server运行:运行方法一:rabbitmq-server 运行方法二:systemctl start rabbitmq-server 重启: systemctl restart rabbitmq-server
rabbitMQ服务端设置密码
sudo rabbitmqctl add_user wupeiqi 123# 设置用户为administrator角色sudo rabbitmqctl set_user_tags wupeiqi administrator# 设置权限sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"# 然后重启rabbiMQ服务sudo /etc/init.d/rabbitmq-server restart # 然后可以使用刚才的用户远程连接rabbitmq server了。------------------------------credentials = pika.PlainCredentials("wupeiqi","123")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))
#!/usr/bin/env python# -*- coding:utf-8 -*-import pikafrom pika.adapters.blocking_connection import BlockingChannelcredentials = pika.PlainCredentials("root", "123")conn = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.20', credentials=credentials))# 超时时间conn.add_timeout(5, lambda: channel.stop_consuming())channel = conn.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.stop_consuming()channel.basic_consume(callback, queue='hello', no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
rabbitMQ客户端
pip3 install pika
简单的rabbitMQ队列实现
# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 创建一个队列,队列名称叫做hellochannel.queue_declare(queue='hello')# 向hello这个队列里发送一个Hello World! exchange:如果当做一个普通队列,就为空channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取一个就关掉的方法 channel.stop_consuming()# 去hello队列里拿数据,一但有数据,就执行callbackchannel.basic_consume(callback, queue='hello', no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
rabbitMQ队列进阶
1、acknowledgment 消息不丢失【消费者变化】
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
关键点:
# 设置消费者方# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hello', no_ack=False)与取值函数:# 取值做确认工作ch.basic_ack(delivery_tag=method.deliver_tag)
示例:
# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 创建一个队列,队列名称叫做hellochannel.queue_declare(queue='hello')# 向hello这个队列里发送一个Hello World! exchange:如果当做一个普通队列,就为空channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 去hello队列里拿数据,一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hello', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
2、durable 消息不丢失【生产者变化】
应用场景及注意
# 当想为服务器做持久化的时候需要设置durable=True # 代表数据持久化delivery_mode=2 # 消息持久化# ps:注意如果以前存在过队列,便无法多这个队列进行重新赋予属性,也就是说以前存在的不可持久化的队列# 是没有办法变成可持久化的队列的,且已存在的队列无法重新生成
示例:
# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 创建一个队列,队列名称叫做hello,# 并对其做持久化durable=True【即使服务器挂掉也不会丢失】channel.queue_declare(queue='hi', durable=True)# 向hello这个队列里发送一个Hello World! exchange:如果当做一个普通队列,就为空channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 并对其做持久化【即使服务器挂掉也不会丢失】 ))print(" [x] Sent 'Hello World!'")connection.close()
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# 服务器做持久化 durable=Truechannel.queue_declare(queue='hi', durable=True)def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 去hello队列里拿数据,一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hi', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
3、消息获取顺序【消费者改变】
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
关键点
channel.basic_qos(prefetch_count=1) # 表示谁来谁取,不再按照奇偶数排列# 谁先完成任务,谁就取下一个任务
示例
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 设置谁完成任务,谁便来取下一个任务channel.basic_qos(prefetch_count=1)# 去hello队列里拿数据,一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hi', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
4、发布订阅【可以有多个订阅者//订阅者代码是一样的】
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
# !/usr/bin/env pythonimport pika# ######################### 发布者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 发布者模式exchange='ssq', type='fanout'channel.exchange_declare(exchange='ssq', type='fanout')# exchange:发布者方式channel.basic_publish(exchange='ssq', routing_key='', body='Hello World!', )print(" [x] Sent 'Hello World!'")connection.close()
# !/usr/bin/env pythonimport pika# ########################## 订阅者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# 订阅者exchange='ssq', exchange_type='fanout'channel.exchange_declare(exchange='ssq', exchange_type='fanout')# 订阅者随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和e1绑定channel.queue_bind(exchange='e1', queue=queue_name)def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出# queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
示例:
# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 关键字模式exchange='s4', exchange_type='direct'channel.exchange_declare(exchange='s4', exchange_type='direct')# 关键字模式routing_key='info',channel.basic_publish(exchange='ssq', # 关键字routing_key='info', routing_key='info', body='Hello World!', )print(" [x] Sent 'Hello World!'")connection.close()
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# 发布者exchange='s4', exchange_type='direct'channel.exchange_declare(exchange='s4', exchange_type='direct')# 发布者随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s4绑定两个关键字routing_key='info'channel.queue_bind(exchange='s4', queue=queue_name, routing_key='info')channel.queue_bind(exchange='s4', queue=queue_name, routing_key='reeor')def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出# queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# 发布者exchange='s4', exchange_type='direct'channel.exchange_declare(exchange='s4', exchange_type='direct')# 发布者随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s4绑定一个关键字routing_key='info'channel.queue_bind(exchange='s4', queue=queue_name, routing_key='info')def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出# queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
发送者路由值 队列中kk.ss.python kk.* -- 不匹配kk.ss.python kk.# -- 匹配
示例
# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 关键字模式exchange='s5', exchange_type='direct'channel.exchange_declare(exchange='s5', exchange_type='topic')# 关键字模式routing_key='info',channel.basic_publish(exchange='s5', # 关键字routing_key='info', routing_key='info.xx', body='Hello World!', )print(" [x] Sent 'Hello World!'")connection.close()
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# 模糊匹配exchange='s5', exchange_type='direct'channel.exchange_declare(exchange='s5', exchange_type='topic')# 模糊匹配随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s5绑定一个关键字routing_key='info.*'channel.queue_bind(exchange='s5', queue=queue_name, routing_key='info.*')def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出# queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', credentials=credentials))channel = connection.channel()# 模糊匹配exchange='s5', exchange_type='direct'channel.exchange_declare(exchange='s5', exchange_type='topic')# 模糊匹配随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s5绑定一个关键字routing_key='info.#'channel.queue_bind(exchange='s5', queue=queue_name, routing_key='info.#')def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 取值做确认工作 ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出# queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
关于此类问题
1、exchange的作用
exchange和队列进行绑定用户想队列发送数据时,无需再找队列,直接向exchange中发送即可
关系图
2、rabbitmq中有几种exchange?
fanout # 这要绑定就发dirct # 确定的关键字topic # 模糊的匹配
3、消息持久化和ack
服务端(durable)客户端(ack)
4、消息队列的应用场景
防止消息堆积(消息提醒)订单处理(消息队列+celery)# 事务比较多的时候并发量的的时候使用