flowable 6.4.2 - 多实例(与签、或签的实现机制)

时间:2025-03-23 07:04:07

▷ 创建Behavior( SequentialMultiInstanceBehavior、ParallelMultiInstanceBehavior)

```
// #parse(, )

// Parse-time wiring of the multi-instance ("AND-sign" / "OR-sign") behavior for a user task.
// NOTE(review): several statements in this excerpt have lost their receiver / method name
// (e.g. "(bpmnParse, element);"); they are kept verbatim and described only as far as the
// original author's notes allow.
class UserTaskParseHandler {
    
    /**
     * Parses the element and, when it is an Activity that carries loop characteristics,
     * delegates to {@code createMultiInstanceLoopCharacteristics}.
     *
     * @param bpmnParse the current parse context
     * @param element   the model element being parsed
     */
    public void parse(BpmnParse bpmnParse, BaseElement element) {
        (bpmnParse, element); // !!! sets the Behavior (call target is missing in this excerpt)

        /**
         * Multi-instance ("AND-sign" / "OR-sign") handling.
         */
        if (element instanceof Activity && ((Activity) element).getLoopCharacteristics() != null) { // multi-instance ("AND-sign" / "OR-sign")
            createMultiInstanceLoopCharacteristics(bpmnParse, (Activity) element); // swaps in a different Behavior
        }
    }    
    
    /**
     * Builds the multi-instance behavior for the given activity and configures it from the
     * activity's loop characteristics (loop cardinality, completion condition, ...).
     *
     * @param bpmnParse     the current parse context
     * @param modelActivity the activity that declares loop characteristics
     */
    protected void createMultiInstanceLoopCharacteristics(BpmnParse bpmnParse, Activity modelActivity) {

        MultiInstanceLoopCharacteristics loopCharacteristics = (); // the multi-instance configuration

        /**
         * Creates a SequentialMultiInstanceBehavior or a ParallelMultiInstanceBehavior that
         * references modelActivity (i.e. it acts as a proxy for the original modelActivity).
         * <pre>
         *     sequential case / parallel case
         *     (the original note's @see targets are blank in this excerpt)
         * </pre>
         */
        // Create the multi-instance activity behavior
        MultiInstanceActivityBehavior miActivityBehavior = createMultiInstanceActivityBehavior(modelActivity, loopCharacteristics, bpmnParse);

        /**
         * Replace the Behavior that would originally have been invoked.
         */
        (miActivityBehavior); // replaces the original Behavior (call target is missing in this excerpt)

        ExpressionManager expressionManager = ().getExpressionManager();

        // loop cardinality
        if ((())) {
            ((()));
        }

        // completion condition
        if ((())) { // completion condition
            (());
        }

        ...
    }
    
}
```

▷ 调用 Behavior(SequentialMultiInstanceBehavior、ParallelMultiInstanceBehavior)

