Saya pikir Anda mengajukan beberapa pertanyaan yang sangat bagus yang menyoroti betapa bermanfaatnya SWF sebagai layanan. Singkatnya, Anda tidak memberi tahu server Anda untuk mengoordinasikan pekerjaan di antara mereka sendiri. Penentu Anda mengatur semua ini untuk Anda, dengan bantuan layanan SWF.
Implementasi alur kerja Anda akan berjalan seperti berikut:
- Mendaftarkan alur kerja Anda dan aktivitasnya dengan layanan (satu kali).
- Implementasikan penentu dan pekerja.
- Biarkan pekerja dan pembuat keputusan Anda berjalan.
- Mulai alur kerja baru.
Ada sejumlah cara untuk memasukkan kredensial ke dalam kode boto.swf. Untuk keperluan latihan ini, saya sarankan untuk mengekspornya ke lingkungan sebelum menjalankan kode di bawah ini:
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
1) Untuk mendaftarkan domain, alur kerja, dan aktivitas, lakukan hal berikut:
# ab_setup.py
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2) Terapkan dan jalankan penentu dan pekerja.
# ab_decider.py
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class ABDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
# Print history to familiarize yourself with its format.
print history
if 'events' in history:
# Get a list of non-decision events to see what event came in last.
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
# Record latest non-decision event.
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
# At the start, get the worker to fetch the first assignment.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
ACTIVITY1, VERSION, task_list='a_tasks')
elif last_event_type == 'ActivityTaskCompleted':
# Take decision based on the name of activity that has just completed.
# 1) Get activity's event id.
last_event_attrs = last_event['activityTaskCompletedEventAttributes']
completed_activity_id = last_event_attrs['scheduledEventId'] - 1
# 2) Extract its name.
activity_data = history['events'][completed_activity_id]
activity_attrs = activity_data['activityTaskScheduledEventAttributes']
activity_name = activity_attrs['activityType']['name']
# 3) Optionally, get the result from the activity.
result = last_event['activityTaskCompletedEventAttributes'].get('result')
# Take the decision.
if activity_name == ACTIVITY1:
# Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
ACTIVITY2, VERSION, task_list='b_tasks', input=result)
elif activity_name == ACTIVITY2:
# Server B completed activity. We're done.
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
Pekerja jauh lebih sederhana, Anda tidak perlu menggunakan warisan jika Anda tidak mau.
# ab_worker.py
import os
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class MyBaseWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = None
def run(self):
activity_task = self.poll()
print activity_task
if 'activityId' in activity_task:
# Get input.
# Get the method for the requested activity.
try:
self.activity(activity_task.get('input'))
except Exception, error:
self.fail(reason=str(error))
raise error
return True
def activity(self, activity_input):
raise NotImplementedError
class WorkerA(MyBaseWorker):
task_list = 'a_tasks'
def activity(self, activity_input):
result = str(time.time())
print 'worker a reporting time: %s' % result
self.complete(result=result)
class WorkerB(MyBaseWorker):
task_list = 'b_tasks'
def activity(self, activity_input):
result = str(os.getpid())
print 'worker b returning pid: %s' % result
self.complete(result=result)
3) Jalankan penentu dan pekerja Anda. Penentu dan pekerja Anda mungkin berjalan dari host yang terpisah, atau dari satu mesin yang sama. Buka empat terminal dan jalankan aktor Anda:
Pertama, penentu Anda
$ python -i ab_decider.py
>>> while ABDecider().run(): pass
...
Kemudian pekerja A, Anda dapat melakukan ini dari server A:
$ python -i ab_workers.py
>>> while WorkerA().run(): pass
Kemudian pekerja B, mungkin dari server B tetapi jika Anda menjalankan semuanya dari laptop, itu akan berfungsi dengan baik:
$ python -i ab_workers.py
>>> while WorkerB().run(): pass
...
4) Terakhir, mulai alur kerja.
$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>>
Beralih kembali untuk melihat apa yang terjadi dengan aktor Anda. Mereka mungkin memutuskan sambungan dari layanan setelah satu menit tidak aktif. Jika itu terjadi, tekan panah ke atas+enter untuk masuk kembali ke loop polling.
Anda sekarang dapat membuka panel SWF konsol manajemen AWS, memeriksa bagaimana eksekusi dilakukan dan melihat riwayatnya. Alternatifnya, Anda dapat menanyakannya melalui baris perintah.
>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted',
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'},
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy':
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version':
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2,
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes':
{'startToCloseTimeout': '300', 'taskList': {'name': ...
Itu hanyalah contoh alur kerja dengan eksekusi serial aktivitas, tetapi penentu juga dapat menjadwalkan dan mengoordinasikan eksekusi paralel aktivitas.
Saya harap ini setidaknya akan membantu Anda memulai. Untuk contoh alur kerja serial yang sedikit lebih kompleks, saya sarankan untuk melihat ini.
Saya tidak memiliki kode contoh untuk dibagikan, tetapi Anda pasti dapat menggunakan SWF untuk mengoordinasikan eksekusi skrip di dua server. Gagasan utamanya adalah membuat tiga bagian kode yang berhubungan dengan SWF:
- Komponen yang mengetahui skrip mana yang harus dieksekusi terlebih dahulu dan apa yang harus dilakukan setelah skrip pertama selesai dijalankan. Ini disebut "penentu" dalam istilah SWF.
- Dua komponen yang masing-masing memahami cara mengeksekusi skrip spesifik yang ingin Anda jalankan di setiap mesin. Ini disebut "pekerja aktivitas" dalam istilah SWF.
Komponen pertama, penentu, memanggil dua API SWF:PollForDecisionTask dan RespondDecisionTaskCompleted. Permintaan polling akan memberikan komponen penentu riwayat saat ini dari alur kerja yang sedang dieksekusi, pada dasarnya informasi status "di mana saya" untuk pelari skrip Anda. Anda menulis kode yang melihat kejadian ini dan mencari tahu skrip mana yang harus dijalankan. "Perintah" untuk mengeksekusi skrip ini akan berbentuk penjadwalan tugas aktivitas, yang dikembalikan sebagai bagian dari panggilan ke RespondDecisionTaskCompleted.
Komponen kedua yang Anda tulis, pekerja aktivitas, masing-masing memanggil dua API SWF:PollForActivityTask dan RespondActivityTaskCompleted. Permintaan polling akan memberi pekerja aktivitas indikasi bahwa ia harus menjalankan skrip yang diketahuinya, yang disebut SWF sebagai tugas aktivitas. Informasi yang dikembalikan dari permintaan polling ke SWF dapat menyertakan data khusus eksekusi tunggal yang dikirim ke SWF sebagai bagian dari penjadwalan tugas aktivitas. Setiap server Anda akan melakukan polling SWF secara independen untuk tugas aktivitas guna menunjukkan eksekusi skrip lokal pada host tersebut. Setelah pekerja selesai mengeksekusi skrip, ia memanggil kembali ke SWF melalui API RespondActivityTaskCompleted.
Panggilan balik dari pekerja aktivitas Anda ke SWF menghasilkan riwayat baru yang diserahkan ke komponen penentu yang telah saya sebutkan. Itu akan melihat sejarah, melihat bahwa skrip pertama selesai, dan menjadwalkan yang kedua untuk dieksekusi. Setelah melihat bahwa yang kedua selesai, ia dapat "menutup" alur kerja menggunakan jenis keputusan lain.
Anda memulai seluruh proses eksekusi skrip pada setiap host dengan memanggil API StartWorkflowExecution. Tindakan ini membuat rekaman keseluruhan proses di SWF dan mengeluarkan histori pertama ke proses penentu untuk menjadwalkan eksekusi skrip pertama di host pertama.
Mudah-mudahan ini memberikan sedikit lebih banyak konteks tentang bagaimana menyelesaikan jenis alur kerja ini menggunakan SWF. Jika Anda belum melakukannya, saya akan melihat panduan dev di halaman SWF untuk info tambahan.