2013-02-12 43 views
9

सर्वर के बीच संदेशों को संवाद करने के लिए अमेज़ॅन एसडब्ल्यूएफ का उपयोग करें?अमेज़ॅन एसडब्ल्यूएफ का उपयोग सर्वर के बीच संवाद करने के लिए

  1. सर्वर ऐ पर एक स्क्रिप्ट
  2. जब कि समाप्त हो गया है मैं इसे सफलतापूर्वक पूरा मैं इसे काम साफ़ करना चाहते हैं एक स्क्रिप्ट बी
  3. चलाने के लिए सर्वर बी करने के लिए एक संदेश भेजना चाहते हैं चलाना चाहते हैं वर्कफ़्लो कतार

मुझे यह करने में वास्तव में कठिन समय लगता है कि मैं इसे करने के लिए संयोजन में बोटो और एसडब्ल्यूएफ का उपयोग कैसे कर सकता हूं। मैं कुछ पूर्ण कोड के बाद नहीं हूं लेकिन मेरे बाद क्या है यदि कोई भी शामिल होने के बारे में कुछ और बता सकता है।

  • स्क्रिप्ट ए के पूरा होने की जांच करने के लिए मैं वास्तव में सर्वर बी कैसे कहूं?
  • मैं कैसे सुनिश्चित करूं कि सर्वर ए स्क्रिप्ट ए को पूरा नहीं करेगा और स्क्रिप्ट बी को आजमाएं और चलाएं (क्योंकि सर्वर बी को इसे चलाना चाहिए)?
  • मैं वास्तव में स्क्रिप्ट के एसडब्ल्यूएफ को कैसे पूरा कर सकता हूं? क्या आप एक झंडा है, या संदेश, या क्या?

जैसा कि आप देख सकते हैं कि मैं वास्तव में इस बारे में बहुत उलझन में हूं, अगर कोई इस पर कुछ प्रकाश डाल सकता है तो मैं वास्तव में इसकी सराहना करता हूं।

उत्तर

17

मुझे लगता है कि आप कुछ बहुत अच्छे प्रश्न पूछते हैं जो बताते हैं कि एसडब्ल्यूएफ सेवा के रूप में कितना सहायक हो सकता है। संक्षेप में, आप अपने सर्वर को अपने आप के बीच काम समन्वय करने के लिए नहीं बताते हैं। एसडब्ल्यूएफ सेवा की मदद से आपका निर्णायक यह सब आपके लिए ऑर्केस्ट्रेट करता है।

अपने कार्यप्रवाह के रूप में जाना होगा के कार्यान्वयन इस प्रकार है:

  1. अपने कार्यप्रवाह और सेवा (एक बंद) के साथ अपनी गतिविधियों का पंजीयन।
  2. निर्णायक और श्रमिकों को लागू करें।
  3. अपने कर्मचारियों और निर्णयों को चलाने दें।
  4. नया वर्कफ़्लो प्रारंभ करें।

boto.swf के कोड में प्रमाण पत्र फ़ीड करने के कई तरीके हैं। इस अभ्यास के प्रयोजनों के लिए, मैं नीचे दिए गए कोड को चलाने से पहले पर्यावरण के लिए उन्हें निर्यात करने की सलाह देते:

export AWS_ACCESS_KEY_ID=<your access key> 
export AWS_SECRET_ACCESS_KEY=<your secret key> 

1) डोमेन, कार्यप्रवाह रजिस्टर करने के लिए और गतिविधियों निम्नलिखित निष्पादित करें:

# ab_setup.py 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

swf.Domain(name=DOMAIN).register() 
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register() 
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register() 
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register() 

2) को लागू करें और निर्णायक और श्रमिकों को चलाएं।

# ab_decider.py 
import time 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

class ABDecider(swf.Decider): 

    domain = DOMAIN 
    task_list = 'default_tasks' 
    version = VERSION 

    def run(self): 
     history = self.poll() 
     # Print history to familiarize yourself with its format. 
     print history 
     if 'events' in history: 
      # Get a list of non-decision events to see what event came in last. 
      workflow_events = [e for e in history['events'] 
           if not e['eventType'].startswith('Decision')] 
      decisions = swf.Layer1Decisions() 
      # Record latest non-decision event. 
      last_event = workflow_events[-1] 
      last_event_type = last_event['eventType'] 
      if last_event_type == 'WorkflowExecutionStarted': 
       # At the start, get the worker to fetch the first assignment. 
       decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()), 
        ACTIVITY1, VERSION, task_list='a_tasks') 
      elif last_event_type == 'ActivityTaskCompleted': 
       # Take decision based on the name of activity that has just completed. 
       # 1) Get activity's event id. 
       last_event_attrs = last_event['activityTaskCompletedEventAttributes'] 
       completed_activity_id = last_event_attrs['scheduledEventId'] - 1 
       # 2) Extract its name. 
       activity_data = history['events'][completed_activity_id] 
       activity_attrs = activity_data['activityTaskScheduledEventAttributes'] 
       activity_name = activity_attrs['activityType']['name'] 
       # 3) Optionally, get the result from the activity. 
       result = last_event['activityTaskCompletedEventAttributes'].get('result') 

       # Take the decision. 
       if activity_name == ACTIVITY1: 
        # Completed ACTIVITY1 just came in. Kick off ACTIVITY2. 
        decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()), 
         ACTIVITY2, VERSION, task_list='b_tasks', input=result) 
       elif activity_name == ACTIVITY2: 
        # Server B completed activity. We're done. 
        decisions.complete_workflow_execution() 

      self.complete(decisions=decisions) 
      return True 

