0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-02-21 20:36:07 +00:00
alerta_alerta/alerta/common/amqp.py

152 lines
4.2 KiB
Python
Raw Normal View History

2013-02-27 18:10:44 +00:00
2014-03-31 11:55:44 +00:00
import sys
2013-02-21 14:22:12 +00:00
2014-04-29 21:55:53 +00:00
from kombu import BrokerConnection, Exchange, Queue, Producer
2014-03-17 16:59:17 +00:00
from kombu.mixins import ConsumerMixin
2014-04-29 21:55:53 +00:00
from kombu.utils.debug import setup_logging
2013-02-21 14:22:12 +00:00
2014-03-17 16:59:17 +00:00
from alerta.common import log as logging
2013-02-21 14:22:12 +00:00
from alerta.common import config
2014-03-17 16:59:17 +00:00
# Module-level logger, obtained from alerta.common.log (imported above as `logging`).
LOG = logging.getLogger(__name__)
2013-02-21 14:22:12 +00:00
# Shared configuration object from alerta.common.config; the classes below
# read their options from it as attributes (e.g. CONF.amqp_url).
CONF = config.CONF
2014-03-17 16:59:17 +00:00
class Messaging(object):
    """Own the kombu broker connection shared by publishers and consumers.

    Instantiating the class registers the default AMQP options with the
    config module and immediately attempts to connect to ``CONF.amqp_url``.
    When ``CONF.amqp_url`` is empty no connection is made and
    ``self.connection`` stays ``None``.
    """

    # Default option values registered with the config module.
    amqp_opts = {
        'amqp_queue': '',  # do not send to queue by default
        'amqp_topic': 'notify',
        'amqp_url': 'amqp://guest:guest@localhost:5672//',  # RabbitMQ
        # 'amqp_url': 'mongodb://localhost:27017/kombu',  # MongoDB
        # 'amqp_url': 'redis://localhost:6379/',  # Redis
        # 'amqp_url': 'sqs://ACCESS_KEY:SECRET_KEY@'  # AWS SQS (must define amqp_queue)
        # 'amqp_sqs_region': 'eu-west-1'  # required if SQS is used
    }

    def __init__(self):
        """Register the AMQP options, optionally enable debug logging, and connect."""
        config.register_opts(Messaging.amqp_opts)

        if CONF.debug:
            setup_logging(loglevel='DEBUG', loggers=[''])

        self.connection = None
        self.connect()

    def connect(self):
        """Open the broker connection described by ``CONF.amqp_url``.

        Does nothing when ``CONF.amqp_url`` is empty. If the connection
        attempt raises, the error is logged and the process exits with
        status 1.
        """
        if not CONF.amqp_url:
            return

        # 'amqp_sqs_region' is not among the defaults registered above (it is
        # commented out), so read it defensively instead of assuming the
        # attribute exists on CONF.
        sqs_region = getattr(CONF, 'amqp_sqs_region', None)
        transport_options = {'region': sqs_region} if sqs_region else {}

        self.connection = BrokerConnection(
            CONF.amqp_url,
            transport_options=transport_options
        )
        # NOTE(review): amqp_url can embed credentials (see the default and
        # the SQS example above) and is written to the log verbatim below.
        try:
            self.connection.connect()
        except Exception as e:
            LOG.error('Failed to connect to AMQP transport %s: %s', CONF.amqp_url, e)
            sys.exit(1)

        LOG.info('Connected to broker %s', CONF.amqp_url)

    def disconnect(self):
        """Release the broker connection.

        Returns whatever ``connection.release()`` returns, or ``None`` when
        no connection was ever created (empty ``amqp_url``).
        """
        if self.connection is None:
            return None
        return self.connection.release()

    def is_connected(self):
        """Return the connection's ``connected`` flag, or ``False`` if there is no connection."""
        if self.connection is None:
            return False
        return self.connection.connected
class DirectPublisher(object):
    """Publish message bodies to the queue named by ``CONF.amqp_queue``."""

    def __init__(self, connection):
        """Open a simple queue named ``CONF.amqp_queue`` on *connection*."""
        config.register_opts(Messaging.amqp_opts)

        queue_name = CONF.amqp_queue
        self.queue = connection.SimpleQueue(queue_name)

        LOG.info('Configured direct publisher on queue "%s"', queue_name)

    def send(self, msg):
        """Put the body of *msg* (``msg.get_body()``) on the queue."""
        payload = msg.get_body()
        self.queue.put(payload)

        LOG.info('Message sent to queue "%s"', CONF.amqp_queue)
