2012-02-22 24 views
14

मैं कुछ धागे में प्रक्रिया संदेशों चाहते हैं, लेकिन इस दौरान कोड निष्पादित मैं त्रुटि मिल रही है का उपयोग कर RabbitMQ को पावती संदेशों का प्रयास करें:त्रुटि "अज्ञात वितरण टैग" तब होता है जब मैं पिका (अजगर)

from __future__ import with_statement 
import pika 
import sys 
from pika.adapters.blocking_connection import BlockingConnection 
from pika import connection, credentials 
import time 
import threading 
import random 
from pika.adapters.select_connection import SelectConnection 
from pika.connection import Connection 
import traceback 


def doWork(body, args, channel): 


    r = random.random() 
    time.sleep(r * 10) 
    try:   
     channel.basic_ack(delivery_tag=args.delivery_tag) 

    except : 
     traceback.print_exc() 


auth = credentials.PlainCredentials(username="guest", password="guest") 
params = connection.ConnectionParameters(host="localhost", credentials=auth) 
conn = BlockingConnection(params) 
channel = conn.channel() 


while True: 

    time.sleep(0.03)  
    try: 

     method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
     if method_frame.NAME == 'Basic.GetEmpty': 
      continue   

     t = threading.Thread(target=doWork, args=[body, method_frame, channel]) 
     t.setDaemon(True) 
     t.start() 

    except Exception, e: 
     traceback.print_exc() 
     continue 

त्रुटि desctiption:

 
Traceback (most recent call last): 
    File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get 
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack) 
    File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get 
    no_ack=no_ack)) 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method 
    self.connection.process_data_events() 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events 
    self._handle_read() 
    File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read 
    self._on_data_available(data) 
    File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available 
    frame)     # Args 
    File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process 
    callback(*args, **keywords) 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close 
    frame.method.reply_text) 
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204') 

संस्करण: पिका 0.9.5, RabbitMQ 2.6.1

+0

कल मैं py-amqplib लाइब्रेरी का उपयोग करने की कोशिश की इसके बजाय पिका। यह ठीक काम किया। शायद पिका पुस्तकालय में एक समस्या है। – solo117

+1

यदि आप अपने कोड को एकाधिक धागे में साझा करना चाहते हैं तो आपको एक थ्रेड सुरक्षित लाइब्रेरी जैसे खरगोश या अम्कप-तूफान का उपयोग करना चाहिए। सुनिश्चित नहीं है कि py-amqplib थ्रेडसेफ है। https://github.com/eandersson/amqp-storm – eandersson

उत्तर

3

मैं इसका कोई सुधार नहीं है, लेकिन मैं पुष्टि कर सकते हैं कि यह BlockingConnection एडाप्टर का उपयोग होता है।

यह लगातार होती रहती है जब

स्वीकार करते हैं या संदेश है कि एक channel.basic_recover()

पिका 0.9.5, RabbitMQ 2.2.0, अजगर 2.7 के जवाब में redelivered की जा रही है, खारिज और Erlang R14B01 मेरे पास मौजूद वर्कअराउंड हमेशा वितरण_टैग = 0

मुझे संदेह है कि यह केवल तभी काम करता है जब आप जिस संदेश को एकिंग/नाकिंग कर रहे हैं वह अंतिम है जिसे आपने पढ़ा है (स्ट्रीम में)। जिस पुस्तकालय में मैं लिख रहा हूं, इस तरह से संदेश को इस तरह से सारणीबद्ध करता है कि प्रत्येक को स्वतंत्र रूप से स्वीकार किया जा सकता है, जो इस समाधान के साथ टूट जाता है।

क्या कोई यह पुष्टि कर सकता है कि यह अभी तक पिका टीम पर किसी के द्वारा तय या स्वीकार किया गया है? या, क्या यह RabbitMQ के साथ एक मुद्दा हो सकता है?

consumer_tag = channel.basic_consume(
     message_delivery_event, 
     no_ack=True, 
     queue=queue, 
    ) 

यह प्रारंभिक (redelivered नहीं) स्वीकृतियां पर वर्णित त्रुटि पैदा का प्रभाव था जब संदेश के वितरण टैग:

+0

मैं इस त्रुटि को नोड-एएमपीपी के साथ देख रहा हूं, इसलिए यह RabbitMQ (संस्करण 3.0.2-1) के साथ एक समस्या होनी चाहिए। – alexfernandez

0

RabbitMQ - upgraded to a new version and got a lot of "PRECONDITION_FAILED unknown delivery tag 1"

देखने के बाद मैं अपने इस तरह देखने के लिए बुनियादी उपभोग बदल निर्दिष्ट किया गया था। डिलीवरी को संदेश वितरण की विधि संरचना से निकाला गया था।

channel.basic_ack(delivery_tag=0) 

का उपयोग करते हुए इस मामले में त्रुटि को दबा भी

http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html को देखते हुए यह लगता है जैसे कि यह RabbitMQ में एक मुद्दा हो सकता बनाता है।

