0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-01-30 03:33:59 +00:00
alerta_alerta/alerta/common/mq.py
2013-11-09 23:20:39 +00:00

144 lines
4.7 KiB
Python

import json
import stomp
from stomp import exception, ConnectionListener
from alerta.common import log as logging
from alerta.common import config
from alerta.common.utils import DateEncoder
# Module-wide logger; 'stomp.py' is the same name Messaging.__init__ passes
# to logging.setup().
LOG = logging.getLogger('stomp.py')
# Shared configuration object; Messaging registers its default options on it
# and reads broker host/port and queue names back from it.
CONF = config.CONF
# Reconnect tuning, handed to stomp's StompConnection10 as the keyword
# arguments of the matching lower-case names (reconnect_sleep_initial, ...).
_RECONNECT_SLEEP_INITIAL = 2 # seconds
# NOTE(review): exact semantics (factor vs. increment) are defined by stomp.py,
# not by this file -- confirm against the stomp.py documentation.
_RECONNECT_SLEEP_INCREASE = 2
_RECONNECT_SLEEP_MAX = 300 # seconds
_RECONNECT_ATTEMPTS_MAX = 20
class Messaging(object):
    """Convenience wrapper around a STOMP 1.0 connection to the message broker.

    Broker address and default destinations are read from the shared ``CONF``
    object; the defaults in ``mq_opts`` are registered on it when an instance
    is created.
    """

    # Default option values registered with the shared configuration.
    mq_opts = {
        'stomp_host': 'localhost',
        'stomp_port': 61613,

        'inbound_queue': '/exchange/alerts',
        'outbound_queue': '/queue/logger',
        'outbound_topic': '/topic/notify',

        'rabbit_host': 'localhost',
        'rabbit_port': 5672,
        'rabbit_use_ssl': False,
        'rabbit_userid': 'guest',
        'rabbit_password': 'guest',
        'rabbit_virtual_host': '/',
    }

    def __init__(self):
        """Register the messaging options and set up 'stomp.py' logging."""
        config.register_opts(Messaging.mq_opts)
        logging.setup('stomp.py')

        # Start from a known state so that is_connected(), disconnect() and
        # reconnect() are safe to call before connect() has ever succeeded.
        self.conn = None
        self.callback = None
        self.wait = False

    def _open_connection(self):
        """Build a fresh connection, attach the listener (if any) and connect.

        Shared by connect() and reconnect(). Any exception raised while
        creating or opening the connection propagates to the caller.
        """
        self.conn = stomp.connect.StompConnection10(
            [(CONF.stomp_host, CONF.stomp_port)],
            reconnect_sleep_initial=_RECONNECT_SLEEP_INITIAL,
            reconnect_sleep_increase=_RECONNECT_SLEEP_INCREASE,
            reconnect_sleep_max=_RECONNECT_SLEEP_MAX,
            reconnect_attempts_max=_RECONNECT_ATTEMPTS_MAX
        )
        if self.callback:
            self.conn.set_listener('', self.callback)
        self.conn.start()
        self.conn.connect(wait=self.wait)

    def connect(self, callback=None, wait=False):
        """Connect to the configured broker.

        :param callback: optional listener object registered on the connection;
            it is remembered so reconnect() can register it again.
        :param wait: forwarded to the connection's ``connect(wait=...)`` call
            and remembered for reconnect().

        Failures are logged rather than raised; use is_connected() to check
        the outcome.
        """
        self.callback = callback
        self.wait = wait

        try:
            self._open_connection()
        except Exception as e:
            LOG.error('Could not connect to broker %s:%s : %s', CONF.stomp_host, CONF.stomp_port, e)
            return

        LOG.info('Connected to broker %s:%s', CONF.stomp_host, CONF.stomp_port)

    def reconnect(self):
        """Open a new connection using the callback/wait given to connect().

        Failures are logged rather than raised.
        """
        LOG.warning('Reconnecting to message broker...')

        try:
            self._open_connection()
        except Exception as e:
            LOG.error('Could not reconnect to broker %s:%s : %s', CONF.stomp_host, CONF.stomp_port, e)
            return

        LOG.info('Reconnected to broker %s:%s', CONF.stomp_host, CONF.stomp_port)

    def subscribe(self, destination=None, ack='auto'):
        """Subscribe to ``destination`` (default: ``CONF.inbound_queue``).

        :param destination: broker destination to subscribe to.
        :param ack: acknowledgement mode passed through to the connection.
        """
        self.destination = destination or CONF.inbound_queue

        self.conn.subscribe(destination=self.destination, ack=ack)

    def send(self, msg, destination=None):
        """Send ``msg`` to ``destination`` (default: ``CONF.inbound_queue``).

        The message body is serialised to JSON with ``DateEncoder`` and the
        message header is sent as the STOMP headers.

        :param msg: object providing get_header(), get_body(), get_type()
            and get_id().
        :param destination: broker destination to send to.

        A NotConnectedException from the connection is logged, not raised.
        """
        # NOTE(review): this overwrites the destination remembered by
        # subscribe(); kept as-is because external code may read it.
        self.destination = destination or CONF.inbound_queue

        LOG.debug('header = %s', msg.get_header())
        LOG.debug('message = %s', msg.get_body())

        LOG.info('Send %s %s to %s', msg.get_type(), msg.get_id(), self.destination)

        try:
            self.conn.send(destination=self.destination, body=json.dumps(msg.get_body(), cls=DateEncoder),
                           headers=msg.get_header())
        except exception.NotConnectedException as e:
            LOG.error('Could not send message to broker %s:%s : %s', CONF.stomp_host, CONF.stomp_port, e)
            return

        LOG.info('Message sent to broker %s:%s', CONF.stomp_host, CONF.stomp_port)

    def disconnect(self):
        """Disconnect from the broker if currently connected; otherwise no-op."""
        if self.is_connected():
            LOG.info('Disconnecting from broker %s:%s', CONF.stomp_host, CONF.stomp_port)
            self.conn.disconnect()
            LOG.info('Disconnected!')

    def is_connected(self):
        """Return whether a broker connection exists and reports itself connected.

        Returns False when no connection object has been created yet (for
        example before connect(), or after a failed connection attempt).
        """
        # getattr() also covers subclasses that override __init__ without
        # calling it, where the attribute would otherwise be missing.
        conn = getattr(self, 'conn', None)
        return conn is not None and conn.is_connected()
