मैं यह जांचना चाहता हूं कि उपभोक्ता/कार्यकर्तासंदेश का उपभोग करने के लिए मौजूद है या नहीं।पिका या खरगोश एमक्यू में, मैं कैसे जांचूं कि कोई उपभोक्ता वर्तमान में उपभोग कर रहा है या नहीं?
अगर वहाँ नहीं है किसी भी कार्यकर्ता, मैं कुछ श्रमिकों (दोनों उपभोक्ताओं और प्रकाशकों के एक मशीन पर हैं) शुरू होगा और उसके बाद संदेश प्रकाशन के बारे में जाना।
अगर वहाँ connection.check_if_has_consumers
की तरह एक समारोह है, मैं इसे कुछ हद तक लागू करेगा इस तरह -
import pika
import workers
# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
# start the workers in other processes, using python's `multiprocessing`
workers.start_workers()
# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
properties=pika.BasicProperties(delivery_mode=2))
connection.close()
लेकिन मैं पिका में check_if_has_consumers
कार्यक्षमता के साथ किसी भी समारोह को खोजने में असमर्थ हूँ।
pika का उपयोग करके इसे पूरा करने का कोई तरीका है? या शायद, से खरगोश सीधे बात कर रहे हैं?
मैं पूरी तरह से यकीन है कि नहीं हूँ, लेकिन मैं वास्तव में, लगता है RabbitMQ अलग कतारों की सदस्यता ली उपभोक्ताओं की संख्या के बारे में पता किया जाएगा, क्योंकि यह संदेशों उन्हें प्रेषण करता है और acks
स्वीकार करता है मैं तो बस RabbitMQ 3 घंटे पहले के साथ शुरू किया गया ... किसी भी मदद का स्वागत है ...
यहाँ, workers.py कोड मैंने लिखा है अगर इसकी कोई मदद ....
import multiprocessing
import pika
def start_workers(num=3):
"""start workers as non-daemon processes"""
for i in xrange(num):
process = WorkerProcess()
process.start()
class WorkerProcess(multiprocessing.Process):
"""
worker process that waits infinitly for task msgs and calls
the `callback` whenever it gets a msg
"""
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_working = multiprocessing.Event()
def run(self):
"""
worker method, open a channel through a pika connection and
start consuming
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue='worker_queue', auto_delete=False,
durable=True)
# don't give work to one worker guy until he's finished
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='worker_queue')
# do what `channel.start_consuming()` does but with stopping signal
while len(channel._consumers) and not self.stop_working.is_set():
channel.transport.connection.process_data_events()
channel.stop_consuming()
connection.close()
return 0
def signal_exit(self):
"""exit when finished with current loop"""
self.stop_working.set()
def exit(self):
"""exit worker, blocks until worker is finished and dead"""
self.signal_exit()
while self.is_alive(): # checking `is_alive()` on zombies kills them
time.sleep(1)
def kill(self):
"""kill now! should not use this, might create problems"""
self.terminate()
self.join()
def callback(channel, method, properties, body):
"""pika basic consume callback"""
print 'GOT:', body
# do some heavy lifting here
result = save_to_database(body)
print 'DONE:', result
channel.basic_ack(delivery_tag=method.delivery_tag)
संपादित करें:
मैं तो यहाँ आगे बढ़ने के लिए एक वैकल्पिक हल है कि मैं लेने के लिए, जब तक कि एक बेहतर दृष्टिकोण साथ आता जा रहा हूँ,
तो, RabbitMQ इन किया है HTTP management apis, वे management plugin चालू करने के बाद और HTTP एपिस पेज के बीच में काम करते हैं
/एपीआई/कनेक्शन - सभी खुले कनेक्शन की एक सूची।
/एपीआई/कनेक्शन/नाम - एक व्यक्तिगत कनेक्शन। इसे हटाने से कनेक्शन बंद हो जाएगा।
तो, अगर मैं अपने श्रमिक कनेक्ट और मेरे दोनों का उत्पादन विभिन्न कनेक्शन नाम/उपयोगकर्ताओं के द्वारा, मैं जाँच करने में सक्षम हो जाएगा अगर कार्यकर्ता कनेक्शन खुला है ... (जब कर्मचारी मर जाता है तो समस्या हो सकती है ...)
बेहतर समाधान की प्रतीक्षा कर रहा है ...
संपादित करें:
सिर्फ RabbitMQ डॉक्स में इस मिला, लेकिन इस अजगर में क्या करना hacky होगा:
[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue 0
...done.
तो मैं कुछ की तरह कर सकता है,
subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")
हैकी ... अभी भी उम्मीद है कि पिका के पास कुछ पायथन कार्य है ...
धन्यवाद,