2014-03-17 16:59:17 +00:00
class FanoutPublisher(object):
    """Publish message bodies to the fanout exchange named by ``CONF.amqp_topic``."""

    def __init__(self, connection):
        """Create a channel on *connection* plus the fanout exchange and its producer."""
        config.register_opts(Messaging.amqp_opts)

        channel = connection.channel()
        topic = CONF.amqp_topic

        self.channel = channel
        self.exchange_name = topic
        self.exchange = Exchange(name=topic, type='fanout', channel=channel)
        self.producer = Producer(exchange=self.exchange, channel=channel)

        LOG.info('Configured fanout publisher on topic "%s"', CONF.amqp_topic)

    def send(self, msg):
        """Publish the body of *msg*, declaring the exchange and retrying on failure."""
        payload = msg.get_body()
        self.producer.publish(payload, declare=[self.exchange], retry=True)

        LOG.info('Message sent to topic "%s"', CONF.amqp_topic)
2014-03-17 16:59:17 +00:00
class DirectConsumer(ConsumerMixin):
    """Consume messages from the direct queue named by ``CONF.amqp_queue``.

    Subclasses normally override ``on_message`` to process each message.
    """

    # Runs once, when the class body is executed, so the AMQP options are
    # registered before any instance is created.
    config.register_opts(Messaging.amqp_opts)

    def __init__(self, connection):
        """Bind a durable direct exchange and a queue, both named ``CONF.amqp_queue``.

        *connection* is stored on the instance because kombu's
        ``ConsumerMixin`` requires a ``connection`` attribute.
        """
        self.connection = connection
        self.channel = connection.channel()
        self.exchange = Exchange(CONF.amqp_queue, type='direct', channel=self.channel, durable=True)
        self.queue = Queue(CONF.amqp_queue, exchange=self.exchange, routing_key=CONF.amqp_queue, channel=self.channel)

        LOG.info('Configured direct consumer on queue %s', CONF.amqp_queue)

    def get_consumers(self, Consumer, channel):
        """Return a single consumer on ``self.queue`` that dispatches to ``on_message``."""
        return [
            Consumer(queues=[self.queue], callbacks=[self.on_message])
        ]

    def on_message(self, body, message):
        """Log the received *body* at debug level and acknowledge *message*."""
        # Lazy %-style arguments: the repr is only built if debug logging is on.
        LOG.debug('Received queue message: %r', body)
        message.ack()
2013-02-21 14:22:12 +00:00
2014-03-17 16:59:17 +00:00
class FanoutConsumer(ConsumerMixin):
    """Consume messages from the fanout exchange named by ``CONF.amqp_topic``.

    Subclasses normally override ``on_message`` to process each message.
    """

    # Runs once, when the class body is executed, so the AMQP options are
    # registered before any instance is created.
    config.register_opts(Messaging.amqp_opts)

    def __init__(self, connection):
        """Bind an exclusive, unnamed queue to the durable fanout exchange.

        *connection* is stored on the instance because kombu's
        ``ConsumerMixin`` requires a ``connection`` attribute.
        """
        self.connection = connection
        self.channel = connection.channel()
        self.exchange = Exchange(CONF.amqp_topic, type='fanout', channel=self.channel, durable=True)
        # The queue name is left empty and the queue is marked exclusive.
        self.queue = Queue('', exchange=self.exchange, routing_key='', channel=self.channel, exclusive=True)

        LOG.info('Configured fanout consumer on topic "%s"', CONF.amqp_topic)

    def get_consumers(self, Consumer, channel):
        """Return a single consumer on ``self.queue`` that dispatches to ``on_message``."""
        return [
            Consumer(queues=[self.queue], callbacks=[self.on_message])
        ]

    def on_message(self, body, message):
        """Log the received *body* at debug level and acknowledge *message*."""
        # Lazy %-style arguments: the repr is only built if debug logging is on.
        LOG.debug('Received topic message: %r', body)
        message.ack()