- treasure-data/digdag#701 このバグを自分で報告したので自分で直せるかなと思ってdigdagのretry機構を調べようした
- そしたら、そもそもdigdagの全体像が全く分からないことに気付いた
- テストコードを適当に見ていったらWorkflowExecutorTest.retryOnGroupingTask()のテストケースが、実際にdigファイルを渡してworkflowを実行するテストケースだったので、コレを使ってどのようにdigdagがworkflowを実行しているのかをまず調べることにした <- イマココ
- 全てのoperatorはBaseOperatorのrun methodから実行されるrunTask methodにそのOperatorの処理本体が実装されている
- この構成はプラグインの参考実装としてリポジトリに含まれているExampleOperatorの実装を読むと分かる
- 雑にgrepかけたら、実際のretryに関する制御っぽい部分がBaseOperator.run()にあった。
@Override
public TaskResult run()
{
RetryControl retry = RetryControl.prepare(request.getConfig(), request.getLastStateParams(), false);
try {
try {
return runTask();
}
finally {
workspace.close();
}
}
catch (RuntimeException ex) {
// Propagate polling TaskExecutionException instances
if (ex instanceof TaskExecutionException) {
TaskExecutionException tex = (TaskExecutionException) ex;
boolean isPolling = !tex.isError();
if (isPolling) {
// TODO: reset retry state params
throw tex;
}
}
boolean doRetry = retry.evaluate();
if (doRetry) {
throw TaskExecutionException.ofNextPollingWithCause(ex,
retry.getNextRetryInterval(),
ConfigElement.copyOf(retry.getNextRetryStateParams()));
}
else {
throw ex;
}
}
}
- 至ってシンプルである
runTask
実行時になんらかの例外が投げられた場合、それがTaskExecutionException
クラスの例外でなければ(この辺の処理はあとでしらべる)RetryControl クラスのオブジェクトからevaluate()
メソッドの返り値のbooleanを見て、trueならなんかそれっぽい例外クラスをthrowしているTaskExecutionException
クラスの例外であってもその例外オブジェクトのisError()
がfalseならばそれを上にもっかいthrowする。どうやらこれはpollingのためにretry残数に関係なくretryさせるための機構のようだ。- ところでこの辺がnestされたtaskのretry挙動に関係ありそうだな。。
- TaskExecutionExceptionをcatchしている箇所をgrepしたらOperatorManagerのこの辺 がなんか怪しい
catch (TaskExecutionException ex) {
if (ex.getRetryInterval().isPresent()) {
if (!ex.getError(cf).isPresent()) {
logger.debug("Retrying task {}", ex.toString());
}
else {
logger.error("Task failed, retrying", ex);
}
callback.retryTask(request.getSiteId(),
request.getTaskId(), request.getLockId(), agentId,
ex.getRetryInterval().get(), ex.getStateParams(cf).get(),
ex.getError(cf));
}
else {
logger.error("Task {} failed.\n{}", request.getTaskName(), formatExceptionMessage(ex));
logger.debug("", ex);
// TODO use debug to log stacktrace here
callback.taskFailed(request.getSiteId(),
request.getTaskId(), request.getLockId(), agentId,
ex.getError(cf).get()); // TODO is error set?
}
}
ex.getRetryInterval().isPresent()
がtrueならばretryされるように読める- ということはretryしない時は
ex.getRetryInterval()
がNoneになる? - ところで、しれっと
isPresent()
とかOf()
とか、scalaのOptionalクラスみたいなことしてんなと思ったら guava というgoogleが提供するjavaの便利libraryに入っているらしい - これがfalseになる条件ってなんだろう?と思ったら
TaskExecutionException
のprivateなコンストラクタにretryIntervalにOptional.absent()を代入するやつが存在する - コレを使って
TaskExecutionException
のinstanceを作ってthrowする時がretryが終わる時なのだろうか? - ちなみにこのコンストラクタはpublicなコンストラクタの一つである これから実行されている
- このpublicなコンストラクタは例えばこのようなところで使われていることから、やはりretryを抜けるのに使われていると思われる
catch (Exception e) {
String formattedErrorMessage = String.format(errorMessage, errorMessageParameters);
if (!retry(e)) {
logger.warn("{}: giving up", formattedErrorMessage, e);
throw new TaskExecutionException(e);
}
int retryIteration = retryState.params().get(RETRY, int.class, 0);
retryState.params().set(RETRY, retryIteration + 1);
int interval = (int) Math.min(retryInterval.min().getSeconds() * Math.pow(2, retryIteration), retryInterval.max().getSeconds());
logger.warn("{}: retrying in {} seconds", formattedErrorMessage, interval, e);
throw state.pollingTaskExecutionException(interval);
}
- というわけでWorkflowExecutorTestのretryOnGroupingTask() から読んでいく
- このテストケースでは以下のworkflowを実際に実行して結果をassertしている
+first:
echo>: ""
append_file: out
+doit:
_retry: 3
+task1:
echo>: "try"
append_file: out
+task2:
fail>: task failed expectedly
- というわけでWorkflowExecutorTestのretryOnGroupingTask() から読んでいく
- 追ってくと分かるのだが、いきなりTransactionManagerとか出てきてビビるのだが、digdagもDBをjob storeとしたJob Queueのようなアーキテクチャになってるっぽいことが分かる
- で、jobをDBに登録してattemptIdを生成してWorkflowExecuterのrunUntilDone(long attemptId) に渡されて、ここでなんか終わるまで監視してるっぽい
- ここにこそ秘密があるに違いないということでここを頑張って読んでいく
- と思ったがやっていることはWorkflowからtaskを生成してEnqueueするところまでっぽいかった
- 少なくともOperatorManager.callExecutorでoperator.run()が呼ばれているのでここにブレークポイント貼ってtestをdebug実行してみる
- stack traceを調べたら、main threadは一旦ここで止まった
- ブレークポイントから辿っていくとMultiThreadAgent.run()が別Threadで動作している
@Override
public void run()
{
while (!stop) {
try {
synchronized (addActiveTaskLock) {
if (executor.isShutdown()) {
break;
}
// Because addActiveTaskLock is locked, no one increases activeTaskCount in this synchronized block. Now get the maximum count.
int maximumActiveTasks = activeTaskCount.get();
// Because the maximum count doesn't increase, here can know that at least N number of threads are idling.
int guaranteedAvaialbleThreads = executor.getMaximumPoolSize() - maximumActiveTasks;
// Acquire at most guaranteedAvaialbleThreads or 10. This guarantees that all tasks start immediately.
int maxAcquire = Math.min(guaranteedAvaialbleThreads, 10);
if (maxAcquire > 0) {
transactionManager.begin(() -> {
List<TaskRequest> reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000);
for (TaskRequest req : reqs) {
executor.submit(() -> {
try {
runner.run(req);
}
catch (Throwable t) {
logger.error("Uncaught exception. Task queue will detect this failure and this task will be retried later.", t);
errorReporter.reportUncaughtError(t);
}
finally {
activeTaskCount.decrementAndGet();
}
});
activeTaskCount.incrementAndGet();
}
return null;
});
}
else {
// no executor thread is available. sleep for a while until a task execution finishes
addActiveTaskLock.wait(500);
}
}
}
catch (Throwable t) {
logger.error("Uncaught exception during acquiring tasks from a server. Ignoring. Agent thread will be retried.", t);
errorReporter.reportUncaughtError(t);
try {
// sleep before retrying
Thread.sleep(1000);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
- ここで
runner
オブジェクトはOperatorManager
インスタンスである - request数分だけ
run(TaskRequest request)
->runWithHeartbeat(request)
->workspaceManager.withExtractedArchive(request)
->runWithWorkspace(projectPath, request)
runWithWorkspace(projectPath, request)
の中でcallExecutor(projectPath, type, mergedRequest)
が実行される
TaskRequest mergedRequest = TaskRequest.builder()
.from(request)
.localConfig(new CheckedConfig(localConfig, usedKeys))
.config(new CheckedConfig(config, usedKeys))
.build();
TaskResult result = callExecutor(projectPath, type, mergedRequest);
- callExecutorの最後のほうで以下のようにoperatorに引数渡して実行して結果を返してるっぽい
OperatorContext context = new DefaultOperatorContext(
projectPath, mergedRequest, secretProvider, privilegedVariables);
Operator operator = factory.newOperator(context);
return operator.run();
- さらに調べてようやくどこで
MultiThreadAgent
がrun()
されているか特定できた - https://github.com/treasure-data/digdag/blob/97986007ad8aae2c35c4cf450b07867c9a8eb219/digdag-core/src/main/java/io/digdag/core/DigdagEmbed.java#L150 ここで org.embulk.guice.Bootstrap.initializeCloseable() が実行されるが、その後なんやかんやあって Guice.createInjector で、DIコンテナに含めるインスタンスを生成してそうなメソッドがある
- この中でLocalAgentManagerの中に
@PostConstruct
アノテーションがついてるメソッドがあり、この中でThread.runしてたんだよ!!!!
@Inject
public LocalAgentManager(
AgentConfig config,
AgentId agentId,
TaskServerApi taskServer,
OperatorManager operatorManager,
TransactionManager transactionManager)
{
if (config.getEnabled()) {
this.agentFactory =
() -> new MultiThreadAgent(config, agentId, taskServer, operatorManager, transactionManager, errorReporter);
}
else {
this.agentFactory = null;
}
}
@PostConstruct
public synchronized void start()
{
if (agentFactory != null && thread == null) {
agent = agentFactory.get();
Thread thread = new ThreadFactoryBuilder()
.setDaemon(false) // tasks taken from the queue should be certainly processed or callbacked to the server
.setNameFormat("local-agent-%d")
.build()
.newThread(agent);
thread.start();
this.thread = thread;
}
}
- enqueue時にretry分のtaskを個別に追加して実行しているだけ
- だとするとgroup taskのretryの挙動がおかしいのはこの時に追加されたretry用のtaskがおかしい?
- taskをretryのたびに全部取ってきて標準出力に吐かせてしらべたら、retry用に複製されたtaskにはupstreamsがセットされていない。このせいでErrorが起きても止まらない。。のか?
- ⇡のsequense図で
WorkflowExecutor
のrunWhile()
の中でenqueueReadyTasks()
-> taskIdでぐるぐる ->enqueueTask(dispatcher, taskId)
の中に意味深な箇所がある
if (task.getTaskType().isGroupingOnly()) {
return retryGroupingTask(lockedTask);
}
- debuggerで追うと、
+retry_on_group+doit
のtaskだけがこのifでtrueになる。どうやらこれはtaskの直下にtaskの定義しかない場合はisGroupingOnly()
がtrue評価される模様 retryGroupingTask
は以下のように定義されている
private boolean retryGroupingTask(TaskControl lockedTask)
{
// rest task state of subtasks
StoredTask task = lockedTask.get();
TaskTree tree = new TaskTree(sm.getTaskRelations(task.getAttemptId()));
List<Long> childrenIdList = tree.getRecursiveChildrenIdList(task.getId());
lockedTask.copyInitialTasksForRetry(childrenIdList);
lockedTask.setGroupRetryReadyToPlanned();
return true;
}
+retry_on_group+doit
taskの中で定義されている子taskのidをリストで取り出して、それをcopyしているlockedTask.copyInitialTasksForRetry(childrenIdList);
は以下の通り
@Override
public boolean copyInitialTasksForRetry(List<Long> recursiveChildrenIdList)
{
List<StoredTask> tasks = handle.createQuery(
selectTaskDetailsQuery() + " where t.id " + inLargeIdListExpression(recursiveChildrenIdList) +
" and " + bitAnd("t.state_flags", Integer.toString(TaskStateFlags.``)) + " != 0" // only initial tasks
)
.map(stm)
.list();
if (tasks.isEmpty()) {
return false;
}
for (StoredTask task : tasks) {
Task newTask = Task.taskBuilder()
.from(task)
.state(TaskStateCode.BLOCKED)
.stateFlags(TaskStateFlags.empty())
.build();
addSubtask(tasks.get(0).getAttemptId(), newTask);
}
return true;
}
-
文字通りtaskをcopyして新規taskとして追加している(DBにinsertしている)
-
なお、retryの度に
recursiveChildrenIdList
は当然増えていくのだが、複製元のtaskはstate_flags
がINITIAL_TASK
とのANDを取ってtrueになるものだけに絞り込んでいるので、1回目に複製されたtaskだけがretryの度に複製される -
試しに毎回の
retryGroupingTask()
が実行される度にその時のtask一覧を全部取ってきて見てみる
+ public List<StoredTask> getAllTasks(){
+ return handle.createQuery(selectTaskDetailsQuery()).map(stm).list();
+ }
+ System.out.println("");
+ lockedTask.getAllTasks().forEach(System.out::println);
StoredTask{id=1, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, fullName=+retry_on_group, config={}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=2, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:34.665Z, startedAt=2018-01-01T12:29:30.740Z, stateParams={}, retryCount=0, parentId=1, fullName=+retry_on_group+first, config={"echo>":"","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=3, attemptId=1, upstreams=[2], updatedAt=2018-01-01T12:29:36.514Z, stateParams={"retry_count":1}, retryCount=1, parentId=1, fullName=+retry_on_group+doit, config={"_retry":3}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=4, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:35.807Z, startedAt=2018-01-01T12:29:34.824Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=5, attemptId=1, upstreams=[4], updatedAt=2018-01-01T12:29:36.469Z, startedAt=2018-01-01T12:29:35.821Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=6, attemptId=1, upstreams=[5], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=7, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:36.514Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=8, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:36.514Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=9, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:36.514Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=1, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, fullName=+retry_on_group, config={}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=2, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:34.665Z, startedAt=2018-01-01T12:29:30.740Z, stateParams={}, retryCount=0, parentId=1, fullName=+retry_on_group+first, config={"echo>":"","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=3, attemptId=1, upstreams=[2], updatedAt=2018-01-01T12:29:37.916Z, stateParams={"retry_count":2}, retryCount=2, parentId=1, fullName=+retry_on_group+doit, config={"_retry":3}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=4, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:35.807Z, startedAt=2018-01-01T12:29:34.824Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=5, attemptId=1, upstreams=[4], updatedAt=2018-01-01T12:29:36.469Z, startedAt=2018-01-01T12:29:35.821Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=6, attemptId=1, upstreams=[5], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=7, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.888Z, startedAt=2018-01-01T12:29:36.654Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=8, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.834Z, startedAt=2018-01-01T12:29:36.659Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{}}
StoredTask{id=9, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.860Z, startedAt=2018-01-01T12:29:36.664Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=10, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.916Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=11, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.916Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=12, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.916Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=1, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, fullName=+retry_on_group, config={}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=2, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:34.665Z, startedAt=2018-01-01T12:29:30.740Z, stateParams={}, retryCount=0, parentId=1, fullName=+retry_on_group+first, config={"echo>":"","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=3, attemptId=1, upstreams=[2], updatedAt=2018-01-01T12:29:38.975Z, stateParams={"retry_count":3}, retryCount=3, parentId=1, fullName=+retry_on_group+doit, config={"_retry":3}, taskType=TaskType{GROUPING_ONLY}, state=planned, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=4, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:35.807Z, startedAt=2018-01-01T12:29:34.824Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=5, attemptId=1, upstreams=[4], updatedAt=2018-01-01T12:29:36.469Z, startedAt=2018-01-01T12:29:35.821Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=6, attemptId=1, upstreams=[5], updatedAt=2018-01-01T12:29:30.197Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{INITIAL_TASK}}
StoredTask{id=7, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.888Z, startedAt=2018-01-01T12:29:36.654Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=8, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.834Z, startedAt=2018-01-01T12:29:36.659Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{}}
StoredTask{id=9, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:37.860Z, startedAt=2018-01-01T12:29:36.664Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=10, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.952Z, startedAt=2018-01-01T12:29:38.045Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=11, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.946Z, startedAt=2018-01-01T12:29:38.049Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=error, stateFlags=TaskStateFlags{}}
StoredTask{id=12, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.922Z, startedAt=2018-01-01T12:29:38.054Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=success, stateFlags=TaskStateFlags{}}
StoredTask{id=13, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.975Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task1, config={"echo>":"try","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=14, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.975Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task2, config={"fail>":"task failed expectedly"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
StoredTask{id=15, attemptId=1, upstreams=[], updatedAt=2018-01-01T12:29:38.975Z, stateParams={}, retryCount=0, parentId=3, fullName=+retry_on_group+doit+task3, config={"echo>":"task3","append_file":"out"}, taskType=TaskType{}, state=blocked, stateFlags=TaskStateFlags{}}
- 毎回taskが3つずつ増えていく様子が見えるが、stateを見るとやはり初回と結果が違う
- よく見たら、retry用にcopyされたtaskにはupstreamsがセットされていなかったことに気づいた
- 仮説だが、task実行時にupstreamsがセットされていたら、upstreamsにセットされているtaskIdのtaskのstateがすべて
success
になっていないと実行されないとかそういう挙動をするのではないだろうか - なんか
WorkflowExecutor
のこの辺 がその辺やってそうな名前のメソッドを呼んでいるのだが、あとで読む - copyしたときに直前のtaskに依存するようにupstreamsを設定するようにしてみた
- 修正したのは
DatabaseSessionStoreManager#copyInitialTasksForRetry
のこの辺
+ DatabaseTaskControlStore store = new DatabaseTaskControlStore(handle);
+ int index = 0;
for (StoredTask task : tasks) {
Task newTask = Task.taskBuilder()
- .from(task)
- .state(TaskStateCode.BLOCKED)
- .stateFlags(TaskStateFlags.empty())
- .build();
- addSubtask(tasks.get(0).getAttemptId(), newTask);
+ .from(task)
+ .state(TaskStateCode.BLOCKED)
+ .stateFlags(TaskStateFlags.empty())
+ .build();
+
+ long newTaskId = addSubtask(tasks.get(0).getAttemptId(), newTask);
+ if (index > 0) {
+ store.addDependencies(newTaskId, Arrays.asList(newTaskId - 1));
+ }
+ ++index;
- これで以下のworkflowを実行しても、failした後のtaskである
+doit+task3
が実行されなくなった
+first:
echo>: ""
append_file: out
+doit:
_retry: 3
+task1:
echo>: "try"
append_file: out
+task2:
fail>: task failed expectedly
+task3:
echo>: "not executed"
append_file: out