2012-03-06 25 views
5

Pika timed received example के बाद, मैं एक ग्राहक को अधिक समवर्ती अनुरोधों को संभालना चाहता हूं। मेरा सवाल है, अगर हैंडल_डिल्वेवरी किसी भी तरह से नया संदेश प्राप्त किया जा सकता है और पिछले हैंडल_डिल्वेवरी रिटर्न की प्रतीक्षा नहीं कर रहा है?Pika RabbitMQ क्लाइंट द्वारा असीमित संदेश हैंडलिंग

उत्तर

2

ऐसा लगता है कि handle_delivery पर कॉल अवरुद्ध है, लेकिन आप इसे add_timeout का उपयोग करके I/O ईवेंट लूप में द्वितीयक हैंडलर जोड़ सकते हैं। मुझे लगता है कि आप यही करना चाहते हैं:

""" 
Asyncronous amqp consumer; do our processing via an ioloop timeout 
""" 

import sys 
import time 

from pika.adapters import SelectConnection 
from pika.connection import ConnectionParameters 

connection = None 
channel = None 


def on_connected(connection): 
    print "timed_receive: Connected to RabbitMQ" 
    connection.channel(on_channel_open) 


def on_channel_open(channel_): 
    global channel 
    channel = channel_ 
    print "timed_receive: Received our Channel" 
    channel.queue_declare(queue="test", durable=True, 
          exclusive=False, auto_delete=False, 
          callback=on_queue_declared) 

class TimingHandler(object): 
    count = 0 
    last_count = 0 

    def __init__(self, delay=0): 
     self.start_time = time.time() 
     self.delay = delay 

    def handle_delivery(self, channel, method, header, body): 
     connection.add_timeout(self.delay, self) 

    def __call__(self): 
     self.count += 1 
     if not self.count % 1000: 
      now = time.time() 
      duration = now - self.start_time 
      sent = self.count - self.last_count 
      rate = sent/duration 
      self.last_count = self.count 
      self.start_time = now 
      print "timed_receive: %i Messages Received, %.4f per second" %\ 
        (self.count, rate) 

def on_queue_declared(frame): 
    print "timed_receive: Queue Declared" 
    channel.basic_consume(TimingHandler().handle_delivery, queue='test', no_ack=True) 


if __name__ == '__main__': 

    # Connect to RabbitMQ 
    host = (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1' 
    connection = SelectConnection(ConnectionParameters(host), 
            on_connected) 
    # Loop until CTRL-C 
    try: 
     # Start our blocking loop 
     connection.ioloop.start() 

    except KeyboardInterrupt: 

     # Close the connection 
     connection.close() 

     # Loop until the connection is closed 
     connection.ioloop.start()