MQ_LISTNER.py
8.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import time
import logging
import pika
import json
import time
import datetime
import pytz
from threading import Thread
from queue import Queue, Empty
from ocm_fwk.ocm_mq import OcmMqBase
import config as cfg
'''
class MyJSONEncoder(json.JSONEncoder):
#Override the default method
def default(self, obj):
if isinstance(obj, (datetime.date, datetime.datetime)):
return obj.replace(tzinfo=datetime.timezone.utc).isoformat()
if isinstance(obj, np.ndarray):
return obj.tolist()
'''
class test_cons(OcmMqBase):
def __init__(self, settings, consumer_queue=None, consumer_routing_key=None, consume_only_routing_key=True,
message_ttl_ms=None):
super(test_cons, self).__init__(settings)
self.logger = logging.getLogger(__name__)
if consumer_queue is None:
raise TypeError
if consumer_routing_key is None:
raise TypeError
self.consumer_queue = consumer_queue
self.consumer_routing_key = consumer_routing_key
self.consume_only_routing_key = consume_only_routing_key
self.message_ttl_ms = message_ttl_ms
self._loop_thread = Thread(target=self._main_loop)
self._stop_flag = False
self.mms_main_query = Queue()
def consume(self, routing_key, message_object):
self.mms_main_query.put(message_object)
print("{} -> consume bu rout {} messege :{}".format(datetime.datetime.now(), routing_key, message_object))
return True
def get_lazy_data(self):
result = []
while self.mms_main_query.qsize() != 0:
result.append(self.mms_main_query.get())
return result
def _consumer_callback(self, ch, method, properties, body):
# проверка правильности подписки на routing_key
if self.consume_only_routing_key:
if isinstance(self.consumer_routing_key, str): # если routing key - одна строка
if self.consumer_routing_key != method.routing_key:
# сообщение не может быть обработано и посылать его повторно не нужно
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.logger.warning('MQ: routing key({}) not proper'.format(method.routing_key))
return
elif isinstance(self.consumer_routing_key, tuple): # если routing key - кортеж строк
if method.routing_key not in self.consumer_routing_key:
# сообщение не может быть обработано и посылать его повторно не нужно
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.logger.warning('MQ: routing key({}) not proper'.format(method.routing_key))
return
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.logger.warning('MQ: routing key has not proper type')
return
# преобразование сообщения в нативный объект
try:
obj = json.loads(body.decode('utf-8'))
except Exception:
self.logger.exception("MQ: json conversion error")
# сообщение не может быть обработано и посылать его повторно не нужно
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
return
# callback может вернуть
# None - простым вызовом return или по завершению кода обработчика, то же самое, что False
# True - сообщение обработано, можно посылать ack - удаляет сообщение из очереди
# False - сообщение не обработано, нужно его отправить обратно в очередь - послать nack(requeue=True)
ack = None
try:
ack = self.consume(method.routing_key, obj)
except Exception:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.logger.exception("MQ: message not consumed")
raise
# если callback не возвратил логический тип, значит ack - False
if ack is None:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
elif ack:
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def _main_loop(self):
while not self._stop_flag:
connection = None
ch_consumer = None
try:
connection = pika.BlockingConnection(self.parameters)
ch_consumer = connection.channel()
ch_consumer.basic_qos(prefetch_count=100) # todo: вынести в настройки
# определение очереди сообщений, в случае отсутствия, будет создана
if self.message_ttl_ms is not None:
queue = ch_consumer.queue_declare(
queue=self.consumer_queue,
durable=True,
exclusive=False,
arguments={'x-message-ttl': self.message_ttl_ms, },
).method.queue
else:
queue = ch_consumer.queue_declare(
queue=self.consumer_queue,
durable=True,
exclusive=False,
).method.queue
# привязка к очереди
if isinstance(self.consumer_routing_key, str): # если routing key - одна строка
ch_consumer.queue_bind(
exchange=self.EXCHANGE,
queue=queue,
routing_key=self.consumer_routing_key,
)
elif isinstance(self.consumer_routing_key, tuple): # если routing key - кортеж строк
for key in self.consumer_routing_key:
ch_consumer.queue_bind(
exchange=self.EXCHANGE,
queue=queue,
routing_key=key
)
else:
raise TypeError
# настройка обработчика получения сообщений
ch_consumer.basic_consume(
consumer_callback=self._consumer_callback,
queue=queue,
no_ack=False,
consumer_tag="ocm_mq_consumer"
)
self.consumer_queue = queue
# блокирующий вызов ожидания входящих сообщений
try:
ch_consumer.start_consuming()
except:
ch_consumer.stop_consuming()
raise
except Exception:
self.logger.exception('mq consuming error, attempting to reconnect...')
finally:
try:
ch_consumer.close()
except:
pass
try:
connection.close()
except:
pass
time.sleep(self.RECONNECT_SLEEP_SECONDS)
try:
ch_consumer.close()
except:
pass
try:
connection.close()
except:
pass
def start(self):
""" Запуск подписчика для шины ОКМ """
self._loop_thread.start()
def stop(self):
self._stop_flag = True
bus_TS = test_cons(cfg, "makeyev_L", "measurement.precipitation.*", consume_only_routing_key=False)
bus_LO = test_cons(cfg, "makeyev_C", "metering.rainfall", consume_only_routing_key=False)
print("begin...\n")
#cons.stop()
bus_LO.start()
bus_TS.start()
i = 0
while i < 100:
i = i + 1
print("Iterator={}\t\tdata:{}".format(i, bus_LO.get_lazy_data()))
print("Iterator={}\t\tdata:{}".format(i, bus_TS.get_lazy_data()))
time.sleep(30)
'''
АГК-34Д uuid = "0efefaee-eada-446f-a2a0-f95acabfc0a7" Л
АГК-77Д uuid = "c3f0ecb7-c65e-4273-93a9-6f39a65dcd4a" Л
АГК-141 uuid = "83b895ae-3a21-4585-8289-e28da5d0cc43" C
АГК-040 uuid = "a6f44e50-ad18-4ee9-a6bb-ffae6a85d2a3" С
АГК-286 uuid = "33ce9a80-27df-41dd-8dbe-cf3ec918b4c2" С
'''