श्रमिक बहुत आसान हैं, यदि आप नहीं चाहते हैं तो आपको विरासत का उपयोग करने की आवश्यकता नहीं है।

# ab_worker.py 
import os 
import time 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

class MyBaseWorker(swf.ActivityWorker): 

    domain = DOMAIN 
    version = VERSION 
    task_list = None 

    def run(self): 
     activity_task = self.poll() 
     print activity_task 
     if 'activityId' in activity_task: 
      # Get input. 
      # Get the method for the requested activity. 
      try: 
       self.activity(activity_task.get('input')) 
      except Exception, error: 
       self.fail(reason=str(error)) 
       raise error 

      return True 

    def activity(self, activity_input): 
     raise NotImplementedError 

class WorkerA(MyBaseWorker): 
    task_list = 'a_tasks' 

    def activity(self, activity_input): 
     result = str(time.time()) 
     print 'worker a reporting time: %s' % result 
     self.complete(result=result) 

class WorkerB(MyBaseWorker): 
    task_list = 'b_tasks' 

    def activity(self, activity_input): 
     result = str(os.getpid()) 
     print 'worker b returning pid: %s' % result 
     self.complete(result=result) 

3) अपने निर्णायक और श्रमिकों को चलाएं। आपका निर्णायक और कर्मचारी अलग-अलग मेजबानों से या एक और एक ही मशीन से चल रहे हैं। ओपन चार टर्मिनलों और चलाने के अपने अभिनेताओं:

सबसे पहले अपने निर्णायक

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass 
... 

तो कार्यकर्ता एक, तो आप इस सर्वर एक से कर सकता है: संभवतः सर्वर बी से

$ python -i ab_workers.py 
>>> while WorkerA().run(): pass 

तो कार्यकर्ता बी, लेकिन यदि आप उन्हें लैपटॉप से ​​चलाते हैं तो यह भी काम करेगा:

$ python -i ab_workers.py 
>>> while WorkerB().run(): pass 
... 

4) अंत में, लात मारो वर्कफ़्लो

$ python 
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import boto.swf.layer2 as swf 
>>> workflows = swf.Domain(name='stackoverflow').workflows() 
>>> workflows 
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>] 
>>> execution = workflows[0].start(task_list='default_tasks') 
>>> 

अपने कलाकारों के साथ क्या होता है यह देखने के लिए वापस स्विच करें। निष्क्रियता के एक मिनट के बाद वे सेवा से डिस्कनेक्ट हो सकते हैं। यदि ऐसा होता है, तो मतदान लूप को फिर से दर्ज करने के लिए तीर-अप + दर्ज करें दबाएं।

अब आप अपने एडब्ल्यूएस प्रबंधन कंसोल के एसडब्ल्यूएफ पैनल पर जा सकते हैं, जांच कर सकते हैं कि निष्पादन कैसे कर रहे हैं और उनका इतिहास देखें। वैकल्पिक रूप से, आप कमांड लाइन के माध्यम से इसके लिए पूछ सकते हैं।

>>> execution.history() 
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ... 

बस गतिविधियों के धारावाहिक निष्पादन के साथ एक कार्यप्रवाह का एक उदाहरण है, लेकिन यह schedule and coordinate parallel execution of activities को निर्णायक के लिए भी संभव है कि।

मुझे आशा है कि यह कम से कम आपको शुरू कर देगा। सीरियल वर्कफ़्लो के थोड़ा और जटिल उदाहरण के लिए, मैं looking at this की अनुशंसा करता हूं।

+0

धन्यवाद की आवश्यकता नहीं है आप इसके लिए बहुत कुछ है, यह वास्तव में एक व्यापक जवाब है। – Jimmy

+0

बहुत बहुत धन्यवाद! – Vor

+0

@ ओज़ी- महान जवाब, भयानक वर्ग। बोटो डॉक्स में होने की आवश्यकता है- – Yarin

1

आप एसएनएस, उपयोग कर सकते हैं स्क्रिप्ट एक पूरा हो गया है जब यह एसएनएस को गति प्रदान करना चाहिए, और है कि मैं साझा करने के लिए किसी भी उदाहरण कोड नहीं है सर्वर बी

+0

एसएनएस सुविधाओं मैं दुर्भाग्य से – Jimmy

5