```
/**
 * Operation that continues a process from the current flow element; this excerpt focuses on
 * the path that invokes the multi-instance behavior.
 *
 * NOTE(review): many statements in this excerpt have lost their receiver / method name
 * (e.g. "(execution, null);"); they are kept verbatim and described only as far as the
 * original author's notes allow.
 */
public class ContinueProcessOperation extends AbstractOperation {

    private static final Logger LOGGER = ();

    // ...

    /**
     * Dispatches on the type of the current flow element: flow nodes and sequence flows
     * are continued, anything else raises a FlowableException.
     */
    @Override
    public void run() {
        FlowElement currentFlowElement = getCurrentFlowElement(execution);
        if (currentFlowElement instanceof FlowNode) { // a node
            continueThroughFlowNode((FlowNode) currentFlowElement);
        } else if (currentFlowElement instanceof SequenceFlow) { // a connecting sequence flow
            continueThroughSequenceFlow((SequenceFlow) currentFlowElement);
        } else {
            throw new FlowableException("Programmatic error: no current flow element found or invalid type: " + currentFlowElement + ". Halting.");
        }
    }

    /** Fires the execution listeners for the 'start' event on the process. */
    protected void executeProcessStartExecutionListeners() {
         process = (());
        executeExecutionListeners(process, (), ExecutionListener.EVENTNAME_START);
    }

    /**
     * Continues through a flow node; an Activity with multi-instance loop characteristics
     * is routed to {@code executeMultiInstanceSynchronous}.
     */
    protected void continueThroughFlowNode(FlowNode flowNode) {
        
        // ...
        /**
         *      Activity
         *          hierarchy: CallActivity extends Activity ---> extends FlowNode ...
         *          hierarchy: SubProcess extends Activity ---> extends FlowNode ...
         *          hierarchy: UserTask extends Task extends Activity ---> extends FlowNode ...
         */
        if (flowNode instanceof Activity && ((Activity) flowNode).hasMultiInstanceLoopCharacteristics()) { // !!! multi-instance ("AND-sign" / "OR-sign") logic
            /**
             * <pre>
             * e.g. flowNode is a UserTask and has loopCharacteristics configured
             *      (the original note's @see target is blank in this excerpt)
             * </pre>
             */
            // the multi instance execution will look at async
            executeMultiInstanceSynchronous(flowNode);

        } else if (forceSynchronousOperation || !()) { // forced synchronous execution, or async not allowed (per the original note)
            // ...

        } else {
            // ...

        }
    }

    // ...

    /**
     * Runs the multi-instance behavior for the flow node: fires start listeners, ensures a
     * multi-instance root execution exists, then executes the activity behavior.
     *
     * @throws FlowableException when no activity behavior is available
     */
    protected void executeMultiInstanceSynchronous(FlowNode flowNode) {

        /**
         * 1. Fire the listeners.
         */
        // Execution listener: event 'start'
        if ((())) {
            executeExecutionListeners(flowNode, ExecutionListener.EVENTNAME_START);
        }

        /**
         * Create the multi-instance root execution when there is none yet.
         */
        if (!hasMultiInstanceRootExecution(execution, flowNode)) {
            execution = createMultiInstanceRootExecution(execution); // !!!! the field now points at the new root execution
        }

        /**
         * <pre>
         *     sequential case / parallel case
         *     (the original note's @see targets are blank in this excerpt)
         * </pre>
         */
        // Execute the multi instance behavior
        ActivityBehavior activityBehavior = (ActivityBehavior) ();

        if (activityBehavior != null) {
            executeActivityBehavior(activityBehavior, flowNode); // !!!! runs the ActivityBehavior
            
            // ...
            
        } else {
            throw new FlowableException("Expected an activity behavior in flow node " + ());
        }
    }
    
    /**
     * Walks a chain of executions until it runs out (null) and reports whether one of them
     * satisfies the check below.
     *
     * @return true when a matching execution is found, false otherwise
     */
    protected boolean hasMultiInstanceRootExecution(ExecutionEntity execution, FlowNode flowNode) {
        ExecutionEntity currentExecution = ();
        while (currentExecution != null) {
            /**
             * Per the original note:
             * 1. it is a multi-instance root execution
             * 2. the node id matches
             */
            if (() && ().equals(())) { // node id matches
                return true;
            }
            currentExecution = ();
        }
        return false;
    }
    
    /**
     * Replaces the given execution with a newly created multi-instance root execution.
     *
     * @return the new multi-instance root execution
     */
    protected ExecutionEntity createMultiInstanceRootExecution(ExecutionEntity execution) {
        ExecutionEntity parentExecution = ();
        FlowElement flowElement = (); // the node information

        // Delete the original execution
        ExecutionEntityManager executionEntityManager = ();
        (execution, null); // delete its related data
        (execution);

        /**
         * Create the root execution.
         * Per the original note, this inserts a row into the ACT_RU_EXECUTION table.
         */
        ExecutionEntity multiInstanceRootExecution = (parentExecution);
        (flowElement);
        (true);
        (false);
        return multiInstanceRootExecution;
    }

    /**
     * Logs and then executes the activity behavior; a RuntimeException is rethrown after
     * the conditional handling in the catch block.
     */
    protected void executeActivityBehavior(ActivityBehavior activityBehavior, FlowNode flowNode) {
        ("Executing activityBehavior {} on activity '{}' with execution {}", (), (), ());

        // Fire events
        // ...

        // Execute the behavior
        try {
            /**
             * <pre>
             *     sequential case
             *          @see #execute()
             *     parallel case
             *          @see #execute()
             * </pre>
             */
            (execution);
        } catch (RuntimeException e) {
            if (()) {
                (execution);
            }
            throw e;
        }
    }

    // ...
}

```

▷ 配置对象


