2012-03-20 24 views
5

के साथ टॉपिक एक्सचेंज मैं थोड़ा उलझन में हूं कि मेरी कॉन्फ़िगरेशन एक विषय विनिमय स्थापित करने के लिए कैसा दिखना चाहिए।सेलरी और खरगोश एमक्यू

Task1 -> send to QueueOne and QueueFirehose 
Task2 -> sent to QueueTwo and QueueFirehose 

तो:

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

यह मैं हासिल करना चाहते हैं क्या है

Task1 -> consume from QueueOne 
Task2 -> consume from QueueTwo 
TaskFirehose -> consume from QueueFirehose 

मैं केवल Task1 QueueTwo से उपभोग करने के लिए QueueOne और Task2 से उपभोग करने के लिए चाहते हैं।

अब समस्या यह है कि जब कार्य 1 और 2 रन, तो वे क्यूईफ़ीरहाउस भी निकाल देते हैं, और टास्कफायरहोज़ कार्य कभी निष्पादित नहीं होता है।

क्या मेरी कॉन्फ़िगरेशन में कुछ गड़बड़ है, या क्या मैं कुछ गलत समझ रहा हूं?

CELERY_QUEUES = { 
    "QueueOne": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.one", 
    }, 
    "QueueTwo": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.two", 
    }, 
    "QueueFirehose": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.#", 
    }, 
} 

CELERY_ROUTES = { 
     "tasks.task1": { 
      "queue": 'QueueOne', 
      "routing_key": 'pipeline.one', 
     }, 
     "tasks.task2": { 
      "queue": 'QueueTwo', 
      "routing_key": 'pipeline.two', 
     }, 
     "tasks.firehose": { 
      'queue': 'QueueFirehose', 
      "routing_key": 'pipeline.#', 
     }, 
} 
+0

शायद यह स्पष्ट करने के लिए शब्दावली है, लेकिन आपका विवरण लगता है जैसे आप कार्य और श्रमिकों को भंग कर रहे हैं। उदाहरण के लिए, आप कहते हैं "टास्क 2 क्यूई 2 को भेजा गया" फिर बाद में "टास्क 2 क्यूई 2 से उपभोग करने के लिए" कहता है। कार्य उपभोग नहीं करते हैं; वे उपभोग कर रहे हैं (श्रमिकों द्वारा)। आप यह भी कहते हैं कि "टास्कफायरहोज़ कार्य कभी निष्पादित नहीं होता" लेकिन आपके विवरण में, किसी भी कतार में कोई कार्यफाइल नहीं भेजा जा रहा है। मूल अवधारणा है: कार्य कतारों को भेजे जाते हैं; और श्रमिकों को कतारों से कार्य निष्पादित किया जाता है जिन्हें वे सौंपा जाता है। कार्य! = मजदूर जो उन्हें निष्पादित करते हैं। –

उत्तर

0

यह मानते हुए कि आप वास्तव में कुछ इस तरह का मतलब:

Task1 -> send to QueueOne 
Task2 -> sent to QueueTwo 
TaskFirehose -> send to QueueFirehose 

तो:

Worker1 -> consume from QueueOne, QueueFirehose 
Worker2 -> consume from QueueTwo, QueueFirehose 
WorkerFirehose -> consume from QueueFirehose 

यह ठीक नहीं हो सकता है कि तुम क्या मतलब है, लेकिन मुझे लगता है कि यह कई परिदृश्यों को शामिल करना चाहिए और उम्मीद है तुम्हारा भी। कुछ इस तरह काम करना चाहिए:

# Advanced example starting 10 workers in the background: 
# * Three of the workers processes the images and video queue 
# * Two of the workers processes the data queue with loglevel DEBUG 
# * the rest processes the default' queue. 

$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data 
-Q default -L:4,5 DEBUG 

अधिक विकल्प और संदर्भ के लिए: http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html

इस दस्तावेज़ से सीधे था।

मुझे भी इसी तरह की स्थिति थी, और मैंने इसे थोड़ा अलग तरीके से निपटाया। मैं पर्यवेक्षक के साथ अजवाइन बहु का उपयोग नहीं कर सका। तो इसके बजाय मैंने प्रत्येक कार्यकर्ता के लिए पर्यवेक्षक में एकाधिक कार्यक्रम बनाया। वैसे भी मजदूर अलग-अलग प्रक्रियाओं पर होंगे, इसलिए पर्यवेक्षक को आपके लिए सब कुछ का ख्याल रखना चाहिए।

; ================================== 
; celery worker supervisor example 
; ================================== 

[program:Worker1] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueOne, QueueFirehose 

directory=/path/to/project 
user=nobody 
numprocs=1 
stdout_logfile=/var/log/celery/worker1.log 
stderr_logfile=/var/log/celery/worker1.log 
autostart=true 
autorestart=true 
startsecs=10 

; Need to wait for currently executing tasks to finish at shutdown. 
; Increase this if you have very long running tasks. 
stopwaitsecs = 600 

; When resorting to send SIGKILL to the program to terminate it 
; send SIGKILL to its whole process group instead, 
; taking care of its children as well. 
killasgroup=true 

; if rabbitmq is supervised, set its priority higher 
; so it starts first 
priority=998 

इसी तरह, Worker2 और WorkerFirehose के लिए, बनाने के लिए इसी तर्ज संपादित:

[program:Worker2] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueTwo, QueueFirehose 

और

[program:WorkerFirehose] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose 

उन सब supervisord में शामिल हैं -: कॉन्फ़िग फ़ाइल तरह दिखता है .conf फ़ाइल और इसे करना चाहिए।