2012-07-16 16 views
13

का हिस्सा हैं मेरे पास एक सेलेरी श्रृंखला है जो कुछ कार्य चलाती है। प्रत्येक कार्य विफल हो सकता है और पुनः प्रयास किया जा सकता है। कृपया एक त्वरित उदाहरण के लिए नीचे देखें:सेलेरी को पुनः प्रयास करना विफल रहा कार्य जो चेन

from celery import task 

@task(ignore_result=True) 
def add(x, y, fail=True): 
    try: 
     if fail: 
      raise Exception('Ugly exception.') 
     print '%d + %d = %d' % (x, y, x+y) 
    except Exception as e: 
     raise add.retry(args=(x, y, False), exc=e, countdown=10) 

@task(ignore_result=True) 
def mul(x, y): 
    print '%d * %d = %d' % (x, y, x*y) 

और श्रृंखला:

from celery.canvas import chain 
chain(add.si(1, 2), mul.si(3, 4)).apply_async() 

दो कार्य चल रहा है (और यह सोचते हैं कि कुछ भी नहीं विफल रहता है), आपके हो/मुद्रित देखेंगे:

1 + 2 = 3 
3 * 4 = 12 

हालांकि, जब ऐड कार्य पहली बार विफल रहता है और बाद में पुनः प्रयास कॉल में सफल होता है, तो चेन में शेष कार्य नहीं चलते हैं, यानी कार्य कार्य विफल रहता है, श्रृंखला में अन्य सभी कार्य नहीं चलते हैं और बाद में ईड सेकेंड, ऐड टास्क फिर से चलाता है और सफल होता है और चेन में शेष कार्य (इस मामले में mul.si (3, 4)) नहीं चलता है।

क्या अजवाइन उस कार्य से असफल श्रृंखला जारी रखने का एक तरीका प्रदान करता है जो बाद में विफल रहा? यदि नहीं, तो इसे पूरा करने के लिए सबसे अच्छा तरीका क्या होगा और यह सुनिश्चित कर लें कि एक श्रृंखला के कार्य निर्दिष्ट क्रम में चलते हैं और केवल पिछले कार्य के बाद सफलतापूर्वक निष्पादित किया गया है, भले ही कार्य को कुछ बार पुनः प्रयास किया जाए?

नोट 1: मुद्दा

add.delay(1, 2).get() 
mul.delay(3, 4).get() 

करके हल किया जा सकता है, लेकिन मैं समझ क्यों जंजीरों में विफल रहा कार्यों के साथ काम नहीं करते में दिलचस्पी है।

उत्तर

0

मुझे यह समझने में भी दिलचस्पी है कि चेन असफल कार्यों के साथ क्यों काम नहीं करते हैं।

मैं कुछ अजवाइन कोड खुदाई और क्या मैं अब तक मिल गया है है:

कार्यान्वयन app.builtins.py

@shared_task 
def add_chain_task(app): 
    from celery.canvas import chord, group, maybe_subtask 
    _app = app 

    class Chain(app.Task): 
     app = _app 
     name = 'celery.chain' 
     accept_magic_kwargs = False 

     def prepare_steps(self, args, tasks): 
      steps = deque(tasks) 
      next_step = prev_task = prev_res = None 
      tasks, results = [], [] 
      i = 0 
      while steps: 
       # First task get partial args from chain. 
       task = maybe_subtask(steps.popleft()) 
       task = task.clone() if i else task.clone(args) 
       i += 1 
       tid = task.options.get('task_id') 
       if tid is None: 
        tid = task.options['task_id'] = uuid() 
       res = task.type.AsyncResult(tid) 

       # automatically upgrade group(..) | s to chord(group, s) 
       if isinstance(task, group): 
        try: 
         next_step = steps.popleft() 
        except IndexError: 
         next_step = None 
       if next_step is not None: 
        task = chord(task, body=next_step, task_id=tid) 
       if prev_task: 
        # link previous task to this task. 
        prev_task.link(task) 
        # set the results parent attribute. 
        res.parent = prev_res 

       results.append(res) 
       tasks.append(task) 
       prev_task, prev_res = task, res 

      return tasks, results 

     def apply_async(self, args=(), kwargs={}, group_id=None, chord=None, 
       task_id=None, **options): 
      if self.app.conf.CELERY_ALWAYS_EAGER: 
       return self.apply(args, kwargs, **options) 
      options.pop('publisher', None) 
      tasks, results = self.prepare_steps(args, kwargs['tasks']) 
      result = results[-1] 
      if group_id: 
       tasks[-1].set(group_id=group_id) 
      if chord: 
       tasks[-1].set(chord=chord) 
      if task_id: 
       tasks[-1].set(task_id=task_id) 
       result = tasks[-1].type.AsyncResult(task_id) 
      tasks[0].apply_async() 
      return result 

     def apply(self, args=(), kwargs={}, **options): 
      tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']] 
      res = prev = None 
      for task in tasks: 
       res = task.apply((prev.get(),) if prev else()) 
       res.parent, prev = prev, res 
      return res 
    return Chain 