```
/**
 * Configuration object describing a multi-instance loop. The field descriptions below are
 * translations of the original author's notes.
 */
public class MultiInstanceLoopCharacteristics extends BaseElement {
    protected CollectionHandler collectionHandler; // (no description given in the original note)
    
    // Loop count (number of people who must take part) -- optional.
    // The loop count is determined in one of two ways: 1. from this configured value;
    // 2. from the length of the candidate list.
    // Per the original note, the value is resolved with this priority:
    //      1. the EL expression set in inputDataItem, evaluated against the execution
    //      2. the value of collectionVariable used as a key name to read from the execution --- deprecated
    //      3. the value of collectionString used as the value
    //          1. the value of collectionString used as a key name to read from the execution
    protected String loopCardinality;  // loopCardinalityExpression; supports EL expressions
    
    // The participant list
    protected String inputDataItem; // collectionExpression; supports EL expressions; key name used to read the participant list from the execution
    protected String collectionString; // collectionString; key name used to read the participant list from the execution
    
    // Keys under which data is stored
    protected String elementVariable; // collectionElementVariable; key name under which the candidate is stored in each created child execution
    protected String elementIndexVariable; // collectionElementIndexVariable; defaults to "loopCounter"; key name under which the candidate's index is stored in each created child execution
    
    // Completion condition
    protected String completionCondition; // completionCondition; supports EL expressions; the condition under which the loop is complete
    
    // Execution mode
    protected boolean sequential; // whether instances are executed sequentially
}	



```

▷ 串行任务的<发起> - SequentialMultiInstanceBehavior

```
/**
 * Start of a sequential multi-instance activity: creates one child execution, initialises
 * the loop variables on the root execution and runs the wrapped behavior for index 0.
 *
 * NOTE(review): this excerpt is not compilable as shown. Some statements have lost their
 * receiver / method name (e.g. "(());"), and execute() appears to combine code from more
 * than one original method: multiInstanceRootExecution is used without a visible
 * declaration, and both nrOfInstances and execution are declared twice in the same scope.
 */
public class SequentialMultiInstanceBehavior extends MultiInstanceActivityBehavior {
    /**
     * Entry point of the behavior. When the loop-index variable is not yet set locally on
     * the execution, the loop is initialised; otherwise the else branch runs.
     */
    public void execute(DelegateExecution delegateExecution) {
        ExecutionEntity execution = (ExecutionEntity) delegateExecution;
        // Read the variable "loopCounter" (per the original note, every child execution has it)
        if (getLocalLoopVariable(execution, getCollectionElementIndexVariable()) == null) { 

            // NOTE(review): nrOfInstances is declared on both of the next two lines; the first
            // declaration looks redundant -- confirm against the upstream source.
            int nrOfInstances = 0;
            int nrOfInstances = resolveNrOfInstances(multiInstanceRootExecution); // how many people must take part in signing
            
            /**
             * Create a single child execution.
             */
            // Create child execution that will execute the inner behavior
            // NOTE(review): this re-declares "execution", which is already declared at the top of the method.
            ExecutionEntity execution = ().createChildExecution((ExecutionEntity) multiInstanceRootExecution);
            (());
    
            // Store the variables on the root execution
            // Set Multi-instance variables
            setLoopVariable(multiInstanceRootExecution, NUMBER_OF_INSTANCES, nrOfInstances); // nrOfInstances, stored on the root execution
            setLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES, 0); // nrOfCompletedInstances: number of completed instances
            setLoopVariable(multiInstanceRootExecution, NUMBER_OF_ACTIVE_INSTANCES, 1); // nrOfActiveInstances: number of active instances
            logLoopDetails(multiInstanceRootExecution, "initialized", 0, 0, 1, nrOfInstances);

            /**
             * <pre>
             *     1. per the original note, sets "loopCounter=0" on the execution
             *     2. triggers the real (wrapped) Behavior
             * </pre>
             */
            executeOriginalBehavior(execution, (ExecutionEntity) multiInstanceRootExecution, 0);
            // ...
        } else {
            // ...
            (execution);
        }
    }
    
    /**
     * Prepares the given execution for one loop iteration and plans the continuation of the
     * multi-instance operation.
     *
     * @param execution                  the execution for this iteration
     * @param multiInstanceRootExecution the multi-instance root execution
     * @param loopCounter                zero-based index of this iteration
     */
    protected void executeOriginalBehavior(DelegateExecution execution, ExecutionEntity multiInstanceRootExecution, int loopCounter) {
        if (usesCollection() && collectionElementVariable != null) { // a collection is used and an element variable name is configured
        
            /**
             * Read the participant list.
             * <pre>
             *     Per the original note, the collection is resolved as follows:
             *          1. the EL expression set in collectionExpression, evaluated against the execution
             *          2. the value of collectionVariable used as a key name to read from the execution --- deprecated
             *          3. the value of collectionString used as the value
             *              1. the value of collectionString used as a key name to read from the execution
             * </pre>
             */
            Collection collection = (Collection) resolveAndValidateCollection(execution);

            Object value = null;
            int index = 0;
            Iterator it = ();
            // The loop body runs loopCounter + 1 times, so "value" ends up holding the result of
            // the last assignment (presumably the element at position loopCounter -- the
            // right-hand side is stripped in this excerpt).
            while (index <= loopCounter) {
                value = ();
                index++;
            }

            /**
             * Store the value on the execution under the key collectionElementVariable,
             * i.e. each candidate's information is stored on its child execution.
             *
             * <pre>
             *     1. execution === the child execution
             *     2. collectionElementVariable: key name under which the candidate is stored
             *        NOTE(review): the original note gave its default as "loopCounter", which
             *        elsewhere in these notes is the default of the element *index* variable -- confirm.
             *     3. value: the candidate's information
             * </pre>
             */
            setLoopVariable(execution, collectionElementVariable, value);
        }

        (activity);

        /**
         * @see #planContinueMultiInstanceOperation(, , int)
         */
        ().planContinueMultiInstanceOperation((ExecutionEntity) execution, multiInstanceRootExecution, loopCounter);
    }
}

```

