वर्तमान में सेलेरी श्रमिकों को कस्टम उपभोक्ताओं को जोड़ना मुश्किल है, लेकिन यह विकास संस्करण में बदल रहा है (3.1 बनने के लिए) जहां मैंने उपभोक्ता बूट-चरणों के लिए समर्थन जोड़ा है।
कोई प्रलेखन अभी तक के रूप में मैं सिर्फ यह लागू करने के बाद लिया है, लेकिन यहाँ एक उदाहरण है:
from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue
class CustomConsumer(ConsumerStep):
queue = Queue('custom', Exchange('custom'), routing_key='custom')
def __init__(self, c, enable_custom_consumer=False, **kwargs):
self.enable = self.enable_custom_consumer
def get_consumers(self, connection):
return [
Consumer(connection.channel(),
queues=[self.queue],
callbacks=[self.on_message]),
]
def on_message(self, body, message):
print('GOT MESSAGE: %r' % (body,))
message.ack()
celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
Option('--enable-custom-consumer', action='store_true',
help='Enable our custom consumer.'),
)
ध्यान दें कि एपीआई, एक बात है कि मैं अभी तक यकीन नहीं है अंतिम संस्करण में बदल सकते हैं इस बारे में get_consumer(connection)
के बाद चैनलों को कैसे संभाला जाता है। कनेक्शन खो जाने पर उपभोक्ता का चैनल बंद हो जाता है, और शट डाउन पर, लेकिन लोग मैन्युअल रूप से चैनलों को संभालना चाहते हैं। उस स्थिति में उपभोक्ता चरण को अनुकूलित करने या एक नया StartStopStep लिखने की संभावना हमेशा होती है।
प्रलेखन अब http://celery.readthedocs.org/en/latest/userguide/extending.html पर पाया जा सकता है –