पर happends आप अंत prepare_stepsprev_task जुड़ा हुआ है पर देख सकते हैं कि अगले कार्य के लिए। जब prev_task विफल हुआ अगला कार्य नहीं कहा जाता है।

मैं अगले करने के लिए पिछला कार्य से link_error को जोड़ने के साथ परीक्षण कर रहा हूँ:

if prev_task: 
    # link and link_error previous task to this task. 
    prev_task.link(task) 
    prev_task.link_error(task) 
    # set the results parent attribute. 
    res.parent = prev_res 

लेकिन फिर, अगले काम हो सकता है दोनों ही मामलों (के ख्याल रखना चाहिए, जब यह अपरिवर्तनीय होने के लिए कॉन्फ़िगर किया गया है को छोड़कर, जैसे अधिक तर्क स्वीकार नहीं करते हैं)।

c = chain(t1, (t2, t1e), (t3, t2e))

जिसका अर्थ है:

t1linkt2 करने और link_errort1e

t2t3 को link करने और

मुझे लगता है कि श्रृंखला कुछ वाक्य रचना की अनुमति देकर कि समर्थन कर सकते हैं यह पसंद है link_errort2e को

+0

मैंने एक श्रृंखला-जैसे कार्य का उपयोग करने का निर्णय लिया जो अन्य कार्यों को चलाता है जो अन्यथा एक श्रृंखला में होते हैं, लेकिन दूसरे को शुरू करने से पहले एक कार्य पूरा करने की प्रतीक्षा करते हैं, उदाहरण के लिए: 'task1.delay ([params])। प्राप्त(); । Task2.delay ([पैरामीटर]) मिलता है(); task3.delay ([पैरामीटर])। मिलता है() '। चेन-जैसे कार्य किसी भी कार्य द्वारा उठाए गए अपवादों को पकड़ सकता है और खुद को पुनः प्रयास कर सकता है। – Andrei

+0

तो आपके उदाहरण से, टी 1 ई और टी 2 को टी 2 और क्रमशः टी 3, कॉल करना होगा? – Andrei

+0

उदाहरण के लिए चेन के लिए संभावित वाक्यविन्यास पर सिर्फ मेरे विचार। इसका मतलब है कि प्रत्येक अगला कार्य वास्तव में कार्यों की जोड़ी है, जोड़ी में पहला तत्व कॉल किया जाएगा यदि पिछले चरण में कोई अपवाद/त्रुटि नहीं होती है, और दूसरा तत्व पिछले चरण की विफलता के लिए अपवाद/त्रुटि हैंडलर है। 't1e' मतलब 't1 त्रुटि हैंडलर' – anh

7

आपको एक बग :)

https://github.com/celery/celery/commit/b2b9d922fdaed5571cf685249bdc46f28acacde3 में फिक्स्ड 3.0.4 का हिस्सा होगा मिल गया है।

+0

नमस्ते, यहां तक ​​कि नए संस्करणों के साथ भी यह इस पल तक काम नहीं करता है। मेरे पास वही सटीक समस्या है और मैं कामकाज खोजने की कोशिश कर रहा हूं .. अगर आप कर सकते हैं तो कृपया मदद करें। – securecurve