26

समस्या शायद जाती है कि आप इस तरह no_ack=True स्थापित कर रहे हैं:

consumer_tag = channel.basic_consume(
    message_delivery_event, 
    no_ack=True, 
    queue=queue, 
) 

और संदेशों तो स्वीकार करते हैं:

channel.basic_ack(delivery_tag=args.delivery_tag) 

यदि आप स्वीकार करने के लिए चाहते हैं या नहीं चुना है और सेट सही उपभोग पैरामीटर।

+0

मेरे कोड के लिए मूल कारण सिंक समस्या और कॉन्फ़िगरेशन समस्या है। मेरे पास rabbitmq उपभोक्ता बनाने के लिए एक साधारण लपेटो है। जब एक अस्थायी कतार का उपभोग करें (channel.queueDeclare ("", झूठा, सत्य, सत्य, तर्क) .getQueue()), अगली डिलीवरी को मल्टी-थ्रेड एनवी में सिंक का उपयोग करके संरक्षित करने की आवश्यकता है। इसका मतलब है, अगर आपको कोई संदेश मिलता है, तो आपको किसी अन्य संदेश का उपभोग करने से पहले इसे पकड़ना होगा। अन्यथा, जब एएक को कॉल करें, तो यह अपवाद फेंक देगा और उपभोग करते समय फेंक अपवाद रखेगा ... – DeepNightTwo

+0

यह वास्तव में मेरी समस्या थी, बहुत बहुत धन्यवाद। – Rob

+1

मुझे एक खराब विलय के बाद यह त्रुटि मिल रही थी जहां एक ही डिलीवरी टैग के साथ एक संदेश दो बार लगाया जा रहा था – blockloop

2

आपके कोड के साथ एक बग है। आप थ्रेड में एक चैनल साझा करते हैं। यह पिका द्वारा समर्थित नहीं है (FAQ देखें)।

  1. basic_get(...) में no_ack=True झंडा परिभाषित करें और धागा के समारोह doWork(...)
  2. में चैनल वस्तु का उपयोग नहीं करते आप संदेश एसीके के बाद ही आप अपने काम समाप्त कर दिया है की जरूरत है, तो मुख्य जाने: आप 2 विकल्प हैं धागा (while True: लूप) संदेश एएसी (और कार्यकर्ता धागा नहीं) को संभालता है। नीचे आपके कोड का एक संशोधित संस्करण है जो ऐसा करता है।

    from __future__ import with_statement 
    import pika 
    import sys 
    from pika.adapters.blocking_connection import BlockingConnection 
    from pika import connection, credentials 
    import time 
    import threading 
    import random 
    from pika.adapters.select_connection import SelectConnection 
    from pika.connection import Connection 
    import traceback 
    from Queue import Queue, Empty 
    
    def doWork(body, args, channel, ack_queue): 
        time.sleep(random.random()) 
        ack_queue.put(args.delivery_tag) 
    
    def doAck(channel): 
        while True: 
         try: 
          r = ack_queue.get_nowait() 
         except Empty: 
          r = None 
         if r is None: 
          break 
         try: 
          channel.basic_ack(delivery_tag=r) 
         except: 
          traceback.print_exc() 
    
    auth = credentials.PlainCredentials(username="guest", password="guest") 
    params = connection.ConnectionParameters(host="localhost", credentials=auth) 
    conn = BlockingConnection(params) 
    channel = conn.channel() 
    # Create a queue for the messages that should be ACKed by main thread 
    ack_queue = Queue() 
    
    while True: 
        time.sleep(0.03)  
        try: 
         doAck(channel) 
         method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
         if method_frame.NAME == 'Basic.GetEmpty': 
          continue   
         t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue]) 
         t.setDaemon(True) 
         t.start() 
        except Exception, e: 
         traceback.print_exc() 
         continue 
    
4

मेरे लिए, यह था सिर्फ इतना है कि मैं कतार मैं ack के लिए नहीं जा रहा था कहा था, तो मैं acked।

उदा। गलत:

channel.basic_consume(callback, queue=queue_name, no_ack=True) 

और फिर मेरी कॉलबैक में:

def callback(ch, method, properties, body): 
    # do stuff 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

सही:

channel.basic_consume(callback, queue=queue_name, no_ack=False) 

निष्कर्ष: आप के लिए मैन्युअल रूप से पावती, no_ack सेट चाहते हैं = असत्य।

डॉक्स से:

no_ack: (bool) अगर यह सच है, स्वत: स्वीकृति मोड पर सेट किया जाएगा (http://www.rabbitmq.com/confirms.html देखें)

+0

धन्यवाद। यह वास्तव में उपयोगी था। समस्या जो मैं देखता हूं वह यह है कि पैरामीटर नाम (no_ack, या nonet में .net) भ्रमित करने वाला है। मुझे लगता है कि इसे सिर्फ "एएके" कहा जाना चाहिए, और यदि आप सच हो जाते हैं तो यह संदेश स्वीकार करेगा। –