▷ 串行任务的<结束> - SequentialMultiInstanceBehavior

```
#complete(, )
	(task, variables, transientVariables, localScope, commandContext); // !!!
		#run()
			#trigger(, , )
                #trigger(, , ) 
                	#leave()
                		#leave()
                		
/**
 * End of one iteration of a sequential multi-instance activity.
 *
 * NOTE(review): one statement in this excerpt has lost its receiver / method name
 * ("(execution);"); it is kept verbatim.
 */
public class SequentialMultiInstanceBehavior extends MultiInstanceActivityBehavior {
    
    // ....
    
    /**
     * Called when one instance finishes: updates the completed-instance counter on the root
     * execution, then either finishes the whole multi-instance activity or continues with
     * the next iteration.
     */
    public void leave(DelegateExecution execution) {
        DelegateExecution multiInstanceRootExecution = getMultiInstanceRootExecution(execution); // fetch the root execution
        int loopCounter = getLoopVariable(execution, getCollectionElementIndexVariable()) + 1; // loopCounter + 1
        int nrOfInstances = getLoopVariable(multiInstanceRootExecution, NUMBER_OF_INSTANCES); // total number of instances required
        int nrOfCompletedInstances = getLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES) + 1; // completed instances + 1
        int nrOfActiveInstances = getLoopVariable(multiInstanceRootExecution, NUMBER_OF_ACTIVE_INSTANCES); // active instances

        setLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES, nrOfCompletedInstances);  // write back the completed count
        logLoopDetails(execution, "instance completed", loopCounter, nrOfCompletedInstances, nrOfActiveInstances, nrOfInstances);

        // Fire the listeners
        callActivityEndListeners(execution);

        boolean completeConditionSatisfied = completionConditionSatisfied(multiInstanceRootExecution); // whether the completion-condition EL expression holds
        /**
         * The incremented loop counter has reached the total number of required instances,
         * or the completion-condition EL expression is satisfied.
         */
        if (loopCounter >= nrOfInstances || completeConditionSatisfied) {
            if(completeConditionSatisfied) {
                sendCompletedWithConditionEvent(multiInstanceRootExecution); // completion condition satisfied
            }
            else {
                sendCompletedEvent(multiInstanceRootExecution); // all child executions have completed
            }

            (execution);
        } else {
            /**
             * The sequential multi-instance is not finished yet: trigger the next candidate.
             *
             * loopCounter + 1 (already incremented above)
             */
            continueSequentialMultiInstance(execution, loopCounter, (ExecutionEntity) multiInstanceRootExecution);
        }
    }
    
    // ....
}
```

▷ 并行任务的<发起> - ParallelMultiInstanceBehavior 

