Source code for edeposit.amqp.amqpdaemon

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Interpreter version: python 2.7
#
#= Imports ====================================================================
"""
This module provides a generic AMQP daemon and a builder of common connection
information, which is defined as constants in :mod:`edeposit.amqp.settings`.

Daemon is used by :mod:`edeposit_amqp_alephdaemon` and
:mod:`edeposit_amqp_calibredaemon`.
"""
import traceback

import pika

try:
    import serializers
except ImportError:
    import edeposit.amqp.serializers as serializers

import pikadaemon
import settings


# Functions & classes =========================================================
def getConParams(virtualhost):
    """
    Build the connection parameters used to reach the RabbitMQ server.

    Args:
        virtualhost (str): Name of the RabbitMQ virtualhost to connect to.

    Returns:
        pika.ConnectionParameters: Parameters filled with the host, port \
            and credential constants read from \
            :mod:`edeposit.amqp.settings`.
    """
    # login data are taken from the settings module
    credentials = pika.PlainCredentials(
        settings.RABBITMQ_USER_NAME,
        settings.RABBITMQ_USER_PASSWORD
    )

    # the port is converted to int before it is handed to pika
    port = int(settings.RABBITMQ_PORT)

    return pika.ConnectionParameters(
        host=settings.RABBITMQ_HOST,
        port=port,
        virtual_host=virtualhost,
        credentials=credentials
    )
