下载依赖包:pip3 install pika
封装公共方法(保存为 public.py,供下文的 from public import RaMqStart 导入)
import pika
import json


class RaMqStart(object):
    """Producer-side helper around a blocking pika connection to RabbitMQ.

    The connection and channel are opened in ``__init__``; call
    ``mq_close()`` when done, or use the instance as a context manager::

        with RaMqStart(url, user, pwd, vhost, queue, key, exchange) as mq:
            mq.start_producer(body)
    """

    def __init__(self, url, user, pwd, vhost, queue, key, exchange, port=5672):
        """Authenticate against the broker and open a channel.

        Args:
            url: Broker host, passed to pika as the connection host.
            user: User name for plain-credential authentication.
            pwd: Password for plain-credential authentication.
            vhost: Virtual host to connect to.
            queue: Name of the queue declared before each publish.
            key: Routing key used when publishing to ``exchange``.
            exchange: Name of the exchange messages are published to.
            port: Broker port. Defaults to 5672, the value that used to
                be hard-coded.

        Raises:
            Whatever pika raises when the connection or the channel
            cannot be opened. If opening the channel fails, the
            connection is closed before the exception propagates.
        """
        self.url = url
        self.user = user
        self.pwd = pwd
        self.vhost = vhost
        self.exchange = exchange
        self.queue = queue
        self.key = key
        self.port = port

        # Authenticate and connect to the broker.
        self.credentials = pika.PlainCredentials(self.user, self.pwd)
        parameters = pika.ConnectionParameters(
            host=self.url,
            port=self.port,
            virtual_host=self.vhost,
            credentials=self.credentials,
        )
        # ``cnnection`` is the historical (misspelled) attribute name. It is
        # kept so existing callers keep working; ``connection`` is an alias
        # for the same object.
        self.cnnection = pika.BlockingConnection(parameters)
        self.connection = self.cnnection
        try:
            self.channel = self.cnnection.channel()
        except Exception:
            # Do not leave a half-initialised object holding an open socket.
            self.mq_close()
            raise

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block.

        Returns ``False`` so an exception raised inside the block is not
        suppressed.
        """
        self.mq_close()
        return False

    def start_producer(self, body):
        """Declare the queue and publish one message.

        The queue ``self.queue`` is declared with ``durable=True``; the
        message is then published to ``self.exchange`` with routing key
        ``self.key``. No ``properties`` are passed to ``basic_publish``,
        so pika's default message properties apply.

        Args:
            body: Message payload handed to ``basic_publish``.
        """
        # Declare the queue before publishing.
        self.channel.queue_declare(queue=self.queue, durable=True)
        # Publish: the exchange routes the message using ``routing_key``.
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=self.key,
            body=body,
        )

    def mq_close(self):
        """Close the broker connection if it is still open.

        Safe to call more than once: an already-closed connection is
        left alone instead of raising.
        """
        if self.cnnection.is_open:
            self.cnnection.close()
调用函数
from public import RaMqStart
import json


def mq_run(json_body, vhost, queue, exchange, *,
           url="common-mq-test01.shein.com", key=""):
    """Serialise ``json_body`` to JSON and publish it as one message.

    Opens a ``RaMqStart`` connection, publishes the message, and always
    closes the connection afterwards, even when publishing fails.

    Args:
        json_body: Object to serialise with ``json.dumps``.
        vhost: Virtual host name. It is also used as both the user name
            and the password for authentication.
        queue: Name of the queue to declare before publishing.
        exchange: Name of the exchange to publish to.
        url: Broker host. Defaults to the previously hard-coded host.
        key: Routing key. Defaults to the previously hard-coded ``""``.

    Raises:
        TypeError: If ``json_body`` cannot be serialised to JSON.
        Whatever ``RaMqStart`` raises on connection or publish failure.
    """
    # Serialise first so that a bad payload fails before a connection is
    # opened, leaving nothing to clean up.
    body = json.dumps(json_body)

    mq = RaMqStart(
        url=url,
        user=vhost,
        pwd=vhost,
        vhost=vhost,
        queue=queue,
        exchange=exchange,
        key=key,
    )
    try:
        # Publish the message.
        mq.start_producer(body=body)
    finally:
        # Release the connection whether or not the publish succeeded.
        mq.mq_close()


if __name__ == '__main__':
    pass