मुझे लगता है कि आप कुछ बहुत अच्छे प्रश्न पूछते हैं जो बताते हैं कि एसडब्ल्यूएफ सेवा के रूप में कितना सहायक हो सकता है। संक्षेप में, आप अपने सर्वर को अपने आप के बीच काम समन्वय करने के लिए नहीं बताते हैं। एसडब्ल्यूएफ सेवा की मदद से आपका निर्णायक यह सब आपके लिए ऑर्केस्ट्रेट करता है।
अपने कार्यप्रवाह के रूप में जाना होगा के कार्यान्वयन इस प्रकार है:
- अपने कार्यप्रवाह और सेवा (एक बंद) के साथ अपनी गतिविधियों का पंजीयन।
- निर्णायक और श्रमिकों को लागू करें।
- अपने कर्मचारियों और निर्णयों को चलाने दें।
- नया वर्कफ़्लो प्रारंभ करें।
boto.swf के कोड में प्रमाण पत्र फ़ीड करने के कई तरीके हैं। इस अभ्यास के प्रयोजनों के लिए, मैं नीचे दिए गए कोड को चलाने से पहले पर्यावरण के लिए उन्हें निर्यात करने की सलाह देते:
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
1) डोमेन, कार्यप्रवाह रजिस्टर करने के लिए और गतिविधियों निम्नलिखित निष्पादित करें:
# ab_setup.py
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2) को लागू करें और निर्णायक और श्रमिकों को चलाएं।
# ab_decider.py
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class ABDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
# Print history to familiarize yourself with its format.
print history
if 'events' in history:
# Get a list of non-decision events to see what event came in last.
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
# Record latest non-decision event.
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
# At the start, get the worker to fetch the first assignment.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
ACTIVITY1, VERSION, task_list='a_tasks')
elif last_event_type == 'ActivityTaskCompleted':
# Take decision based on the name of activity that has just completed.
# 1) Get activity's event id.
last_event_attrs = last_event['activityTaskCompletedEventAttributes']
completed_activity_id = last_event_attrs['scheduledEventId'] - 1
# 2) Extract its name.
activity_data = history['events'][completed_activity_id]
activity_attrs = activity_data['activityTaskScheduledEventAttributes']
activity_name = activity_attrs['activityType']['name']
# 3) Optionally, get the result from the activity.
result = last_event['activityTaskCompletedEventAttributes'].get('result')
# Take the decision.
if activity_name == ACTIVITY1:
# Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
ACTIVITY2, VERSION, task_list='b_tasks', input=result)
elif activity_name == ACTIVITY2:
# Server B completed activity. We're done.
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
श्रमिक बहुत आसान हैं, यदि आप नहीं चाहते हैं तो आपको विरासत का उपयोग करने की आवश्यकता नहीं है।
# ab_worker.py
import os
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class MyBaseWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = None
def run(self):
activity_task = self.poll()
print activity_task
if 'activityId' in activity_task:
# Get input.
# Get the method for the requested activity.
try:
self.activity(activity_task.get('input'))
except Exception, error:
self.fail(reason=str(error))
raise error
return True
def activity(self, activity_input):
raise NotImplementedError
class WorkerA(MyBaseWorker):
task_list = 'a_tasks'
def activity(self, activity_input):
result = str(time.time())
print 'worker a reporting time: %s' % result
self.complete(result=result)
class WorkerB(MyBaseWorker):
task_list = 'b_tasks'
def activity(self, activity_input):
result = str(os.getpid())
print 'worker b returning pid: %s' % result
self.complete(result=result)
3) अपने निर्णायक और श्रमिकों को चलाएं। आपका निर्णायक और कर्मचारी अलग-अलग मेजबानों से या एक और एक ही मशीन से चल रहे हैं। ओपन चार टर्मिनलों और चलाने के अपने अभिनेताओं:
सबसे पहले अपने निर्णायक
$ python -i ab_decider.py
>>> while ABDecider().run(): pass
...
तो कार्यकर्ता एक, तो आप इस सर्वर एक से कर सकता है: संभवतः सर्वर बी से
$ python -i ab_workers.py
>>> while WorkerA().run(): pass
तो कार्यकर्ता बी, लेकिन यदि आप उन्हें लैपटॉप से चलाते हैं तो यह भी काम करेगा:
$ python -i ab_workers.py
>>> while WorkerB().run(): pass
...
4) अंत में, लात मारो वर्कफ़्लो
$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>>
अपने कलाकारों के साथ क्या होता है यह देखने के लिए वापस स्विच करें। निष्क्रियता के एक मिनट के बाद वे सेवा से डिस्कनेक्ट हो सकते हैं। यदि ऐसा होता है, तो मतदान लूप को फिर से दर्ज करने के लिए तीर-अप + दर्ज करें दबाएं।
अब आप अपने एडब्ल्यूएस प्रबंधन कंसोल के एसडब्ल्यूएफ पैनल पर जा सकते हैं, जांच कर सकते हैं कि निष्पादन कैसे कर रहे हैं और उनका इतिहास देखें। वैकल्पिक रूप से, आप कमांड लाइन के माध्यम से इसके लिए पूछ सकते हैं।
>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted',
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'},
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy':
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version':
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2,
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes':
{'startToCloseTimeout': '300', 'taskList': {'name': ...
बस गतिविधियों के धारावाहिक निष्पादन के साथ एक कार्यप्रवाह का एक उदाहरण है, लेकिन यह schedule and coordinate parallel execution of activities को निर्णायक के लिए भी संभव है कि।
मुझे आशा है कि यह कम से कम आपको शुरू कर देगा। सीरियल वर्कफ़्लो के थोड़ा और जटिल उदाहरण के लिए, मैं looking at this की अनुशंसा करता हूं।
धन्यवाद की आवश्यकता नहीं है आप इसके लिए बहुत कुछ है, यह वास्तव में एक व्यापक जवाब है। – Jimmy
बहुत बहुत धन्यवाद! – Vor
@ ओज़ी- महान जवाब, भयानक वर्ग। बोटो डॉक्स में होने की आवश्यकता है- – Yarin