Исходный код pikahandler.PikeHandler

import pika
import signal
import sys


[документация]class PikaHandler: """ Класс, который будет отвечать за расределение запросов к вашим функциям. Важная вещь, не надо его обижать. Вызывать в начале кода и потом использовать для декорирования. Передавать либо `server, port, host, user (optional), password (optional)`, либо `connection=connection`, где значение connection - `pika.BlockingConnection(pika.ConnectionParameters(<..>))` **Агрументы и атрибуты** :param server: домен или IP сервера, по умолчанию 'localhost' :type server: ``str``, optional :param port: порт сервера, по умолчанию '5672' :type port: ``int``, optional :param host: адрес очережи, по умолчанию - `/` :type host: ``str``, optional :param user: пользователь :type user: ``str``, optional :param password: пароль :type password: ``str``, optional :param connection: объект соединения :type connection: ``pika.BlockingConnection``, optional """ def __init__(self, server="localhost", port=5672, host="/", user=None, password=None, connection=None): if not connection: args = [server, port, host] if user and password: args.append(pika.PlainCredentials(user, password)) self.connection = pika.BlockingConnection(pika.ConnectionParameters(*args)) self.channel = self.connection.channel() else: self.connection = connection self.channel = self.connection.channel() self.handlers = dict() signal.signal(signal.SIGINT, self._close_connection) def _close_connection(self, **kwargs): print(' [*] You pressed Ctrl+C or killed me with -2') self.connection.close() print(' [*] Connection closed. By!') # .... Put your logic here ..... sys.exit(0)
[документация] def handler(self, queue: str, func=None): """ Декоратор для сохранения функции в список обрабатываемых **Аргументы** :param queue: название очереди :type queue: ``str`` :param func: лямбда-функция или обычная функция, принимающая в себя один аргумент - класс с ключами ``ch``, ``method``, ``properties``, ``body``. """ def decorator(handler): print(f' [*] Function "{handler.__name__}" added to handler for "{queue}" queue') if func: if queue in self.handlers: self.handlers[queue].append({"handler": handler, "filter": func}) else: self.channel.queue_declare(queue=queue) self.handlers[queue] = [{"handler": handler, "filter": func}] return handler else: raise ValueError("filter-function 'func' must be function with one argument or lambda") return decorator
def _parse(self, ch, method, properties, body): for handler in self.handlers[method.routing_key]: if handler["filter"]({"ch": ch, "method": method, "properties": properties, "body": body}): handler["handler"](ch, method, properties, body)
[документация] def start(self): """ Запускает приём и обработку сообщений из очереди. """ for queue in self.handlers.keys(): self.channel.basic_consume(queue=queue, on_message_callback=self._parse, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()