class AMQPDaemon(pikadaemon.PikaDaemon):
    """
    Generic AMQP daemon built on top of :class:`pikadaemon.PikaDaemon`.

    Received messages are deserialized, handed over to the user supplied
    ``react_fn`` and the serialized result is sent back. Exceptions raised
    during the processing are reported back in the message headers - see
    :meth:`process_exception`.
    """
    def __init__(self, con_param, queue, out_exch, out_key, react_fn, glob):
        """
        Args:
            con_param (ConnectionParameters): see :func:`getConParams` for
                      details
            queue (str): name of the queue
            out_exch (str): name of the exchange for outgoing messages
            out_key (str): what key will be used to send messages back
            react_fn (fn): function, which can react to messages, see Note for
                     details
            glob (dict): result of ``globals()`` call - used in deserializer
                 to automatically build classes, which are not available in
                 this namespace of this package

        Note:
            ``react_fn`` parameter is expected to be function, which gets two
            parameters - `message` (some form of message, it can be also
            namedtuple), and `send_back` - reference to the function returned
            by :meth:`get_sendback`, which takes one `data` argument and sends
            it back to the caller under the UUID of the processed message.

        Example of function used as `react_fn` parameter::

            def reactToAMQPMessage(message, send_back):
                if message == 1:
                    return 2
                elif message == "Hello":
                    return "Hi"
                elif type(message) == dict:
                    return {1: 2}

                raise UserWarning("Unrecognized message")

        As you can see, protocol is pretty easy. You get `message`, to which
        you react somehow and return `response`. That's all.
        """
        super(AMQPDaemon, self).__init__(
            con_param, queue, out_exch, out_key
        )

        self.react_fn = react_fn
        self.globals = glob

    def parseKey(self, method_frame):
        """
        Compute routing key used for the response to the received message.

        When the incoming routing key contains a dot, its last dot-separated
        segment is replaced by ``self.output_key``. In all other cases
        (no ``routing_key`` attribute, or a key without dots)
        ``self.output_key`` is returned as it is.

        Args:
            method_frame (obj): Method frame of the received message. Only
                         its ``routing_key`` attribute is read (if present).

        Returns:
            str: Routing key for the response.
        """
        # NOTE(review): `self.output_key` is not set in this class -
        # presumably it comes from PikaDaemon.__init__(); confirm.
        key = ""
        if hasattr(method_frame, "routing_key"):
            key = method_frame.routing_key

        if "." in key:
            # keep the prefix, swap only the last segment
            key = key.rsplit(".", 1)[0] + "." + self.output_key
        else:
            key = self.output_key

        return key

    def onMessageReceived(self, method_frame, properties, body):
        """
        React to received message - deserialize it, add it to users reaction
        function stored in ``self.react_fn`` and send back result.

        If `Exception` is thrown during process, it is sent back instead of
        message.

        Args:
            method_frame (obj): Method frame of the message, used to compute
                         the routing key of the response
                         (see :meth:`parseKey`).
            properties (obj): Message properties. Only ``headers`` are read
                       and ``UUID`` key is expected there.
            body (str): Serialized body of the message.

        Returns:
            bool: Always True. Original inline comments mark this value as \
                  "ack message"; the acknowledgement itself is not done in \
                  this class.

        Note:
            In case of `Exception`, response message doesn't have useful
            `body`, but in headers is stored following (string) parameters:

            - ``exception``, where the Exception's message is stored
            - ``exception_type``, where ``e.__class__`` is stored
            - ``exception_name``, where ``e.__class__.__name__`` is stored
            - ``traceback`` where the full traceback is stored (contains
              line number)

            This allows you to react to unexpected cases at the other end of
            the AMQP communication.
        """
        key = self.parseKey(method_frame)

        # `headers` is None when the publisher didn't send any header table,
        # and the `in` test would then fail with TypeError
        headers = properties.headers or {}

        # message without UUID can't be paired with the request - report it
        # back as an error and don't process it any further
        if "UUID" not in headers:
            self.process_exception(
                e=ValueError("No UUID provided, message ignored."),
                uuid="",
                routing_key=key,
                body=body
            )
            return True  # ack message

        uuid = headers["UUID"]

        try:
            result = self.react_fn(
                serializers.deserialize(body, self.globals),
                self.get_sendback(uuid, key)
            )

            # single-argument form gives the same output under python 2 and 3
            print("sending response %s" % key)

            self.sendResponse(
                serializers.serialize(result),
                uuid,
                key
            )
        except Exception as e:
            # anything raised by deserialization, react_fn or sending is
            # reported to the caller instead of killing the daemon
            self.process_exception(
                e=e,
                uuid=uuid,
                routing_key=key,
                body=str(e),
                tb=traceback.format_exc().strip()
            )

        return True  # ack message

    def get_sendback(self, uuid, key):
        """
        Return function for sending progress messages back to original caller.

        Args:
            uuid (str): UUID of the received message.
            key (str): Routing key.

        Returns:
            fn reference: Reference to function which takes only one data \
                          argument.
        """
        def send_back_callback(data):
            # `uuid` and `key` are bound from the enclosing call
            self.sendResponse(
                serializers.serialize(data),
                uuid,
                key
            )

        return send_back_callback

    def process_exception(self, e, uuid, routing_key, body, tb=None):
        """
        Callback called when exception was raised.

        This method serializes the exception and sends it over AMQP back
        to caller.

        Args:
            e (obj): Instance of the exception.
            uuid (str): UUID of the message that caused the exception to raise.
            routing_key (str): Which routing key was used.
            body (str): Body of the exception - the longer text.
            tb (str, default None): Traceback (stacktrace) of the exception.
        """
        # Prefer `.message` when it carries some text. Python 2 keeps this
        # attribute empty for exceptions created with more than one
        # argument, so fall back to str(e) to not lose the description.
        msg = getattr(e, "message", None) or str(e)
        exception_type = str(e.__class__)
        exception_name = str(e.__class__.__name__)

        print("Sending exception %s: %s for UUID %s." % (
            exception_name,
            msg,
            uuid
        ))

        # NOTE(review): `tb` is None when called without traceback (the
        # missing-UUID path) - confirm that the pika version in use can
        # encode None as a header value.
        self.sendMessage(
            self.output_exchange,
            routing_key,
            str(body),
            properties=pika.BasicProperties(
                content_type="application/text",
                delivery_mode=2,
                headers={
                    "exception": msg,
                    "exception_type": exception_type,
                    "exception_name": exception_name,
                    "traceback": tb,
                    "UUID": uuid
                }
            )
        )