Created
July 2, 2015 21:44
-
-
Save mikepii/b530736a83b8d6de420e to your computer and use it in GitHub Desktop.
luigi-swf WaitForExternalTasks base class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class WaitForExternalTasks(luigi.Task, SwfHeartbeatCancel):
    """Waits for a task in another workflow to complete.

    Polls for the `complete` method on :meth:`wait_for`'s result
    to return `True`.

    Example of common parameters to override::

        timeout = 10 * hours
        check_interval = 1 * minutes

    Parameters to override:

    * `timeout`: After how long to raise an exception in seconds.
      Note that the total time this might wait is actually
      (`swf_retries` + 1) * `timeout`. (default 21 min)
    * `check_interval`: Interval between checks in seconds. (default 20s)
      Each wait is jittered by up to +/- 3 seconds.
    * `swf_heartbeat_check_factor`: Number of checks per heartbeat; also
      used to calculate the SWF heartbeat timeout =
      `check_interval` * `swf_heartbeat_check_factor` + 2 * `minutes`
      (default 3)
    * `swf_start_to_close_timeout_buffer`: Used to calculate
      `swf_start_to_close_timeout` = `timeout` +
      `swf_start_to_close_timeout_buffer` (default 30s)
    * `swf_retries`: Works as is usual in `luigi-swf`. (default 2)
    """

    timeout = 21 * minutes
    check_interval = 20 * seconds
    swf_heartbeat_check_factor = 3
    swf_start_to_close_timeout_buffer = 30 * seconds
    swf_retries = 2

    @property
    def swf_start_to_close_timeout(self):
        """`timeout` plus `swf_start_to_close_timeout_buffer`."""
        return self.timeout + self.swf_start_to_close_timeout_buffer

    @property
    def swf_task_list(self):
        """Task list of the first awaited task that defines one.

        Returns `None` when no awaited task has a `swf_task_list`
        attribute.
        """
        for task in self._wait_for_flat():
            if hasattr(task, 'swf_task_list'):
                return task.swf_task_list
        return None

    @property
    def swf_heartbeat_timeout(self):
        """`check_interval` * `swf_heartbeat_check_factor` + 2 * `minutes`."""
        return (self.check_interval * self.swf_heartbeat_check_factor +
                2 * minutes)

    def complete(self):
        """Return `True` only when every awaited task reports complete.

        Stops at the first incomplete task, so later tasks are not
        queried once the answer is known.
        """
        return all(task.complete() for task in self._wait_for_flat())

    def wait_for(self):
        """Please override this and return a task or list of tasks."""
        raise NotImplementedError

    def _wait_for_flat(self):
        """Return :meth:`wait_for`'s result as an iterable of tasks.

        A single task is wrapped in a one-element list; an iterable is
        returned unchanged.

        :raises TypeError: if :meth:`wait_for` returns a string. Strings
            are iterable, so without this check they would be walked
            character by character.
        """
        # `collections.Iterable` was removed in Python 3.10; the ABCs live
        # in `collections.abc` on Python 3 and in `collections` on Python 2.
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        wait_for = self.wait_for()
        # An explicit raise (rather than `assert`) keeps the check active
        # when Python runs with -O.
        if isinstance(wait_for, string_types):
            raise TypeError(
                "wait_for() must return a task or an iterable of tasks, "
                "not a string")
        if isinstance(wait_for, Iterable):
            return wait_for
        return [wait_for]

    def run(self):
        """Poll until the awaited tasks complete, cancel, or time out.

        Returns normally once :meth:`complete` is true, or after
        acknowledging a cancel request.

        :raises RuntimeError: if `timeout` seconds elapse first.
        """
        start_t = datetime.datetime.utcnow()
        elapsed_t = 0 * seconds
        hb_i = 0
        # Small random start delay before the first check.
        sleep(random.uniform(0, 2))
        while elapsed_t <= self.timeout:
            # Heartbeat on the first check and then once every
            # `swf_heartbeat_check_factor` checks.
            if hb_i == 0:
                self.heartbeat()
            hb_i = (hb_i + 1) % self.swf_heartbeat_check_factor
            # NOTE(review): `heartbeat`, `cancel_requested` and `ack_cancel`
            # are presumably provided by SwfHeartbeatCancel -- confirm.
            if self.cancel_requested:
                self.ack_cancel()
                return
            if self.complete():
                return
            # Jitter the interval; clamp at zero because time.sleep()
            # raises ValueError for a negative duration, which could
            # otherwise happen when check_interval is under 3 seconds.
            sleep(max(0, self.check_interval + random.uniform(-3, 3)))
            elapsed_t = (datetime.datetime.utcnow() - start_t).total_seconds()
        raise RuntimeError("Timeout exceeded")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment