Created
December 15, 2011 03:58
-
-
Save anson0370/1479771 to your computer and use it in GitHub Desktop.
core run of kongur
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void run(KongurContext context, ActivityNode currentNode, Long processInstanceId, String nodeType, | |
PvmExecution executor) throws SchedulerException, BizException { | |
// 如果有流程实例 先更新流程实例 再去做节点内容 | |
// 先select再update 避免mysql中的死锁 | |
if (processInstanceId != null) { | |
Activity activity = activityDAO.selectFailedByProcessInstanceIdAndActivityName(processInstanceId, | |
currentNode.getName()); | |
if (activity == null) { | |
if (nodeType.endsWith(NodeExecutor.NOTIFY_EXECUTOR)) { | |
// 这种情况允许继续执行 去重在节点的notify执行器中做 | |
// 现在可能的就是taskComplete和webpageComplete | |
logger.info("notify当前节点时没有未完成的activity记录[processId:" + processInstanceId + " activityName:" | |
+ currentNode.getName() + "],继续执行"); | |
// 直接插入成功的activity记录 | |
saveActivityRecord(context, currentNode, processInstanceId, KongurContextKeys.STATUS_NOTIFY, true); | |
} else { | |
logger.warn("不存在未完成的activity记录[processId:" + processInstanceId + " activityName:" | |
+ currentNode.getName() + "]"); | |
// 挂起流程 | |
context.setStatus(KongurContextKeys.STATUS_WAIT); | |
return; | |
} | |
} else { | |
// 乐观锁去重 | |
int rows = activityDAO.updateActivityToSuccess(activity.getId()); | |
if (rows == 0) { | |
logger.warn("重复执行被拦截[activityId:" + activity.getId() + "]"); | |
// 挂起流程 | |
context.setStatus(KongurContextKeys.STATUS_WAIT); | |
return; | |
} | |
} | |
} | |
logger.debug("ready for execution " + "[nodeName:" + currentNode.getName() + ", nodeType:" + nodeType | |
+ ", executor:" + executor.getClass().getName() + "]"); | |
context.getProps().put(VariableKeys.CURRENT_PROCESS_NODE_NAME, currentNode.getName()); | |
context.getProps().put(VariableKeys.CURRENT_PROCESS_NODE_TYPE, currentNode.getType()); | |
try { | |
// 执行执行器 | |
ExecutionExecutor.execute(executor, context); | |
// 异常处理 | |
ExceptionStrategyHelper.doExceptionStrategy(context, currentNode); | |
// 执行后置拦截器 | |
InterceptorExecutor.runAfterInterceptors(context); | |
} catch (ExecutionException e) { | |
throw new SchedulerException("执行中出错,执行器:" + executor.getClass().getName(), e); | |
} | |
if (processInstanceId != null) { | |
variableHandler.persistVariables(context.getProps(), processInstanceId, context.getProcessDefinition() | |
.getBizApp(), context.getProcessDefinition().getBizModule()); | |
} | |
// 知道下一步要去哪里就应该在当前事务中预先创建下一步的activity记录 | |
if (context.isReady() || context.isAsync()) { | |
//异常策略会直接设置currentNode | |
if (!"end".equals(context.getCurrentNode().getType())) { | |
context.setCurrentNode(computeNextNode(context)); | |
} | |
if (processInstanceId != null) { | |
ProcessInstance processInstance = processInstanceDAO.selectByPrimaryKey(processInstanceId); | |
processInstance.setLastActivity(processInstance.getCurrntActivity()); | |
processInstance.setCurrntActivity(context.getCurrentNode().getName()); | |
processInstance.setGmtModified(new Date()); | |
processInstanceDAO.updateByPrimaryKey(processInstance); | |
saveActivityRecord(context, context.getCurrentNode(), processInstanceId, | |
KongurContextKeys.STATUS_READY, false); | |
} | |
} else if (context.isFork()) { | |
// 对于fork的节点 就把所有fork路径开始的activity都创建了 | |
context.getPrivateProps().put(KongurContextKeys.CURRENT_FORK_ACTIVITY_LIST, | |
forkActivity(context, currentNode, processInstanceId)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment