▷ 创建Behavior( SequentialMultiInstanceBehavior、ParallelMultiInstanceBehavior)
```
// #parse(, )
/**
 * Excerpt of a parse handler (other members elided). After the regular parsing step it checks
 * whether the element is an {@code Activity} with loop characteristics and, if so, sets up a
 * multi-instance behavior for it.
 * NOTE(review): several call receivers / method names were lost when this excerpt was captured
 * (bare "(...)" calls); the comments describe only what is still visible.
 */
class UserTaskParseHandler {
/**
 * Parses the element, then calls {@code createMultiInstanceLoopCharacteristics} when the element
 * is an {@code Activity} whose loop characteristics are non-null.
 *
 * @param bpmnParse the parse context, passed through to the helper calls
 * @param element   the model element being parsed
 */
public void parse(BpmnParse bpmnParse, BaseElement element) {
(bpmnParse, element); // !!! sets the Behavior (NOTE(review): call target lost in this excerpt)
/**
 * Multi-approver handling ("AND-sign" / "OR-sign")
 */
if (element instanceof Activity && ((Activity) element).getLoopCharacteristics() != null) { // "AND-sign" / "OR-sign"
createMultiInstanceLoopCharacteristics(bpmnParse, (Activity) element); // overrides the Behavior
}
}
/**
 * Builds the multi-instance behavior for an activity and configures it from the activity's
 * loop characteristics.
 *
 * @param bpmnParse     the parse context
 * @param modelActivity the activity that has loop characteristics configured
 */
protected void createMultiInstanceLoopCharacteristics(BpmnParse bpmnParse, Activity modelActivity) {
MultiInstanceLoopCharacteristics loopCharacteristics = (); // multi-instance configuration
/**
 * Create a SequentialMultiInstanceBehavior or a ParallelMultiInstanceBehavior that references
 * modelActivity (i.e. it acts as a proxy for the original modelActivity).
 * (The original note named SequentialMultiInstanceBehavior twice; the second one is the
 * parallel variant.)
 * <pre>
 * When sequential
 * @see
 * When parallel
 * @see
 * </pre>
 */
// Activity Behavior: create the multi-instance activity behavior
MultiInstanceActivityBehavior miActivityBehavior = createMultiInstanceActivityBehavior(modelActivity, loopCharacteristics, bpmnParse);
/**
 * Replace the originally invoked Behavior
 */
(miActivityBehavior); // replace the originally invoked Behavior (NOTE(review): setter call target lost)
ExpressionManager expressionManager = ().getExpressionManager();
// loop cardinality
// NOTE(review): condition and body were stripped; presumably sets the loop-cardinality expression when configured — confirm
if ((())) {
((()));
}
// completion condition
// NOTE(review): condition and body were stripped; presumably sets the completion condition when configured — confirm
if ((())) { // completion condition
(());
}
...
}
}
```
▷ 调用 Behavior(SequentialMultiInstanceBehavior、ParallelMultiInstanceBehavior)
```
/**
 * Excerpt of the operation that continues a process through the current flow element. The parts
 * shown here focus on how an activity with multi-instance loop characteristics is executed.
 * NOTE(review): many call receivers / method names were lost when this excerpt was captured
 * (bare "()" calls); the comments describe only what is still visible.
 */
public class ContinueProcessOperation extends AbstractOperation {
private static final Logger LOGGER = ();
// ...
/**
 * Dispatches on the type of the current flow element: a {@code FlowNode} or a
 * {@code SequenceFlow}; any other value raises a {@code FlowableException}.
 */
@Override
public void run() {
FlowElement currentFlowElement = getCurrentFlowElement(execution);
if (currentFlowElement instanceof FlowNode) { // node
continueThroughFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) { // sequence flow (connection)
continueThroughSequenceFlow((SequenceFlow) currentFlowElement);
} else {
throw new FlowableException("Programmatic error: no current flow element found or invalid type: " + currentFlowElement + ". Halting.");
}
}
/**
 * Runs the execution listeners registered for the start event on the process.
 * NOTE(review): how {@code process} is resolved was lost in this excerpt.
 */
protected void executeProcessStartExecutionListeners() {
process = (());
executeExecutionListeners(process, (), ExecutionListener.EVENTNAME_START);
}
/**
 * Continues through a flow node. An {@code Activity} that has multi-instance loop
 * characteristics is handed to {@code executeMultiInstanceSynchronous}; the other two branches
 * are elided in this excerpt.
 *
 * @param flowNode the node to continue through
 */
protected void continueThroughFlowNode(FlowNode flowNode) {
// ...
/**
 * Activity
 * Inheritance: CallActivity extends Activity ---> extends FlowNode ...
 * Inheritance: SubProcess extends Activity ---> extends FlowNode ...
 * Inheritance: UserTask extends Task extends Activity ---> extends FlowNode ...
 */
if (flowNode instanceof Activity && ((Activity) flowNode).hasMultiInstanceLoopCharacteristics()) { // !!! "AND-sign" / "OR-sign" logic
/**
 * <pre>
 * e.g. flowNode is a UserTask && loopCharacteristics is configured
 * @see
 * </pre>
 */
// the multi instance execution will look at async
executeMultiInstanceSynchronous(flowNode);
} else if (forceSynchronousOperation || !()) { // forced synchronous execution, or async not allowed
// ...
} else {
// ...
}
}
// ...
/**
 * Executes a multi-instance flow node: fires start listeners, makes sure a multi-instance root
 * execution exists, then executes the node's activity behavior.
 *
 * @param flowNode the multi-instance flow node
 * @throws FlowableException when no activity behavior is available for the flow node
 */
protected void executeMultiInstanceSynchronous(FlowNode flowNode) {
/**
 * 1. Fire the listeners
 */
// Execution listener: event 'start'
// NOTE(review): the guard condition was stripped; presumably "the node has execution listeners" — confirm
if ((())) {
executeExecutionListeners(flowNode, ExecutionListener.EVENTNAME_START);
}
/**
 * Create the "root Execution" if there is none yet
 */
if (!hasMultiInstanceRootExecution(execution, flowNode)) {
execution = createMultiInstanceRootExecution(execution); // !!!!
}
/**
 * <pre>
 * When sequential
 * @see
 * When parallel
 * @see
 * </pre>
 */
// Execute the multi instance behavior
ActivityBehavior activityBehavior = (ActivityBehavior) ();
if (activityBehavior != null) {
executeActivityBehavior(activityBehavior, flowNode); // !!!! execute the ActivityBehavior
// ...
} else {
throw new FlowableException("Expected an activity behavior in flow node " + ());
}
}
/**
 * Loops over a chain of executions until one satisfies the match condition or the chain ends.
 * NOTE(review): the starting point, the step to the next execution and the compared values
 * were stripped; presumably it walks up the parent chain — confirm.
 *
 * @param execution the execution to start from
 * @param flowNode  the flow node to match
 * @return {@code true} if a matching execution is found, otherwise {@code false}
 */
protected boolean hasMultiInstanceRootExecution(ExecutionEntity execution, FlowNode flowNode) {
ExecutionEntity currentExecution = ();
while (currentExecution != null) {
/**
 * 1. It is a multi-instance "root" node
 * 2. The node id matches
 */
if (() && ().equals(())) { // node id matches
return true;
}
currentExecution = ();
}
return false;
}
/**
 * Replaces the given execution with a newly created multi-instance root execution.
 * NOTE(review): the receivers of the delete/create/setter calls were stripped from this excerpt.
 *
 * @param execution the execution to replace
 * @return the newly created multi-instance root execution
 */
protected ExecutionEntity createMultiInstanceRootExecution(ExecutionEntity execution) {
ExecutionEntity parentExecution = ();
FlowElement flowElement = (); // node information
// Delete the original node (execution)
ExecutionEntityManager executionEntityManager = ();
(execution, null); // delete related data
(execution);
/**
 * Create the "root Execution".
 * Per the original note: inserts a row into the ACT_RU_EXECUTION table.
 */
ExecutionEntity multiInstanceRootExecution = (parentExecution);
(flowElement);
(true);
(false);
return multiInstanceRootExecution;
}
/**
 * Executes the given activity behavior and rethrows any {@code RuntimeException} after an
 * optional handling step.
 *
 * @param activityBehavior the behavior to execute
 * @param flowNode         the flow node the behavior belongs to
 */
protected void executeActivityBehavior(ActivityBehavior activityBehavior, FlowNode flowNode) {
("Executing activityBehavior {} on activity '{}' with execution {}", (), (), ());
// Fire the event
// ...
// Execute the behavior
try {
/**
 * <pre>
 * When sequential
 * @see #execute()
 * When parallel
 * @see #execute()
 * </pre>
 */
(execution);
} catch (RuntimeException e) {
// NOTE(review): guard and handler call were stripped; the exception is rethrown either way
if (()) {
(execution);
}
throw e;
}
}
// ...
}
```
▷ 配置对象
```
/**
 * Configuration object for a multi-instance activity (excerpt showing fields only).
 * The trailing comment on each field gives the name used for it in these notes, followed by the
 * note author's description.
 */
public class MultiInstanceLoopCharacteristics extends BaseElement {
protected CollectionHandler collectionHandler; // (no description in the original note)
// Loop count (number of participants required) -- optional
// The loop count is determined in one of two ways: 1) from this configured value; 2) from the length of the candidate list
// Priority used when resolving the loop count:
// 1. the EL expression set via inputDataItem, evaluated against the execution
// 2. the value of collectionVariable used as a key name, read from the execution --- deprecated
// 3. the value of collectionString used as the value itself
//    (listed again as "1." in the original note, presumably a fallback for item 3 — confirm):
//    the value of collectionString used as a key name, read from the execution
protected String loopCardinality; // loopCardinalityExpression, supports EL expressions
// Participant list
protected String inputDataItem; // collectionExpression, supports EL expressions; key name used to read the participant list from the execution
protected String collectionString; // collectionString; key name used to read the participant list from the execution
// Keys under which data is stored
protected String elementVariable; // collectionElementVariable; key name under which the candidate is stored in each created child execution
protected String elementIndexVariable; // collectionElementIndexVariable, defaults to "loopCounter"; key name under which the candidate's index is stored in each created child execution
// Completion condition
protected String completionCondition; // completionCondition, supports EL expressions; the condition under which the loop completes
// Execution mode
protected boolean sequential; // whether instances are executed sequentially
}
```
▷ 串行任务的<发起> - SequentialMultiInstanceBehavior
```
/**
 * Excerpt: how a sequential multi-instance activity is started.
 * NOTE(review): this is a condensed note, not compilable code — several call receivers were
 * stripped, {@code nrOfInstances} is declared twice, {@code execution} is redeclared inside the
 * first branch, and {@code multiInstanceRootExecution} is used in {@code execute} without a
 * visible declaration.
 */
public class SequentialMultiInstanceBehavior extends MultiInstanceActivityBehavior {
/**
 * Entry point. When the local loop-index variable is not set yet, the multi-instance loop is
 * initialised: one child execution is created, the loop counters are stored on the root
 * execution and the original behavior is executed with loop counter 0. Otherwise the else
 * branch runs (its content is elided in this excerpt).
 *
 * @param delegateExecution the execution this behavior is invoked on
 */
public void execute(DelegateExecution delegateExecution) {
ExecutionEntity execution = (ExecutionEntity) delegateExecution;
// Read the value of the "loopCounter" variable (per the original note, every child execution has this property)
if (getLocalLoopVariable(execution, getCollectionElementIndexVariable()) == null) {
// NOTE(review): duplicate declaration of nrOfInstances on the next two lines — artifact of the excerpt
int nrOfInstances = 0;
int nrOfInstances = resolveNrOfInstances(multiInstanceRootExecution); // number of people who have to take part in signing
/**
 * Create one "child Execution"
 */
// Create child execution that will execute the inner behavior
// NOTE(review): redeclares the local "execution" from the top of the method — artifact of the excerpt
ExecutionEntity execution = ().createChildExecution((ExecutionEntity) multiInstanceRootExecution);
(());
// Set variables on the "root Execution"
// Set Multi-instance variables
setLoopVariable(multiInstanceRootExecution, NUMBER_OF_INSTANCES, nrOfInstances); // store nrOfInstances on the "root Execution"
setLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES, 0); // nrOfCompletedInstances: number of completed instances
setLoopVariable(multiInstanceRootExecution, NUMBER_OF_ACTIVE_INSTANCES, 1); // nrOfActiveInstances: number of active instances
logLoopDetails(multiInstanceRootExecution, "initialized", 0, 0, 1, nrOfInstances);
/**
 * <pre>
 * 1. Store "loopCounter=0" on the execution
 * 2. Trigger the real (wrapped) Behavior
 * </pre>
 */
executeOriginalBehavior(execution, (ExecutionEntity) multiInstanceRootExecution, 0);
// ...
} else {
// ...
(execution);
}
}
/**
 * Prepares one instance and plans its execution: when a collection is used and an element
 * variable is configured, the element for this loop counter is stored on the execution; then
 * the continue-multi-instance operation is planned.
 *
 * @param execution                  the (child) execution that runs this instance
 * @param multiInstanceRootExecution the multi-instance root execution
 * @param loopCounter                zero-based index of the instance to run
 */
protected void executeOriginalBehavior(DelegateExecution execution, ExecutionEntity multiInstanceRootExecution, int loopCounter) {
if (usesCollection() && collectionElementVariable != null) { // collectionElementVariable
/**
 * Read the participant list
 * <pre>
 * The object is resolved as follows (per the original note):
 * 1. the EL expression set via collectionExpression, evaluated against the execution
 * 2. the value of collectionVariable used as a key name, read from the execution --- deprecated
 * 3. the value of collectionString used as the value itself
 *    (listed again as "1." in the original note) the value of collectionString used as a key
 *    name, read from the execution
 * </pre>
 */
Collection collection = (Collection) resolveAndValidateCollection(execution);
Object value = null;
int index = 0;
Iterator it = ();
// Runs loopCounter + 1 times; presumably each pass takes the next element from the iterator,
// leaving "value" at the element with index loopCounter — the call was stripped, confirm
while (index <= loopCounter) {
value = ();
index++;
}
/**
 * Store the value on the execution under the key collectionElementVariable,
 * i.e. each candidate's information is stored on its "child execution".
 *
 * <pre>
 * 1. execution === the child execution
 * 2. collectionElementVariable: key name under which the candidate information is stored;
 *    the original note says it defaults to "loopCounter".
 *    NOTE(review): the MultiInstanceLoopCharacteristics notes in this file give "loopCounter"
 *    as the default of the element *index* variable instead — confirm.
 * 3. value: the candidate information
 * </pre>
 */
setLoopVariable(execution, collectionElementVariable, value);
}
(activity);
/**
 * @see #planContinueMultiInstanceOperation(, , int)
 */
().planContinueMultiInstanceOperation((ExecutionEntity) execution, multiInstanceRootExecution, loopCounter);
}
}
```
▷ 串行任务的<结束> - SequentialMultiInstanceBehavior
```
#complete(, )
(task, variables, transientVariables, localScope, commandContext); // !!!
#run()
#trigger(, , )
#trigger(, , )
#leave()
#leave()
/**
 * Excerpt: how one instance of a sequential multi-instance activity is finished.
 */
public class SequentialMultiInstanceBehavior extends MultiInstanceActivityBehavior {
// ....
/**
 * Called when one instance ends. Updates the completed-instance counter on the root execution,
 * fires the end listeners, and then either finishes the whole multi-instance activity or
 * continues with the next instance.
 *
 * @param execution the execution of the instance that has just finished
 */
public void leave(DelegateExecution execution) {
DelegateExecution multiInstanceRootExecution = getMultiInstanceRootExecution(execution); // get the "root Execution"
int loopCounter = getLoopVariable(execution, getCollectionElementIndexVariable()) + 1; // loopCounter + 1
int nrOfInstances = getLoopVariable(multiInstanceRootExecution, NUMBER_OF_INSTANCES); // total number of instances required
int nrOfCompletedInstances = getLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES) + 1; // completed instances + 1
int nrOfActiveInstances = getLoopVariable(multiInstanceRootExecution, NUMBER_OF_ACTIVE_INSTANCES); // active instances
setLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES, nrOfCompletedInstances); // store the completed-instance count
logLoopDetails(execution, "instance completed", loopCounter, nrOfCompletedInstances, nrOfActiveInstances, nrOfInstances);
// Fire the listeners
callActivityEndListeners(execution);
boolean completeConditionSatisfied = completionConditionSatisfied(multiInstanceRootExecution); // the "completion condition" EL expression is satisfied
/**
 * The incremented loop counter has reached (>=) the total number of instances required,
 * or the "completion condition" EL expression is satisfied
 */
if (loopCounter >= nrOfInstances || completeConditionSatisfied) {
if(completeConditionSatisfied) {
sendCompletedWithConditionEvent(multiInstanceRootExecution); // completion condition satisfied
}
else {
sendCompletedEvent(multiInstanceRootExecution); // all "child Executions" have completed
}
(execution); // NOTE(review): call target lost in this excerpt
} else {
/**
 * The sequential multi-instance is not finished yet: trigger execution for the next candidate.
 *
 * loopCounter was already incremented by 1 above.
 */
continueSequentialMultiInstance(execution, loopCounter, (ExecutionEntity) multiInstanceRootExecution);
}
}
// ....
}
```
▷ 并行任务的<发起> - ParallelMultiInstanceBehavior
```
/**
 * Excerpt: how a parallel multi-instance activity is started.
 * NOTE(review): condensed note, not compilable code — call receivers were stripped,
 * {@code multiInstanceRootExecution} has no visible declaration, and the method is declared
 * {@code void} although it ends with {@code return nrOfInstances;} (the body appears to come
 * from a helper that returns the instance count — confirm).
 */
public class ParallelMultiInstanceBehavior extends MultiInstanceActivityBehavior {
/**
 * Resolves the number of instances, stores the loop counters on the root execution, creates one
 * child execution per instance and then executes the original behavior on each of them.
 *
 * @param delegateExecution the execution this behavior is invoked on
 * @throws FlowableIllegalArgumentException when the resolved number of instances is negative
 */
public void execute(DelegateExecution delegateExecution) {
// ...
int nrOfInstances = resolveNrOfInstances(multiInstanceRootExecution); // number of people who have to take part in signing
if (nrOfInstances < 0) {
throw new FlowableIllegalArgumentException("Invalid number of instances: must be non-negative integer value" + ", but was " + nrOfInstances);
}
setLoopVariable(multiInstanceRootExecution, NUMBER_OF_INSTANCES, nrOfInstances); // store nrOfInstances on the "root Execution"
setLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES, 0); // nrOfCompletedInstances: number of completed instances
setLoopVariable(multiInstanceRootExecution, NUMBER_OF_ACTIVE_INSTANCES, nrOfInstances); // nrOfActiveInstances: number of active instances
/**
 * Create several "child Executions" (one per instance)
 */
List<ExecutionEntity> concurrentExecutions = new ArrayList<>();
for (int loopCounter = 0; loopCounter < nrOfInstances; loopCounter++) {
/**
 * @see #createChildExecution()
 */
ExecutionEntity concurrentExecution = ()
.createChildExecution((ExecutionEntity) multiInstanceRootExecution);
// NOTE(review): receivers of the next four calls were stripped; presumably they configure the
// new child execution and add it to concurrentExecutions — confirm
(activity);
(true);
(false);
(concurrentExecution);
logLoopDetails(concurrentExecution, "initialized", loopCounter, 0, nrOfInstances, nrOfInstances);
//().recordActivityStart(concurrentExecution);
}
// ...
// Second pass: start the behavior only after all child executions have been created
for (int loopCounter = 0; loopCounter < nrOfInstances; loopCounter++) {
ExecutionEntity concurrentExecution = (loopCounter);
// executions can be inactive, if instances are all automatics
// (no-waitstate) and completionCondition has been met in the meantime
if (()
&& !()
&& !().isEnded()) {
/**
 * <pre>
 * 1. Store the "loopCounter" value on concurrentExecution (the child Execution)
 * 2. Trigger the real (wrapped) Behavior
 * </pre>
 */
executeOriginalBehavior(concurrentExecution, (ExecutionEntity) multiInstanceRootExecution, loopCounter);
}
}
// ...
return nrOfInstances;
}
}
```
▷ 并行任务的<结束> - ParallelMultiInstanceBehavior
```
/**
 * Excerpt: how one instance of a parallel multi-instance activity is finished.
 * NOTE(review): many call receivers / method names were stripped from this excerpt (bare "()"
 * calls); the comments describe only what is still visible.
 */
public class ParallelMultiInstanceBehavior extends MultiInstanceActivityBehavior {
/**
 * Called when one instance ends. Updates the completed/active counters on the multi-instance
 * root execution, records the activity end, fires the end listeners, and — once all instances
 * are done or the completion condition is satisfied — sends the matching completed event for
 * the leaving execution.
 *
 * @param execution the execution of the instance that has just finished
 */
public void leave(DelegateExecution execution) {
boolean zeroNrOfInstances = false;
if (resolveNrOfInstances(execution) == 0) {
// Empty collection, just leave.
zeroNrOfInstances = true;
(execution); // Plan the default leave
}
int loopCounter = getLoopVariable(execution, getCollectionElementIndexVariable()); // index stored on the current "child execution"
int nrOfInstances = getLoopVariable(execution, NUMBER_OF_INSTANCES); // total number of instances required
int nrOfCompletedInstances = getLoopVariable(execution, NUMBER_OF_COMPLETED_INSTANCES) + 1; // completed instances + 1
int nrOfActiveInstances = getLoopVariable(execution, NUMBER_OF_ACTIVE_INSTANCES) - 1; // active instances - 1
DelegateExecution miRootExecution = getMultiInstanceRootExecution(execution); // get the multi-instance "root Execution"
if (miRootExecution != null) { // will be null in case of empty collection
setLoopVariable(miRootExecution, NUMBER_OF_COMPLETED_INSTANCES, nrOfCompletedInstances); // update the completed-instance count
setLoopVariable(miRootExecution, NUMBER_OF_ACTIVE_INSTANCES, nrOfActiveInstances); // update the active-instance count
}
/**
 * @see #recordActivityEnd(, )
 * <pre>
 * Per the original note:
 * 1. updates the deleteReason, endTime and durationInMillis columns of the ACT_RU_ACTINST table
 * 2. updates the deleteReason, endTime and durationInMillis columns of the ACT_HI_ACTINST table
 * </pre>
 */
().recordActivityEnd((ExecutionEntity) execution, null);
// Call the listeners
callActivityEndListeners(execution);
logLoopDetails(execution, "instance completed", loopCounter, nrOfCompletedInstances, nrOfActiveInstances, nrOfInstances);
// For an empty collection the leave was already planned at the top of the method
if (zeroNrOfInstances) {
return;
}
ExecutionEntity executionEntity = (ExecutionEntity) execution;
if (() != null) { // "root Execution" (NOTE(review): the checked expression was stripped; presumably the parent execution — confirm)
/**
 * (field name lost in the original note) = false;
 */
();
/**
 * Sets (field name lost in the original note) = true; i.e. marks the cached executionEntity
 * as one that must be written back to the DB
 */
lockFirstParentScope(executionEntity);
boolean isCompletionConditionSatisfied = completionConditionSatisfied(()); // the "completion condition" EL expression is satisfied
if (nrOfCompletedInstances >= nrOfInstances || isCompletionConditionSatisfied) {
ExecutionEntity leavingExecution = null;
if (nrOfInstances > 0) {
leavingExecution = ();
} else {
().recordActivityEnd((ExecutionEntity) execution, null);
leavingExecution = executionEntity;
}
Activity activity = (Activity) ();
verifyCompensation(execution, leavingExecution, activity); // verify compensation
verifyCallActivity(leavingExecution, activity);
if (isCompletionConditionSatisfied) { // the "completion condition" EL expression is satisfied
// NOTE(review): processes a work list of executions and also queues their child executions;
// the per-execution action and the list operations were stripped from this excerpt
LinkedList<DelegateExecution> toVerify = new LinkedList<>(());
while (!()) {
DelegateExecution childExecution = ();
if (((ExecutionEntity) childExecution).isInserted()) {
();
}
List<DelegateExecution> childExecutions = (List<DelegateExecution>) ();
if (childExecutions != null && !()) {
(childExecutions);
}
}
sendCompletedWithConditionEvent(leavingExecution);
} else { // all "child Executions" have completed
sendCompletedEvent(leavingExecution);
}
(leavingExecution);
}
} else {
sendCompletedEvent(execution);
(execution);
}
}
}
```
▷ 并行完成的时候,如何避免写覆盖(CAS)
```
<update parameterType="">
update ${prefix}ACT_RU_EXECUTION set
REV_ = #{revisionNext, jdbcType=INTEGER}, -- 乐观锁字段
BUSINESS_KEY_ = #{businessKey, jdbcType=VARCHAR},
PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
ACT_ID_ = #{activityId, jdbcType=VARCHAR},
IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER},
NAME_ = #{name, jdbcType=VARCHAR}
where ID_ = #{id, jdbcType=VARCHAR}
and REV_ = #{revision, jdbcType=INTEGER} -- 乐观锁字段
</update>
```
.