博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python拓展8(消息队列rabbitMQ)
阅读量:4673 次
发布时间:2019-06-09

本文共 15667 字,大约阅读时间需要 52 分钟。

前置

  为什么要有消息队列

    生产者与消费者

    数据通信

      rest api http协议发送的json格式数据

      webservice http协议发送的xml格式数据

      rpc 基于socket并使用自己封装的协议进行数据传输

rabbitMQ

  RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

  MQ全称为Message Queue, (MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

rabbitMQ安装与运行

服务端:    yum install rabbitmq-server运行:运行方法一:rabbitmq-server 运行方法二:systemctl start rabbitmq-server   重启:    systemctl restart rabbitmq-server

rabbitMQ服务端设置密码

sudo rabbitmqctl add_user wupeiqi 123# 设置用户为administrator角色sudo rabbitmqctl set_user_tags wupeiqi administrator# 设置权限sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"# 然后重启rabbiMQ服务sudo /etc/init.d/rabbitmq-server restart # 然后可以使用刚才的用户远程连接rabbitmq server了。------------------------------credentials = pika.PlainCredentials("wupeiqi","123")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))
设置链接密码
#!/usr/bin/env python# -*- coding:utf-8 -*-import pikafrom pika.adapters.blocking_connection import BlockingChannelcredentials = pika.PlainCredentials("root", "123")conn = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.20', credentials=credentials))# 超时时间conn.add_timeout(5, lambda: channel.stop_consuming())channel = conn.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    channel.stop_consuming()channel.basic_consume(callback,                      queue='hello',                      no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
设置超时时间

rabbitMQ客户端

pip3 install pika

简单的rabbitMQ队列实现

# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 创建一个队列,队列名称叫做hellochannel.queue_declare(queue='hello')# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
生产者
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取一个就关掉的方法    channel.stop_consuming()# 去hello队列里拿数据,一但有数据,就执行callbackchannel.basic_consume(callback, queue='hello', no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
消费者

rabbitMQ队列进阶

 1、acknowledgment 消息不丢失【消费者变化】

  no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

关键点:

# 设置消费者方# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hello', no_ack=False)与取值函数:# 取值做确认工作ch.basic_ack(delivery_tag=method.deliver_tag)

示例:

# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 创建一个队列,队列名称叫做hellochannel.queue_declare(queue='hello')# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
生产者
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 去hello队列里拿数据,一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hello', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
消费者

2、durable   消息不丢失【生产者变化】

应用场景及注意

# 当想为服务器做持久化的时候需要设置durable=True  # 代表数据持久化delivery_mode=2 # 消息持久化# ps:注意如果以前存在过队列,便无法多这个队列进行重新赋予属性,也就是说以前存在的不可持久化的队列# 是没有办法变成可持久化的队列的,且已存在的队列无法重新生成

示例:

# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 创建一个队列,队列名称叫做hello,# 并对其做持久化durable=True【即使服务器挂掉也不会丢失】channel.queue_declare(queue='hi', durable=True)# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello World!',                      properties=pika.BasicProperties(                          delivery_mode=2,  # 并对其做持久化【即使服务器挂掉也不会丢失】                      ))print(" [x] Sent 'Hello World!'")connection.close()
生产者
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# 服务器做持久化 durable=Truechannel.queue_declare(queue='hi', durable=True)def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 去hello队列里拿数据,一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hi', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
消费者

3、消息获取顺序【消费者改变】

  默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

关键点

channel.basic_qos(prefetch_count=1) # 表示谁来谁取,不再按照奇偶数排列# 谁先完成任务,谁就取下一个任务

示例

# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 设置谁完成任务,谁便来取下一个任务channel.basic_qos(prefetch_count=1)# 去hello队列里拿数据,一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出channel.basic_consume(callback, queue='hi', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
消费者

4、发布订阅【可以有多个订阅者//订阅者代码是一样的】

  发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

# !/usr/bin/env pythonimport pika# ######################### 发布者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 发布者模式exchange='ssq', type='fanout'channel.exchange_declare(exchange='ssq', type='fanout')# exchange:发布者方式channel.basic_publish(exchange='ssq',                      routing_key='',                      body='Hello World!',                      )print(" [x] Sent 'Hello World!'")connection.close()
发布者
# !/usr/bin/env pythonimport pika# ########################## 订阅者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# 订阅者exchange='ssq', exchange_type='fanout'channel.exchange_declare(exchange='ssq', exchange_type='fanout')# 订阅者随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和e1绑定channel.queue_bind(exchange='e1', queue=queue_name)def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出#  queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
订阅者

5、关键字发送 

  exchange type = direct

  之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

 

示例:

# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 关键字模式exchange='s4', exchange_type='direct'channel.exchange_declare(exchange='s4', exchange_type='direct')# 关键字模式routing_key='info',channel.basic_publish(exchange='ssq',                      # 关键字routing_key='info',                      routing_key='info',                      body='Hello World!',                      )print(" [x] Sent 'Hello World!'")connection.close()
生产者
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# 发布者exchange='s4', exchange_type='direct'channel.exchange_declare(exchange='s4', exchange_type='direct')# 发布者随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s4绑定两个关键字routing_key='info'channel.queue_bind(exchange='s4', queue=queue_name, routing_key='info')channel.queue_bind(exchange='s4', queue=queue_name, routing_key='reeor')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出#  queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
消费者一
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# 发布者exchange='s4', exchange_type='direct'channel.exchange_declare(exchange='s4', exchange_type='direct')# 发布者随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s4绑定一个关键字routing_key='info'channel.queue_bind(exchange='s4', queue=queue_name, routing_key='info')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出#  queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)
消费者二

6、模糊匹配

 

 exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
发送者路由值              队列中kk.ss.python          kk.*  -- 不匹配kk.ss.python          kk.#  -- 匹配

示例

# !/usr/bin/env pythonimport pika# ######################### 生产者 ########################## 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))# 声明一个channel,类似数据库打开一个链接channel = connection.channel()# 关键字模式exchange='s5', exchange_type='direct'channel.exchange_declare(exchange='s5', exchange_type='topic')# 关键字模式routing_key='info',channel.basic_publish(exchange='s5',                      # 关键字routing_key='info',                      routing_key='info.xx',                      body='Hello World!',                      )print(" [x] Sent 'Hello World!'")connection.close()
生产者
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# 模糊匹配exchange='s5', exchange_type='direct'channel.exchange_declare(exchange='s5', exchange_type='topic')# 模糊匹配随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s5绑定一个关键字routing_key='info.*'channel.queue_bind(exchange='s5', queue=queue_name, routing_key='info.*')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出#  queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
消费者一
# !/usr/bin/env pythonimport pika# ########################## 消费者 ########################### 如果设置密码,那么就需要加以下一句话配置用户名与密码credentials = pika.PlainCredentials("root", "123")# host:ip地址 credentials:链接凭证connection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost', credentials=credentials))channel = connection.channel()# 模糊匹配exchange='s5', exchange_type='direct'channel.exchange_declare(exchange='s5', exchange_type='topic')# 模糊匹配随机生成一个队列名result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 让队列和s5绑定一个关键字routing_key='info.#'channel.queue_bind(exchange='s5', queue=queue_name, routing_key='info.#')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    # 取值做确认工作    ch.basic_ack(delivery_tag=method.deliver_tag)# 一但有数据,就执行callback,# no_ack=Flask必须在取值时做确认工作,否则值不会被取出#  queue=queue_name等待区接收任务channel.basic_consume(callback, queue=queue_name, no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉channel.start_consuming()
消费者二

关于此类问题

1、exchange的作用

exchange和队列进行绑定用户想队列发送数据时,无需再找队列,直接向exchange中发送即可

关系图

2、rabbitmq中有几种exchange?

fanout    # 这要绑定就发dirct    # 确定的关键字topic    # 模糊的匹配

3、消息持久化和ack

服务端(durable)客户端(ack)

4、消息队列的应用场景

防止消息堆积(消息提醒)订单处理(消息队列+celery)# 事务比较多的时候并发量的的时候使用

 

转载于:https://www.cnblogs.com/L5251/articles/9325586.html

你可能感兴趣的文章
nopcommerce 二次开发
查看>>
NHibernate入门实例
查看>>
IBM_DS5020磁盘阵列做raid、热备并把盘阵挂在服务器上的步骤
查看>>
svg制作风车旋转
查看>>
《软件工程》课堂作业:返回一个整数数组中最大字数组的和
查看>>
ACM 美素数 (没AC)
查看>>
Sqlserver学习研究
查看>>
VTK图形模型主要对象
查看>>
c# Linq实现 获得某一个路径下所有文件的名(不含扩展名)
查看>>
动静态广播的区别
查看>>
前缀式计算(前缀表达式)
查看>>
Linux常用命令大全
查看>>
添加删除tag
查看>>
ARM学习篇 中断定时理解
查看>>
卷积神经网络在tenserflow的实现
查看>>
[STL]用法
查看>>
PostgresException: 42883: function ifnull(integer, integer) does not exist
查看>>
python3 表情符号编码
查看>>
桥接模式
查看>>
跨server传输数据注意事项
查看>>