#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Interpreter version: python 2.7
#
"""
AMQP tool used for debugging and automatic RabbitMQ schema making.
"""
# Imports =====================================================================
import os
import sys
import json
import uuid
import os.path
import argparse
from functools import wraps
import pika
# if the module wasn't yet installed at this system, load it from package
try:
import edeposit.amqp.settings as settings
import edeposit.amqp.amqpdaemon as amqpdaemon
except ImportError:
sys.path.insert(0, os.path.abspath('../edeposit/amqp'))
import settings
import amqpdaemon
# Functions & objects =========================================================
[docs]def test_virtualhost(fn):
@wraps(fn)
def catch_exception(*args, **kwargs):
try:
return fn(*args, **kwargs)
except pika.exceptions.ConnectionClosed:
sys.stderr.write("Can't connect to virtuahost!\n")
sys.stderr.write("Make sure, that virtualhost is created.\n")
sys.exit()
return catch_exception
@test_virtualhost
[docs]def create_blocking_connection(host):
"""
Return properly created blocking connection.
Args:
host (str): Host as it is defined in :func:`.get_amqp_settings`.
Uses :func:`edeposit.amqp.amqpdaemon.getConParams`.
"""
return pika.BlockingConnection(
amqpdaemon.getConParams(
settings.get_amqp_settings()[host.lower()]["vhost"]
)
)
[docs]def create_schema(host):
"""
Create exchanges, queues and route them.
Args:
host (str): One of the possible hosts.
"""
connection = create_blocking_connection(host)
channel = connection.channel()
exchange = settings.get_amqp_settings()[host]["exchange"]
channel.exchange_declare(
exchange=exchange,
exchange_type="topic",
durable=True
)
print "Created exchange '%s'." % exchange
print "Creating queues:"
queues = settings.get_amqp_settings()[host]["queues"]
for queue in queues.keys():
channel.queue_declare(
queue=queue,
durable=True,
# arguments={'x-message-ttl': int(1000 * 60 * 60 * 24)} # :S
)
print "\tCreated durable queue '%s'." % queue
print
print "Routing exchanges using routing key to queues:"
for queue in queues.keys():
channel.queue_bind(
queue=queue,
exchange=exchange,
routing_key=queues[queue]
)
print "\tRouting exchange %s['%s'] -> '%s'." % (
exchange,
queues[queue],
queue
)
def _get_channel(host, timeout):
"""
Create communication channel for given `host`.
Args:
host (str): Specified --host.
timeout (int): Set `timeout` for returned `channel`.
Returns:
Object: Pika channel object.
"""
connection = create_blocking_connection(host)
# register timeout
if timeout >= 0:
connection.add_timeout(
timeout,
lambda: sys.stderr.write("Timeouted!\n") or sys.exit(1)
)
return connection.channel()
[docs]def receive(host, timeout):
"""
Print all messages in queue.
Args:
host (str): Specified --host.
timeout (int): How log should script wait for message.
"""
parameters = settings.get_amqp_settings()[host]
queues = parameters["queues"]
queues = dict(map(lambda (x, y): (y, x), queues.items())) # reverse items
queue = queues[parameters["out_key"]]
channel = _get_channel(host, timeout)
for method_frame, properties, body in channel.consume(queue):
print json.dumps({
"method_frame": str(method_frame),
"properties": str(properties),
"body": body
})
print "-" * 79
print
channel.basic_ack(method_frame.delivery_tag)
[docs]def send_message(host, data, timeout=None, properties=None):
"""
Send message to given `host`.
Args:
host (str): Specified host: aleph/ftp/whatever available host.
data (str): JSON data.
timeout (int, default None): How much time wait for connection.
"""
channel = _get_channel(host, timeout)
if not properties:
properties = pika.BasicProperties(
content_type="application/json",
delivery_mode=2,
headers={"UUID": str(uuid.uuid4())}
)
parameters = settings.get_amqp_settings()[host]
channel.basic_publish(
exchange=parameters["exchange"],
routing_key=parameters["in_key"],
properties=properties,
body=data
)
[docs]def get_list_of_hosts():
"""
Returns:
list: List of strings with names of possible hosts.
"""
return settings.get_amqp_settings().keys()
def _require_host_parameter(args, to):
"""
Make sure, that user specified --host argument.
"""
if not args.host:
sys.stderr.write("--host is required parameter to --%s\n" % to)
sys.exit(1)
# Main program ================================================================
[docs]def main():
# parse arguments
parser = argparse.ArgumentParser(
description="""AMQP tool used for debugging and automatic RabbitMQ
schema making."""
)
parser.add_argument(
"-l",
"--list",
action='store_true',
help="List all possible hosts."
)
parser.add_argument(
"-s",
"--host",
choices=get_list_of_hosts() + ["all"],
help="""Specify host. You can get list of valid host by using --list
switch or use 'all' for all hosts."""
)
parser.add_argument(
"-c",
"--create",
action='store_true',
help="""Create exchanges/queues/routes for given host. --host is
required."""
)
parser.add_argument(
"-p",
"--put",
help="""Put message into input queue at given host. --put argument
expects file with JSON data or - as indication of stdin data
input."""
)
parser.add_argument(
"-g",
"--get",
action='store_true',
help="Get messages from --host output queue."
)
parser.add_argument(
'-t',
'--timeout',
metavar="N",
type=int,
default=-1,
help="Wait for message only N seconds."
)
args = parser.parse_args()
if args.list:
print "\n".join(get_list_of_hosts())
sys.exit(0)
if args.create:
_require_host_parameter(args, "create")
if args.host == "all":
for host in get_list_of_hosts():
create_schema(host)
else:
create_schema(args.host)
if args.put:
_require_host_parameter(args, "put")
data = None
if args.put == "-":
data = sys.stdin.read()
else:
if not os.path.exists(args.put):
sys.stderr.write("Can't open '%s'!\n" % args.put)
sys.exit(1)
with open(args.put) as f:
data = f.read()
send_message(args.host, data, args.timeout)
if args.get:
_require_host_parameter(args, "get")
if args.host == "all":
sys.stderr.write("Can't receive all hosts!\n")
sys.exit(1)
receive(args.host, args.timeout)
if __name__ == '__main__':
try:
main()
except pika.exceptions.AMQPConnectionError:
sys.stderr.write("Can't connect to RabbitMQ!\n")
sys.exit(1)
except KeyboardInterrupt:
print
sys.exit(0)