MQ_LISTNER.py 8.94 KB
import time
import logging
import pika
import json
import time
import datetime
import pytz

from threading import Thread
from queue import Queue, Empty
from ocm_fwk.ocm_mq import OcmMqBase

import config as cfg

'''
class MyJSONEncoder(json.JSONEncoder):
        #Override the default method
        def default(self, obj):
            if isinstance(obj, (datetime.date, datetime.datetime)):
                return obj.replace(tzinfo=datetime.timezone.utc).isoformat()
            if isinstance(obj, np.ndarray):
                return obj.tolist()
'''

class test_cons(OcmMqBase):

    def __init__(self, settings, consumer_queue=None, consumer_routing_key=None, consume_only_routing_key=True,
                 message_ttl_ms=None):
        super(test_cons, self).__init__(settings)
        self.logger = logging.getLogger(__name__)

        if consumer_queue is None:
            raise TypeError
        if consumer_routing_key is None:
            raise TypeError

        self.consumer_queue = consumer_queue
        self.consumer_routing_key = consumer_routing_key
        self.consume_only_routing_key = consume_only_routing_key
        self.message_ttl_ms = message_ttl_ms

        self._loop_thread = Thread(target=self._main_loop)
        self._stop_flag = False

        self.mms_main_query = Queue()

    def consume(self, routing_key, message_object):
        self.mms_main_query.put(message_object)
        print("{} ->  consume bu rout {} messege :{}".format(datetime.datetime.now(), routing_key, message_object))
        return True

    def get_lazy_data(self):
        result = []
        while self.mms_main_query.qsize() != 0:
            result.append(self.mms_main_query.get())
        return result

    def _consumer_callback(self, ch, method, properties, body):
        # проверка правильности подписки на routing_key
        if self.consume_only_routing_key:
            if isinstance(self.consumer_routing_key, str):      # если routing key - одна строка
                if self.consumer_routing_key != method.routing_key:
                    # сообщение не может быть обработано и посылать его повторно не нужно
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                    self.logger.warning('MQ: routing key({}) not proper'.format(method.routing_key))
                    return
            elif isinstance(self.consumer_routing_key, tuple):  # если routing key - кортеж строк
                if method.routing_key not in self.consumer_routing_key:
                    # сообщение не может быть обработано и посылать его повторно не нужно
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                    self.logger.warning('MQ: routing key({}) not proper'.format(method.routing_key))
                    return
            else:
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                self.logger.warning('MQ: routing key has not proper type')
                return

        # преобразование сообщения в нативный объект
        try:
            obj = json.loads(body.decode('utf-8'))
        except Exception:
            self.logger.exception("MQ: json conversion error")
            # сообщение не может быть обработано и посылать его повторно не нужно
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return

        # callback может вернуть
        #   None - простым вызовом return или по завершению кода обработчика, то же самое, что False
        #   True - сообщение обработано, можно посылать ack - удаляет сообщение из очереди
        #  False - сообщение не обработано, нужно его отправить обратно в очередь - послать nack(requeue=True)
        ack = None
        try:
            ack = self.consume(method.routing_key, obj)
        except Exception:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            self.logger.exception("MQ: message not consumed")
            raise

        # если callback не возвратил логический тип, значит ack - False
        if ack is None:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        elif ack:
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    def _main_loop(self):
        while not self._stop_flag:
            connection = None
            ch_consumer = None
            try:
                connection = pika.BlockingConnection(self.parameters)
                ch_consumer = connection.channel()
                ch_consumer.basic_qos(prefetch_count=100)       # todo: вынести в настройки

                # определение очереди сообщений, в случае отсутствия, будет создана
                if self.message_ttl_ms is not None:
                    queue = ch_consumer.queue_declare(
                        queue=self.consumer_queue,
                        durable=True,
                        exclusive=False,
                        arguments={'x-message-ttl': self.message_ttl_ms, },
                    ).method.queue
                else:
                    queue = ch_consumer.queue_declare(
                        queue=self.consumer_queue,
                        durable=True,
                        exclusive=False,
                    ).method.queue

                # привязка к очереди
                if isinstance(self.consumer_routing_key, str):  # если routing key - одна строка
                    ch_consumer.queue_bind(
                        exchange=self.EXCHANGE,
                        queue=queue,
                        routing_key=self.consumer_routing_key,
                    )
                elif isinstance(self.consumer_routing_key, tuple):  # если routing key - кортеж строк
                    for key in self.consumer_routing_key:
                        ch_consumer.queue_bind(
                            exchange=self.EXCHANGE,
                            queue=queue,
                            routing_key=key
                        )
                else:
                    raise TypeError

                # настройка обработчика получения сообщений
                ch_consumer.basic_consume(
                    consumer_callback=self._consumer_callback,
                    queue=queue,
                    no_ack=False,
                    consumer_tag="ocm_mq_consumer"
                )

                self.consumer_queue = queue

                # блокирующий вызов ожидания входящих сообщений
                try:
                    ch_consumer.start_consuming()
                except:
                    ch_consumer.stop_consuming()
                    raise

            except Exception:
                self.logger.exception('mq consuming error, attempting to reconnect...')
            finally:
                try:
                    ch_consumer.close()
                except:
                    pass

                try:
                    connection.close()
                except:
                    pass

            time.sleep(self.RECONNECT_SLEEP_SECONDS)

        try:
            ch_consumer.close()
        except:
            pass

        try:
            connection.close()
        except:
            pass

    def start(self):
        """ Запуск подписчика для шины ОКМ """
        self._loop_thread.start()

    def stop(self):
        self._stop_flag = True

bus_TS = test_cons(cfg, "makeyev_L", "measurement.precipitation.*", consume_only_routing_key=False)
bus_LO = test_cons(cfg, "makeyev_C", "metering.rainfall", consume_only_routing_key=False)
print("begin...\n")
#cons.stop()
bus_LO.start()
bus_TS.start()

i = 0

while i < 100:
    i = i + 1
    print("Iterator={}\t\tdata:{}".format(i, bus_LO.get_lazy_data()))
    print("Iterator={}\t\tdata:{}".format(i, bus_TS.get_lazy_data()))
    time.sleep(30)



'''
АГК-34Д uuid = "0efefaee-eada-446f-a2a0-f95acabfc0a7" Л
АГК-77Д uuid = "c3f0ecb7-c65e-4273-93a9-6f39a65dcd4a" Л
АГК-141 uuid = "83b895ae-3a21-4585-8289-e28da5d0cc43" C
АГК-040 uuid = "a6f44e50-ad18-4ee9-a6bb-ffae6a85d2a3" С
АГК-286 uuid = "33ce9a80-27df-41dd-8dbe-cf3ec918b4c2" С
'''