#!/usr/bin/env python
#
# amqpclient.py
#
# Copyright 2013-2017,2019 Todd Shadburn
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Small convenience wrapper around a blocking pika AMQP connection."""

try:
    from urllib.parse import quote, urlencode
except ImportError:  # Python 2
    from urllib import quote, urlencode

import pika


class AMQPClient(object):
    """Hold one blocking AMQP connection and one channel opened on it.

    Connection settings are stored as plain attributes (``proto``, ``host``,
    ``port``, ``vhost``, ``username``, ``password``, ``options``) and can be
    overridden through keyword arguments to :meth:`connect`.
    """

    def __init__(self):
        """Set default connection parameters; no network I/O happens here."""
        self.conn = None
        self.ch = None
        self.proto = 'amqp'
        self.host = 'localhost'
        self.port = 5672
        self.vhost = '/'
        self.username = 'guest'
        self.password = 'guest'
        self.options = {}
        self.creds = None
        self.consumer = None
        # Name of the queue used when attach()/bind() get queuename=None.
        # Declared lazily by _callback_queue().
        self.callbackq = None
        self.loop = 0

    def connect(self, **kwargs):
        """Open the connection and a channel.

        Recognised keyword arguments: ``proto``, ``host``, ``port``,
        ``vhost``, ``username``, ``password`` and ``options`` (a mapping that
        is URL-encoded into the query string of the connection URL). Any
        argument that is omitted keeps the value currently stored on the
        instance.

        Returns the ``pika.BlockingConnection`` that was opened.
        """
        for name in ('proto', 'host', 'vhost', 'username', 'password',
                     'options'):
            if name in kwargs:
                setattr(self, name, kwargs[name])
        if 'port' in kwargs:
            self.port = int(kwargs['port'])

        # Kept as an attribute for callers that want a credentials object;
        # the connection itself is configured through the URL below.
        self.creds = pika.PlainCredentials(self.username, self.password)

        # Percent-encode every user-supplied URL component so that reserved
        # characters (notably the default vhost "/", which must be written
        # as "%2F") do not change how the URL is parsed.
        url = '%s://%s:%s@%s:%d/%s' % (
            self.proto,
            quote(self.username, safe=''),
            quote(self.password, safe=''),
            self.host,
            self.port,
            quote(self.vhost, safe=''))
        urlopts = urlencode(self.options) if self.options else ''
        if urlopts:
            url = '%s?%s' % (url, urlopts)

        connparams = pika.connection.URLParameters(url)
        self.conn = pika.BlockingConnection(connparams)
        self.ch = self.conn.channel()
        return self.conn

    def close(self):
        """Close the channel and the connection, if they are open."""
        try:
            if self.ch is not None and self.ch.is_open:
                self.ch.close()
        finally:
            # Always release the connection, even if closing the channel
            # raised, so the underlying socket is not leaked.
            self.ch = None
            self.consumer = None
            # The lazily declared queue was exclusive to the connection being
            # closed, so its name must not be reused after a reconnect.
            self.callbackq = None
            conn, self.conn = self.conn, None
            if conn is not None and conn.is_open:
                conn.close()

    def _callback_queue(self):
        """Return the fallback queue name, declaring the queue on first use.

        When ``callbackq`` has not been set, an exclusive queue with a
        server-generated name is declared and its name is remembered.
        """
        if getattr(self, 'callbackq', None) is None:
            result = self.ch.queue_declare(queue='', exclusive=True)
            self.callbackq = result.method.queue
        return self.callbackq

    def _basic_consume(self, queuename, callback):
        """Register ``callback`` as an auto-acknowledging consumer.

        Tries the pika >= 1.0 keyword names first and falls back to the
        pre-1.0 signature when the installed pika rejects them.
        Returns the consumer tag.
        """
        try:
            return self.ch.basic_consume(queue=queuename,
                                         on_message_callback=callback,
                                         auto_ack=True)
        except TypeError:
            return self.ch.basic_consume(callback, queue=queuename,
                                         no_ack=True)

    def declare_queue(self, queuename, durable=False):
        """Declare ``queuename`` on the channel."""
        self.ch.queue_declare(queue=queuename, durable=durable)

    def delete_queue(self, queuename):
        """Delete ``queuename`` from the broker."""
        self.ch.queue_delete(queue=queuename)

    def attach(self, queuename, callback, durable=False):
        """Start delivering messages from a queue to ``callback``.

        With a ``queuename`` the queue is declared first (honouring
        ``durable``). With ``queuename=None`` the instance's callback queue
        is used instead. The consumer tag is stored in ``self.consumer``.
        """
        if queuename is not None:
            self.ch.queue_declare(queue=queuename, durable=durable)
        else:
            queuename = self._callback_queue()
        self.consumer = self._basic_consume(queuename, callback)

    def bind(self, exchange, queuename, durable=False):
        """Bind a queue to ``exchange``.

        With a ``queuename`` the queue is declared first (honouring
        ``durable``). With ``queuename=None`` the instance's callback queue
        is bound instead.
        """
        if queuename is not None:
            self.ch.queue_declare(queue=queuename, durable=durable)
        else:
            queuename = self._callback_queue()
        self.ch.queue_bind(exchange=exchange, queue=queuename)

    def declare_exchange(self, exchangename, exchangetype='fanout'):
        """Declare an exchange of the given type (``fanout`` by default)."""
        self.ch.exchange_declare(exchange=exchangename,
                                 exchange_type=exchangetype)

    def queue_get(self, queuename):
        """Fetch a single message from ``queuename``.

        Returns the ``(method, header, body)`` triple produced by the
        channel's ``basic_get``.
        """
        method, header, body = self.ch.basic_get(queue=queuename)
        return (method, header, body)

    def basic_ack(self, method):
        """Acknowledge the delivery described by ``method``."""
        self.ch.basic_ack(delivery_tag=method.delivery_tag)

    def consume(self):
        """Process deliveries until :meth:`exit_loop` is called.

        After the loop ends, the consumer registered by :meth:`attach`
        (if any) is cancelled.
        """
        self.loop = 1
        while self.loop:
            self.ch.start_consuming()
        if self.consumer is not None:
            self.ch.basic_cancel(self.consumer)
            self.consumer = None

    def exit_loop(self):
        """Ask :meth:`consume` to return.

        Intended to be called from the consuming thread, e.g. from inside a
        message callback.
        """
        self.loop = 0
        # Clearing the flag alone is not enough: start_consuming() keeps
        # blocking until consuming is stopped on the channel.
        if self.ch is not None:
            self.ch.stop_consuming()

    def emit_to_queue(self, queuename='', message='', exchange=''):
        """Publish ``message`` with ``queuename`` as the routing key."""
        self.ch.basic_publish(exchange=exchange, routing_key=queuename,
                              body=message)

    def emit_exchange_event(self, exchange='', data=''):
        """Publish ``data`` to ``exchange`` with an empty routing key."""
        self.ch.basic_publish(exchange=exchange, routing_key='', body=data)