को एक अधिसूचना ट्रिगर होंगे, लेकिन आप निश्चित रूप से SWF उपयोग कर सकते हैं दो सर्वरों में स्क्रिप्ट के निष्पादन को समन्वयित करने के लिए। इसके साथ मुख्य विचार कोड के तीन टुकड़े बनाना है जो एसडब्ल्यूएफ से बात करते हैं:

  • एक घटक जो जानता है कि कौन सी स्क्रिप्ट पहले निष्पादित की जाती है और पहली बार स्क्रिप्ट निष्पादित करने के बाद क्या करना है। इसे एसडब्ल्यूएफ शर्तों में "निर्णायक" कहा जाता है।
  • दो घटक जो प्रत्येक समझते हैं कि प्रत्येक मशीन पर चलने वाली विशिष्ट स्क्रिप्ट को कैसे निष्पादित करना है। इन्हें एसडब्ल्यूएफ शर्तों में "गतिविधि श्रमिक" कहा जाता है।

पहला घटक, निर्णायक, दो एसडब्ल्यूएफ एपीआई कॉल करता है: PollForDecisionTask और RespondDecisionTaskCompleted। सर्वेक्षण अनुरोध निर्णायक घटक को निष्पादन वर्कफ़्लो का वर्तमान इतिहास देगा, मूल रूप से "स्क्रिप्ट मैं कहां हूं" आपकी स्क्रिप्ट धावक के लिए राज्य जानकारी। आप कोड लिखते हैं जो इन घटनाओं को देखता है और यह पता लगाता है कि कौन सी स्क्रिप्ट निष्पादित की जानी चाहिए। एक स्क्रिप्ट निष्पादित करने के लिए ये "आदेश" एक गतिविधि कार्य के शेड्यूलिंग के रूप में होंगे, जिसे RespondDecisionTaskCompleted को कॉल के हिस्से के रूप में वापस किया जाता है।

आपके द्वारा लिखे गए दूसरे घटक, गतिविधि कार्यकर्ता, प्रत्येक दो एसडब्ल्यूएफ एपीआई कॉल करते हैं: PollForActivityTask और RespondActivityTaskCompleted। सर्वेक्षण अनुरोध गतिविधि कार्यकर्ता को एक संकेत देगा कि इसे उस स्क्रिप्ट को निष्पादित करना चाहिए जो इसके बारे में जानता है, एसडब्ल्यूएफ एक गतिविधि कार्य कहता है। चुनाव अनुरोध से एसडब्ल्यूएफ में लौटाई गई जानकारी में एकल निष्पादन-विशिष्ट डेटा शामिल हो सकता है जिसे गतिविधि कार्य के शेड्यूलिंग के हिस्से के रूप में एसडब्ल्यूएफ को भेजा गया था। आपके प्रत्येक सर्वर स्वतंत्र रूप से उस होस्ट पर स्थानीय स्क्रिप्ट के निष्पादन को इंगित करने के लिए गतिविधि कार्यों के लिए एसडब्ल्यूएफ मतदान करेगा। एक बार कर्मचारी को स्क्रिप्ट निष्पादित करने के बाद, यह RespondActivityTaskCompleted API के माध्यम से SWF पर वापस कॉल करता है।

आपके गतिविधि कार्यकर्ता से एसडब्ल्यूएफ के कॉलबैक के परिणामस्वरूप मैंने पहले से उल्लेख किए गए निर्णायक घटक को एक नया इतिहास दिया जा रहा है। यह इतिहास को देखेगा, देखें कि पहली लिपि पूरी की गई है, और दूसरे को निष्पादित करने के लिए शेड्यूल करें। एक बार यह देखता है कि दूसरा एक किया जाता है, तो यह किसी अन्य प्रकार के निर्णय का उपयोग करके वर्कफ़्लो को "बंद" कर सकता है।

आप StartWorkflowExecution API को कॉल करके प्रत्येक होस्ट पर स्क्रिप्ट निष्पादित करने की पूरी प्रक्रिया को बंद कर देते हैं। यह एसडब्ल्यूएफ में समग्र प्रक्रिया का रिकॉर्ड बनाता है और पहले होस्ट पर पहली लिपि के निष्पादन को निर्धारित करने के लिए निर्णायक प्रक्रिया के पहले इतिहास को बाहर निकाल देता है।

उम्मीद है कि यह एसडब्ल्यूएफ का उपयोग करके इस प्रकार के वर्कफ़्लो को पूरा करने के तरीके पर थोड़ा और संदर्भ देता है। यदि आप पहले से नहीं हैं, तो मैं अतिरिक्त जानकारी के लिए एसडब्ल्यूएफ पृष्ठ पर देव मार्गदर्शिका पर एक नज़र डालेगा।

1

अच्छा उदाहरण

अलावा, अगर आप न पर्यावरण के लिए अपने क्रेडेंशियल्स निर्यात करना चाहते हैं, आप अपनी कक्षाओं के अंदर कॉल कर सकते हैं:

swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)