2012-10-23 24 views
9

मैं यह जांचना चाहता हूं कि उपभोक्ता/कार्यकर्तासंदेश का उपभोग करने के लिए मौजूद है या नहीं।पिका या खरगोश एमक्यू में, मैं कैसे जांचूं कि कोई उपभोक्ता वर्तमान में उपभोग कर रहा है या नहीं?

अगर वहाँ नहीं है किसी भी कार्यकर्ता, मैं कुछ श्रमिकों (दोनों उपभोक्ताओं और प्रकाशकों के एक मशीन पर हैं) शुरू होगा और उसके बाद संदेश प्रकाशन के बारे में जाना।

अगर वहाँ connection.check_if_has_consumers की तरह एक समारोह है, मैं इसे कुछ हद तक लागू करेगा इस तरह -

import pika 
import workers 

# code for publishing to worker queue 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

# if there are no consumers running (would be nice to have such a function) 
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): 
    # start the workers in other processes, using python's `multiprocessing` 
    workers.start_workers() 

# now, publish with no fear of your queues getting filled up 
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) 
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", 
          properties=pika.BasicProperties(delivery_mode=2)) 
connection.close() 

लेकिन मैं पिका में check_if_has_consumers कार्यक्षमता के साथ किसी भी समारोह को खोजने में असमर्थ हूँ।

pika का उपयोग करके इसे पूरा करने का कोई तरीका है? या शायद, से खरगोश सीधे बात कर रहे हैं?

मैं पूरी तरह से यकीन है कि नहीं हूँ, लेकिन मैं वास्तव में, लगता है RabbitMQ अलग कतारों की सदस्यता ली उपभोक्ताओं की संख्या के बारे में पता किया जाएगा, क्योंकि यह संदेशों उन्हें प्रेषण करता है और acks

स्वीकार करता है मैं तो बस RabbitMQ 3 घंटे पहले के साथ शुरू किया गया ... किसी भी मदद का स्वागत है ...

यहाँ, workers.py कोड मैंने लिखा है अगर इसकी कोई मदद ....

import multiprocessing 
import pika 


def start_workers(num=3): 
    """start workers as non-daemon processes""" 
    for i in xrange(num):  
     process = WorkerProcess() 
     process.start() 


class WorkerProcess(multiprocessing.Process): 
    """ 
    worker process that waits infinitly for task msgs and calls 
    the `callback` whenever it gets a msg 
    """ 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.stop_working = multiprocessing.Event() 

    def run(self): 
     """ 
     worker method, open a channel through a pika connection and 
     start consuming 
     """ 
     connection = pika.BlockingConnection(
           pika.ConnectionParameters(host='localhost') 
        ) 
     channel = connection.channel() 
     channel.queue_declare(queue='worker_queue', auto_delete=False, 
                durable=True) 

     # don't give work to one worker guy until he's finished 
     channel.basic_qos(prefetch_count=1) 
     channel.basic_consume(callback, queue='worker_queue') 

     # do what `channel.start_consuming()` does but with stopping signal 
     while len(channel._consumers) and not self.stop_working.is_set(): 
      channel.transport.connection.process_data_events() 

     channel.stop_consuming() 
     connection.close() 
     return 0 

    def signal_exit(self): 
     """exit when finished with current loop""" 
     self.stop_working.set() 

    def exit(self): 
     """exit worker, blocks until worker is finished and dead""" 
     self.signal_exit() 
     while self.is_alive(): # checking `is_alive()` on zombies kills them 
      time.sleep(1) 

    def kill(self): 
     """kill now! should not use this, might create problems""" 
     self.terminate() 
     self.join() 


def callback(channel, method, properties, body): 
    """pika basic consume callback""" 
    print 'GOT:', body 
    # do some heavy lifting here 
    result = save_to_database(body) 
    print 'DONE:', result 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

संपादित करें:

मैं तो यहाँ आगे बढ़ने के लिए एक वैकल्पिक हल है कि मैं लेने के लिए, जब तक कि एक बेहतर दृष्टिकोण साथ आता जा रहा हूँ,

तो, RabbitMQ इन किया है HTTP management apis, वे management plugin चालू करने के बाद और HTTP एपिस पेज के बीच में काम करते हैं

/एपीआई/कनेक्शन - सभी खुले कनेक्शन की एक सूची।

/एपीआई/कनेक्शन/नाम - एक व्यक्तिगत कनेक्शन। इसे हटाने से कनेक्शन बंद हो जाएगा।

तो, अगर मैं अपने श्रमिक कनेक्ट और मेरे दोनों का उत्पादन विभिन्न कनेक्शन नाम/उपयोगकर्ताओं के द्वारा, मैं जाँच करने में सक्षम हो जाएगा अगर कार्यकर्ता कनेक्शन खुला है ... (जब कर्मचारी मर जाता है तो समस्या हो सकती है ...)

बेहतर समाधान की प्रतीक्षा कर रहा है ...

संपादित करें:

सिर्फ RabbitMQ डॉक्स में इस मिला, लेकिन इस अजगर में क्या करना hacky होगा:

[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers 
Listing queues ... 
worker_queue 0 
...done. 

तो मैं कुछ की तरह कर सकता है,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'") 

हैकी ... अभी भी उम्मीद है कि पिका के पास कुछ पायथन कार्य है ...

धन्यवाद,

उत्तर

7

मैं बस इसे भी देख रहा था। स्रोत और दस्तावेज़ों के माध्यम से पढ़ने के बाद मैंने निम्नलिखित में चैनल.py:

@property 
def consumer_tags(self): 
    """Property method that returns a list of currently active consumers 

    :rtype: list 

    """ 
    return self._consumers.keys() 

मेरा स्वयं का परीक्षण सफल रहा। मैंने निम्नलिखित का उपयोग किया जहां मेरा चैनल ऑब्जेक्ट self._channel:

if len(self._channel.consumer_tags) == 0: 
     LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.") 
     ... 
0

मुझे वास्तव में यह एक अलग समस्या की तलाश में दुर्घटनाग्रस्त हो गया, लेकिन एक चीज जो आपकी मदद कर सकती है वह मूल_Publish फ़ंक्शन पर है, एक पैरामीटर "तत्काल" है जो गलत पर डिफ़ॉल्ट है।

एक विचार जो आप कर सकते हैं वह तत्काल ध्वज को सही करने के लिए है, जिसे एक कतार में बैठने के बजाय तुरंत उपभोक्ता द्वारा उपभोग करने की आवश्यकता होगी। यदि कोई कर्मचारी संदेश का उपभोग करने के लिए उपलब्ध नहीं है, तो यह आपको एक और कार्यकर्ता शुरू करने के लिए कहकर एक त्रुटि वापस लाएगा।

आपके सिस्टम के थ्रूपुट के आधार पर, यह या तो मृत श्रमिकों को बदलने के लिए बहुत से अतिरिक्त श्रमिकों या श्रमिकों को पैदा करेगा। पूर्व मुद्दे के लिए आप एक व्यवस्थापक जैसी प्रणाली लिख सकते हैं जो श्रमिकों को नियंत्रण कतार के माध्यम से ट्रैक करता है, जहां आप उन श्रमिकों की प्रक्रियाओं को मारने के लिए प्रक्रिया जैसे "धावक" को बता सकते हैं जो अब आवश्यक नहीं हैं।