Python pika 模块:使用教程

Python pika 模块

pika 是一个 Python 客户端库,用于与 RabbitMQ 进行通信的模块。RabbitMQ 是一个流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。pika 模块提供了与 RabbitMQ 进行交互的接口,支持多种功能,包括发布 - 订阅、请求 - 应答等模式。pika 模块兼容 Python 的多个版本,通常推荐使用 Python 3.x 版本以获得更好的功能支持和性能。

应用场景

pika 模块主要用于实现消息队列的功能,适用于各种场景:

  • 异步任务处理:在 Web 应用中,任务可以在后台异步执行,从而提高用户体验。例如,用户提交订单后,程序可以通过消息队列通知处理服务执行相关任务。
  • 服务解耦:通过消息队列,不同服务之间可以通过消息进行通信,达到松耦合的目的。例如,支付服务和订单处理服务可以通过 RabbitMQ 进行异步交互。
  • 流量削峰:在高流量的情况下,可以将请求放入消息队列,控制并发处理的数量,从而避免服务器过载。

安装说明

pika 模块不是 Python 的默认模块,需要通过 pip 进行安装。可以在命令行运行以下命令:

1
pip install pika  # 使用pip安装pika模块

安装完成后,可以在 Python 代码中导入 pika 模块进行使用。

用法举例

1. 示例 1:发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pika  # 导入pika模块

# 创建与RabbitMQ的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel() # 创建频道

# 声明一个队列
channel.queue_declare(queue='hello') # 确保队列存在

# 发送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') # 通过频道发送消息

print(" [x] Sent 'Hello World!'") # 输出发送确认信息

connection.close() # 关闭连接

在这个示例中,我们创建与 RabbitMQ 的连接,声明一个名为’hello’的队列,并通过频道发送一条消息 “Hello World!” 到该队列。

2. 示例 2:接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pika  # 导入pika模块

# 创建与RabbitMQ的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel() # 创建频道

# 声明一个队列
channel.queue_declare(queue='hello') # 确保队列存在

# 定义消息处理函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body) # 输出接收到的消息

# 设置接收消息的回调函数
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) # 开始消费队列中的消息

print(' [*] Waiting for messages. To exit press CTRL+C') # 输出提示信息

channel.start_consuming() # 开始消费

在这个示例中,我们设置了一个消息接收器,用于接收来自’hello’队列的消息,并输出消息内容。

3. 示例 3:发送和接收多个消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import pika  # 导入pika模块
import time # 导入time模块,以便我们可以暂停发送

# 发送消息的函数
def send_messages():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 创建连接
channel = connection.channel() # 创建频道
channel.queue_declare(queue='task_queue', durable=True) # 确保队列存在,并持久化

for i in range(5): # 发送多个消息
message = f"Task {i + 1}" # 创建消息内容
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2)) # 发送消息并持久化
print(f" [x] Sent {message}") # 输出发送确认信息
time.sleep(1) # 暂停1秒

connection.close() # 关闭连接

# 接收消息的函数
def receive_messages():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 创建连接
channel = connection.channel() # 创建频道
channel.queue_declare(queue='task_queue', durable=True) # 确保队列存在

# 定义消息处理函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body) # 输出接收到的消息
time.sleep(body.count(b'.')) # 根据消息内容的点数进行处理模拟耗时
print(" [x] Done") # 输出处理完成的信息
ch.basic_ack(delivery_tag=method.delivery_tag) # 发送确认

channel.basic_consume(queue='task_queue', on_message_callback=callback) # 设置接收回调函数

print(' [*] Waiting for messages. To exit press CTRL+C') # 输出提示信息
channel.start_consuming() # 开始消费

# 运行发送和接收函数
send_messages() # 发送消息
receive_messages() # 接收消息

在这个示例中,我们展示了如何发送和接收多个消息,发送者循环发送 5 条消息,接收者处理接收到的消息并输出信息。

通过以上的示例,您可以看到 pika 模块在实现消息队列功能中的强大能力。它允许开发者以简单的方式发送和接收消息,这是现代应用程序中实现异步和解耦的重要工具。

强烈建议大家关注本人的博客全糖冲击博客,这里包含了所有 Python 标准库的使用教程,方便您随时查询和学习。我的博客内容丰富,涵盖了从基础到高级的各种主题,无论您是初学者还是有经验的开发者,都能从中获得宝贵的知识。欢迎与我一起分享编程的乐趣和探索的旅程!