class MessageHandler(ConnectionListener):
    """
    Generic connection listener that only logs the events it receives.

    Usage: derive from MessageHandler and override on_message() to process
    incoming messages.
    """

    # -- connection lifecycle -------------------------------------------

    def on_connecting(self, host_and_port):
        """Log the broker address a connection is being opened to."""
        LOG.info('Connecting to %s', host_and_port)

    def on_connected(self, headers, body):
        """Log the headers and body received with the connected event."""
        LOG.info('Connected to %s %s', headers, body)

    def on_disconnected(self):
        """Log, at error level, that the broker connection was lost."""
        LOG.error('Connection to messaging server has been lost.')

    def on_heartbeat(self):
        """Log each heartbeat at debug level."""
        LOG.debug('Received heartbeat')

    # -- frames ---------------------------------------------------------

    def on_send(self, frame):
        """Log the outgoing frame at debug level."""
        LOG.debug('Sending message %s', frame)

    def on_message(self, headers, body):
        """Log an incoming message; subclasses override this to handle it."""
        LOG.info('Received message %s %s', headers, body)

    def on_receipt(self, headers, body):
        """Log a receipt at debug level."""
        LOG.debug('Receipt received %s %s', headers, body)

    def on_error(self, headers, body):
        """Log an error event at error level."""
        LOG.error('Error %s %s', headers, body)