```
/**
 * Start of a parallel multi-instance activity: creates one child execution per instance
 * and then runs the wrapped behavior on each of them.
 *
 * NOTE(review): several statements in this excerpt have lost their receiver / method name
 * (e.g. "(activity);"); they are kept verbatim. multiInstanceRootExecution is used without
 * a visible declaration (elided by the "// ..." markers or lost).
 */
public class ParallelMultiInstanceBehavior extends MultiInstanceActivityBehavior {

    /**
     * Resolves the instance count, initialises the loop variables on the root execution,
     * creates the concurrent child executions and executes the original behavior on each.
     *
     * @throws FlowableIllegalArgumentException when the resolved instance count is negative
     */
    public void execute(DelegateExecution delegateExecution) {
        // ... 
        int nrOfInstances = resolveNrOfInstances(multiInstanceRootExecution); // how many people must take part in signing
        if (nrOfInstances < 0) {
            throw new FlowableIllegalArgumentException("Invalid number of instances: must be non-negative integer value" + ", but was " + nrOfInstances);
        }

        setLoopVariable(multiInstanceRootExecution, NUMBER_OF_INSTANCES, nrOfInstances); // nrOfInstances, stored on the root execution
        setLoopVariable(multiInstanceRootExecution, NUMBER_OF_COMPLETED_INSTANCES, 0);  // nrOfCompletedInstances: number of completed instances
        setLoopVariable(multiInstanceRootExecution, NUMBER_OF_ACTIVE_INSTANCES, nrOfInstances);  // nrOfActiveInstances: number of active instances (all of them at start)

        /**
         * Create multiple child executions, one per instance.
         */
        List<ExecutionEntity> concurrentExecutions = new ArrayList<>();
        for (int loopCounter = 0; loopCounter < nrOfInstances; loopCounter++) {
            /**
             * @see #createChildExecution()
             */
            ExecutionEntity concurrentExecution = ()
                    .createChildExecution((ExecutionEntity) multiInstanceRootExecution);
            (activity);
            (true);
            (false);

            (concurrentExecution);
            logLoopDetails(concurrentExecution, "initialized", loopCounter, 0, nrOfInstances, nrOfInstances);

            //().recordActivityStart(concurrentExecution);
        }

        // ... 
        for (int loopCounter = 0; loopCounter < nrOfInstances; loopCounter++) {
            ExecutionEntity concurrentExecution = (loopCounter);
            // executions can be inactive, if instances are all automatics
            // (no-waitstate) and completionCondition has been met in the meantime
            if (()
                    && !()
                    && !().isEnded()) {
                /**
                 * <pre>
                 *     1. per the original note, sets "loopCounter" on concurrentExecution (the child execution)
                 *     2. triggers the real (wrapped) Behavior
                 * </pre>
                 */
                executeOriginalBehavior(concurrentExecution, (ExecutionEntity) multiInstanceRootExecution, loopCounter);
            }
        }

        // ...
        // NOTE(review): this method is declared void but returns a value here; the body was
        // presumably taken from a different, int-returning method -- confirm against upstream.
        return nrOfInstances;
    }
}
```

▷ 并行任务的<结束> - ParallelMultiInstanceBehavior


