生产者代码
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import json
import pika
from pika.exchange_type import ExchangeType
print('pika version: %s' % pika.__version__)
def main():
credentials = pika.PlainCredentials('root', 'root')
parameters = pika.ConnectionParameters('192.168.133.11', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare( # 创建交换机
exchange='test_exchange',
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
channel.queue_declare(queue='standard') # 创建队列
channel.queue_bind( # 队列绑定
queue='standard', exchange='test_exchange', routing_key='standard_key')
# 发布消息
channel.basic_publish(
exchange='test_exchange',
routing_key='standard_key',
body=json.dumps("hello world"),
properties=pika.BasicProperties(content_type='application/json'))
connection.close()
if __name__ == '__main__':
main()
消费者
import functools
import json
import logging
import pika
from pika.exchange_type import ExchangeType
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
def on_message(chan, method_frame, header_frame, body, userdata=None):
"""Called when a message is received. Log message and ack it."""
LOGGER.info('Delivery properties: %s, message metadata: %s', method_frame, header_frame)
LOGGER.info('Userdata: %s, message body: %s', userdata, json.loads(body))
LOGGER.info("chan>>>>>>>>>>>>>>>>>>{0}, type:{1}".format(chan, type(chan))) # BlockingChannel
# 手动回执
chan.basic_ack(delivery_tag=method_frame.delivery_tag)
def main():
"""Main method."""
credentials = pika.PlainCredentials('root', 'root')
parameters = pika.ConnectionParameters('192.168.133.11', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 创建信道
channel.exchange_declare( # 创建交换机
exchange='test_exchange',
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
channel.queue_declare(queue='standard') # 创建队列
channel.queue_bind( # 队列绑定
queue='standard', exchange='test_exchange', routing_key='standard_key')
channel.basic_qos(prefetch_count=1)
on_message_callback = functools.partial(
on_message, userdata='on_message_userdata')
# 消费者消费消息后,回调函数on_message_callback,内部可以手动ack
# basic_consume中有auto_ack字段,默认为false,即不自动ack。
channel.basic_consume('standard', on_message_callback)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
if __name__ == '__main__':
main()