0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-30 03:33:59 +00:00
alerta_alerta/alerta/common/amqp.py
2014-03-19 21:59:20 +00:00

140 lines
4.2 KiB
Python

import json
from kombu import BrokerConnection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin
# from kombu.utils.debug import setup_logging
from alerta.common import log as logging
from alerta.common import config
from alerta.common.utils import DateEncoder
LOG = logging.getLogger(__name__)
CONF = config.CONF
class Messaging(object):
amqp_opts = {
'amqp_queue': 'alerts',
'amqp_topic': 'notify',
'amqp_url': 'amqp://guest:guest@localhost:5672//', # RabbitMQ
# 'amqp_url': 'mongodb://localhost:27017/kombu', # MongoDB
# 'amqp_url': 'redis://localhost:6379/', # Redis
}
def __init__(self):
config.register_opts(Messaging.amqp_opts)
self.connection = None
self.channel = None
self.connect()
def connect(self):
self.connection = BrokerConnection(CONF.amqp_url)
self.connection.connect()
self.channel = self.connection.channel()
LOG.info('Connected to broker %s', CONF.amqp_url)
def disconnect(self):
return self.connection.release()
def is_connected(self):
return self.connection.connected
class DirectPublisher(object):
def __init__(self, channel, name=None):
config.register_opts(Messaging.amqp_opts)
self.channel = channel
self.exchange_name = name or CONF.amqp_queue
self.exchange = Exchange(name=self.exchange_name, type='direct', channel=self.channel, durable=True)
self.producer = Producer(exchange=self.exchange, channel=self.channel, serializer='json')
LOG.info('Configured direct publisher on queue %s', CONF.amqp_queue)
def send(self, msg):
self.producer.publish(json.dumps(msg.get_body(), cls=DateEncoder), exchange=self.exchange,
serializer='json', declare=[self.exchange], routing_key=self.exchange_name)
LOG.info('Message sent to exchange "%s"', self.exchange_name)
class FanoutPublisher(object):
def __init__(self, channel, name=None):
config.register_opts(Messaging.amqp_opts)
self.channel = channel
self.exchange_name = name or CONF.amqp_topic
self.exchange = Exchange(name=self.exchange_name, type='fanout', channel=self.channel)
self.producer = Producer(exchange=self.exchange, channel=self.channel, serializer='json')
LOG.info('Configured fanout publisher on topic "%s"', CONF.amqp_topic)
def send(self, msg):
self.producer.publish(json.dumps(msg.get_body(), cls=DateEncoder), exchange=self.exchange,
serializer='json', declare=[self.exchange])
LOG.info('Message sent to exchange "%s"', self.exchange_name)
class DirectConsumer(ConsumerMixin):
config.register_opts(Messaging.amqp_opts)
def __init__(self, connection):
self.connection = connection
self.channel = self.connection.channel()
self.exchange = Exchange(CONF.amqp_queue, 'direct', channel=self.channel, durable=True)
self.queue = Queue(CONF.amqp_queue, exchange=self.exchange, routing_key=CONF.amqp_queue, channel=self.channel)
LOG.info('Configured direct consumer on queue %s', CONF.amqp_queue)
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=[self.queue], callbacks=[self.on_message])
]
def on_message(self, body, message):
LOG.debug('Received queue message: {0!r}'.format(body))
message.ack()
class FanoutConsumer(ConsumerMixin):
config.register_opts(Messaging.amqp_opts)
def __init__(self, connection):
self.connection = connection
self.channel = self.connection.channel()
self.exchange = Exchange(CONF.amqp_topic, 'fanout', channel=self.channel, durable=True)
self.queue = Queue('', exchange=self.exchange, routing_key='', channel=self.channel, exclusive=True)
LOG.info('Configured fanout consumer on topic "%s"', CONF.amqp_topic)
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=[self.queue], callbacks=[self.on_message])
]
def on_message(self, body, message):
LOG.debug('Received topic message: {0!r}'.format(body))
message.ack()