```
/**
 * End of one instance of a parallel multi-instance activity.
 *
 * NOTE(review): many statements in this excerpt have lost their receiver / method name
 * (e.g. "(execution);", "();"); they are kept verbatim and described only as far as the
 * original author's notes allow.
 */
public class ParallelMultiInstanceBehavior extends MultiInstanceActivityBehavior {

    /**
     * Called when one instance finishes: updates the completed/active counters on the root
     * execution, records the activity end, and leaves the multi-instance activity once all
     * instances are done or the completion condition is satisfied.
     */
    public void leave(DelegateExecution execution) {

        boolean zeroNrOfInstances = false;
        if (resolveNrOfInstances(execution) == 0) {
            // Empty collection, just leave.
            zeroNrOfInstances = true;
            (execution); // Plan the default leave
        }
        // Note: in the zero-instance case the bookkeeping below still runs; the method only
        // returns at the "if (zeroNrOfInstances)" check further down.

        int loopCounter = getLoopVariable(execution, getCollectionElementIndexVariable()); // index stored on the current child execution
        int nrOfInstances = getLoopVariable(execution, NUMBER_OF_INSTANCES);  // total number of instances required
        int nrOfCompletedInstances = getLoopVariable(execution, NUMBER_OF_COMPLETED_INSTANCES) + 1; // completed instances + 1
        int nrOfActiveInstances = getLoopVariable(execution, NUMBER_OF_ACTIVE_INSTANCES) - 1; // active instances - 1 

        DelegateExecution miRootExecution = getMultiInstanceRootExecution(execution); // fetch the multi-instance root execution
        if (miRootExecution != null) { // will be null in case of empty collection
            setLoopVariable(miRootExecution, NUMBER_OF_COMPLETED_INSTANCES, nrOfCompletedInstances); // completed instances - update
            setLoopVariable(miRootExecution, NUMBER_OF_ACTIVE_INSTANCES, nrOfActiveInstances); // active instances - update
        }

        /**
         * @see #recordActivityEnd(, )
         * <pre>
         *     Per the original note:
         *     1. updates the deleteReason, endTime and durationInMillis columns of ACT_RU_ACTINST
         *     2. updates the deleteReason, endTime and durationInMillis columns of ACT_HI_ACTINST
         * </pre>
         */
        ().recordActivityEnd((ExecutionEntity) execution, null);
        
        // Invoke the listeners
        callActivityEndListeners(execution);

        logLoopDetails(execution, "instance completed", loopCounter, nrOfCompletedInstances, nrOfActiveInstances, nrOfInstances);

        if (zeroNrOfInstances) {
            return;
        }

        ExecutionEntity executionEntity = (ExecutionEntity) execution;
        if (() != null) { // the root execution (per the original note)
            /**
             * Per the original note this sets "(stripped) = false"; the field name is missing
             * in this excerpt.
             */
            ();

            /**
             * Per the original note this sets "(stripped) = true", i.e. marks the cached
             * executionEntity as one that must be updated in the DB.
             */
            lockFirstParentScope(executionEntity);

            boolean isCompletionConditionSatisfied = completionConditionSatisfied(()); // whether the completion-condition EL expression holds
            if (nrOfCompletedInstances >= nrOfInstances || isCompletionConditionSatisfied) {

                ExecutionEntity leavingExecution = null;
                if (nrOfInstances > 0) {
                    leavingExecution = ();
                } else {
                    // No instances: record the activity end (again) and leave with this execution itself
                    ().recordActivityEnd((ExecutionEntity) execution, null);
                    leavingExecution = executionEntity;
                }

                Activity activity = (Activity) ();
                verifyCompensation(execution, leavingExecution, activity); // verify compensation
                verifyCallActivity(leavingExecution, activity);

                if (isCompletionConditionSatisfied) { // the completion-condition EL expression holds
                    // Work through a list of executions, appending each one's child executions
                    // as they are found; entries flagged isInserted() get the (stripped) call below.
                    LinkedList<DelegateExecution> toVerify = new LinkedList<>(());
                    while (!()) {
                        DelegateExecution childExecution = ();
                        if (((ExecutionEntity) childExecution).isInserted()) {
                            ();
                        }

                        List<DelegateExecution> childExecutions = (List<DelegateExecution>) ();
                        if (childExecutions != null && !()) {
                            (childExecutions);
                        }
                    }
                    sendCompletedWithConditionEvent(leavingExecution);
                } else { // all child executions have completed
                    sendCompletedEvent(leavingExecution);
                }

                (leavingExecution);
            }

        } else {
            sendCompletedEvent(execution);
            (execution);
        }
    }
}
```

▷ 并行完成的时候,如何避免写覆盖(CAS)

```
<!--
  Update statement for a row of ACT_RU_EXECUTION.
  The row is matched on both ID_ and the current REV_ (#{revision}), and REV_ is written as
  #{revisionNext}. If another writer has already changed REV_, the WHERE clause matches no
  row and nothing is overwritten; this is the compare-and-set the heading refers to.
  NOTE(review): how the caller reacts to a zero-row update is not shown here; confirm.
  NOTE(review): the statement id and parameterType are blank in this excerpt.
  The two inline "乐观锁字段" notes below mean "optimistic-lock column".
-->
<update  parameterType="">
    update ${prefix}ACT_RU_EXECUTION set
      REV_ = #{revisionNext, jdbcType=INTEGER}, -- 乐观锁字段
      BUSINESS_KEY_ = #{businessKey, jdbcType=VARCHAR},
      PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR},
      ACT_ID_ = #{activityId, jdbcType=VARCHAR},
      IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN},
      IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN},
      IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN},
      IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN},
      PARENT_ID_ = #{parentId, jdbcType=VARCHAR},
      SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR},
      SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER},
      CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER},
      NAME_ = #{name, jdbcType=VARCHAR}
    where ID_ = #{id, jdbcType=VARCHAR}
      and REV_ = #{revision, jdbcType=INTEGER}  -- 乐观锁字段
</update>
```

.