Activiti CamelTask(骆驼任务)
作者:Jesai
人生想讲个不成熟的建议
前言:
Camel任务可以从Camel发送和介绍消息,由此强化了activiti的集成功能。 注意camel任务不是BPMN 2.0规范定义的官方任务。 (它也没有对应的图标)。 在activiti中,camel任务时由专用的服务任务实现的。
什么是Camel?
Camel能够在大量的领域语言中让你定义路由以及中间规则,包括基于Java的Fluent API,Spring或者Blueprint XML配置文件,甚至是Scala(是一种基于JVM,集合了面向对象编程和函数式编程优点的高级程序设计语言)DSL。 您能够通过你的IDE或者Java、Scala或者XML编辑器里获得智能化路由规则补全功能。
camel首先是一个规则引擎。其次才是一个开源项目。
Apache Camel是Apache基金会下的一个开源项目,它是一个基于规则路由和中介引擎,提供企业集成模式的Java对象的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则。领域特定语言意味着Apache Camel支持你在的集成开发工具中使用平常的,类型安全的,可自动补全的Java代码来编写路由规则,而不需要大量的XML配置文件。同时,也支持在Spring中使用XML配置定义路由和中介规则。
Camel提供的基于规则的路由(Routing)引擎
from().to().to()
这种表述可以使用Camel定义的DSL语言,xml语言以及scala语言。如下例:
from(“file:path").to("activemq:queue:queuename") 将某文件,读入并写入到ActiveMQ的JMS中。
form("file:path").to("ftp://url")将一个文件,读入并写入到ftp某目录中。
开发涉及的jar包
camel-core-2.19.1.jar
camel-spring-2.19.1.jar
activiti-camel-5.22.0.jar
Camel任务图标
注意:camel任务不是BPMN 2.0标准,它只是Activiti的一个扩展。
流程图设计以及配置:
这是一个非常简单的流程图,只有开始和结束、camel任务节点。实际应用中根据实际情况扩展。
流程图源码:
<?xml version='1.0' encoding='UTF-8'?> <definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/processdef"> <process id="CamelProcess" isExecutable="true"> <startEvent id="sid-80400B65-221F-4DE2-A7AF-1788341E7FAA"> <extensionElements> <activiti:executionListener event="start" class="light.mvc.workflow.taskListener.CamelListenerImpl" /> </extensionElements> </startEvent> <serviceTask id="MyCamelCall" name="camel任务" activiti:type="camel" /> <endEvent id="sid-6920138F-F17F-43A3-8C54-D339AC234DF8" /> <sequenceFlow id="sid-23D8E093-77BF-4D8A-8954-3730952ADEF6" sourceRef="MyCamelCall" targetRef="sid-6920138F-F17F-43A3-8C54-D339AC234DF8" /> <sequenceFlow id="sid-7B6E8FB0-B1F6-425A-8897-C9CF6A2050CB" sourceRef="sid-80400B65-221F-4DE2-A7AF-1788341E7FAA" targetRef="MyCamelCall" /> </process> <bpmndi:BPMNDiagram id="BPMNDiagram_CamelProcess"> <bpmndi:BPMNPlane bpmnElement="CamelProcess" id="BPMNPlane_CamelProcess"> <bpmndi:BPMNShape bpmnElement="sid-80400B65-221F-4DE2-A7AF-1788341E7FAA" id="BPMNShape_sid-80400B65-221F-4DE2-A7AF-1788341E7FAA"> <omgdc:Bounds height="30.0" width="30.0" x="223.75" y="86.0" /> </bpmndi:BPMNShape> <bpmndi:BPMNShape bpmnElement="MyCamelCall" id="BPMNShape_MyCamelCall"> <omgdc:Bounds height="80.0" width="100.36219727999998" x="356.56890136" y="61.0" /> </bpmndi:BPMNShape> <bpmndi:BPMNShape bpmnElement="sid-6920138F-F17F-43A3-8C54-D339AC234DF8" id="BPMNShape_sid-6920138F-F17F-43A3-8C54-D339AC234DF8"> <omgdc:Bounds height="28.0" width="28.0" x="600.0" y="87.0" /> </bpmndi:BPMNShape> <bpmndi:BPMNEdge bpmnElement="sid-23D8E093-77BF-4D8A-8954-3730952ADEF6" id="BPMNEdge_sid-23D8E093-77BF-4D8A-8954-3730952ADEF6"> <omgdi:waypoint x="456.93109863999996" y="101.0" /> <omgdi:waypoint x="600.0" y="101.0" /> </bpmndi:BPMNEdge> <bpmndi:BPMNEdge bpmnElement="sid-7B6E8FB0-B1F6-425A-8897-C9CF6A2050CB" id="BPMNEdge_sid-7B6E8FB0-B1F6-425A-8897-C9CF6A2050CB"> <omgdi:waypoint x="253.75" y="101.0" /> <omgdi:waypoint x="356.56890136" y="101.0" /> </bpmndi:BPMNEdge> </bpmndi:BPMNPlane> </bpmndi:BPMNDiagram> </definitions>
这里在开始任务设置了一个监听类:
监听类实现代码:
/** * */ package light.mvc.workflow.taskListener; import java.util.HashMap; import java.util.Map; import light.mvc.service.workflow.impl.DelegateServiceImpl; import light.mvc.workflow.model.DelegateInfo; import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.delegate.DelegateTask; import org.activiti.engine.delegate.JavaDelegate; import org.activiti.engine.delegate.TaskListener; import org.springframework.beans.factory.annotation.Autowired; /** * * 项目名称:lightmvc * 类名称:TaskAsigneeListenerImpl * 类描述: * 创建人:邓家海 * 创建时间:2017年6月1日 下午11:48:55 * 修改人:deng * 修改时间:2017年6月1日 下午11:48:55 * 修改备注: * @version * */ public class CamelListenerImpl implements TaskListener,JavaDelegate { @Override public void notify(DelegateTask delegateTask) { System.out.println("CamelListenerImpl notify is running"); Map<String,Object> map = delegateTask.getVariables(); Map<String, Object> variables = new HashMap<String, Object>(); variables.put("input", "Hello"); Map<String, String> outputMap = new HashMap<String, String>(); variables.put("outputMap", outputMap); delegateTask.setVariables(variables); } @Override public void execute(DelegateExecution delegateTask) throws Exception { // TODO Auto-generated method stub System.out.println("CamelListenerImpl execute is running"); Map<String,Object> map = delegateTask.getVariables(); Map<String, Object> variables = new HashMap<String, Object>(); variables.put("input", "Hello"); Map<String, String> outputMap = new HashMap<String, String>(); variables.put("outputMap", outputMap); delegateTask.setVariables(variables); } }
Camel路由规则的配置:
路由的设置有两种方式,第一种是通过java类来配置,另外一种是通过Spring配置文件配置。
(1)Java类配置:
Spring的环境下扫描路由配置
<camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring"> <packageScan> <package>light.mvc.workflow.camelRoute</package> </packageScan> </camelContext>
Java配置类
/** * */ package light.mvc.workflow.camelRoute; import org.apache.camel.builder.RouteBuilder; /** * * 项目名称:lightmvc * 类名称:MyCamelCallRoute * 类描述: * 创建人:邓家海 * 创建时间:2017年6月24日 下午9:00:55 * 修改人:deng * 修改时间:2017年6月24日 下午9:00:55 * 修改备注: * @version * */ public class MyCamelCallRoute extends RouteBuilder { public MyCamelCallRoute(){ System.out.println("MyCamelCallRoute is running"); } @Override public void configure() throws Exception { // TODO Auto-generated method stub // from("activiti:CamelProcess:MyCamelCall").to("log:light.mvc.workflow.camelRoute"); System.out.println("MyCamelCallRoute is running"); from("activiti:CamelProcess:MyCamelCall").to("direct:start-activiti"); from("direct:start-activiti").to("activiti:CamelStartprocess"); //from("activiti:CamelProcess:MyCamelCall?copyCamelBodyToBody=true").transform().simple("${property.input} ,jesai"); } }
(2)Spring配置文件配置:
<camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file:d:/temp/inbox?delay=30000"/> <process ref="fileConverter"/> <to uri="file:d:/temp/outbox"/> </route> </camelContext>
至此。一个简单的Activiti整合Camel的例子就已经完成了。运行部署就可以发现,控制台输出
CamelListenerImpl execute is running
同步和异步
之前的例子都是同步的。流程会等到camel规则返回之后才会停止。 一些情况下,我们需要activiti工作流继续运行。这时camelServiceTask的异步功能就特别有用。 你可以通过设置camelServiceTask的async属性来启用这个功能。
<serviceTask id="serviceAsyncPing" activiti:type="camel" activiti:async="true"/>
通过设置这个功能,camel规则会被activiti的jobExecutor异步执行。 当你在camel规则中定义了一个队列,activiti流程会在camelServiceTask执行时继续运行。 camel规则会以完全异步的方式执行。 如果你想在什么地方等待camelServiceTask的返回值,你可以使用一个receiveTask。
<receiveTask id="receiveAsyncPing" name="Wait State" />
关于Camel路由规则:
例如:
public class SimpleCamelCallRoute extends RouteBuilder { @Override
public void configure() throws Exception { from("activiti:SimpleCamelCallProcess:simpleCall").to("log: org.activiti.camel.examples.SimpleCamelCall");
}
}
注:部分 说明
终端URL 引用activiti终端
SimpleCamelCallProcess 流程名
simpleCall 流程中的Camel服务
(1)from("activiti:CamelProcess:MyCamelCall").to("log:light.mvc.workflow.camelRoute");
这个路由规则只是打印日志,什么也不做。
(2)
from("activiti:CamelProcess:MyCamelCall").to("direct:start-activiti");
from("direct:start-activiti").to("activiti:CamelStartprocess");
这个规则可以从已有的任务启动过程中去启动另外一个已经部署完成等待启动的任务。比如在这里,我们可以尝试去启动一个ServiceTask任务
1)首先我们有一个已经部署好的可以运行的ServiceTask任务。流程名称叫做CamelStartprocess
2)这个规则任务有一个监听类:
/** * */ package light.mvc.workflow.serviceTask; import org.activiti.engine.delegate.DelegateExecution; import org.activiti.engine.delegate.Expression; import org.activiti.engine.delegate.JavaDelegate; /** * * 项目名称:lightmvc * 类名称:ServiceTask * 类描述: * 创建人:邓家海 * 创建时间:2017年6月4日 下午6:18:11 * 修改人:deng * 修改时间:2017年6月4日 下午6:18:11 * 修改备注: * @version * */ public class ServiceTask implements JavaDelegate{ //流程变量 private Expression text1; //重写委托的提交方法 @Override public void execute(DelegateExecution execution) throws Exception { System.out.println("serviceTask已经执行已经执行!"); String value1 = (String) text1.getValue(execution); System.out.println(value1); execution.setVariable("var1", new StringBuffer(value1).reverse().toString()); } }
3)部署这个流程
4)设置规则
from("activiti:CamelProcess:MyCamelCall").to("direct:start-activiti");
from("direct:start-activiti").to("activiti:CamelStartprocess");
5)启动我们的Camel任务。可以看到结果
注:这里,我们启动的是Camel任务,但是Camel任务的路由规则又去启动一个Server任务。所以这里一共运行了两个流程实例。
Table 8.5. 已有的camel行为:
行为 |
URL |
描述 |
CamelBehaviorDefaultImpl |
copyVariablesToProperties |
把Activiti变量复制为Camel属性 |
CamelBehaviorCamelBodyImpl |
copyCamelBodyToBody |
只把名为"camelBody"Activiti变量复制成camel的消息体 |
CamelBehaviorBodyAsMapImpl |
copyVariablesToBodyAsMap |
把activiti的所有变量复制到一个map里,作为Camel的消息体 |
上面的表格解释和activiti变量如何传递给camel。下面的表格解释和camel的变量如何返回给activiti。 它只能配置在规则URL中。
Table 8.6. 已有的camel行为:
Url |
描述 |
|
默认 |
如果Camel消息体是一个map,把每个元素复制成activiti的变量,否则把整个camel消息体作为activiti的"camelBody"变量。 |
|
copyVariablesFromProperties |
将Camel属性以相同名称复制为Activiti变量 |
|
copyCamelBodyToBodyAsString |
和默认一样,但是如果camel消息体不是map时,先把它转换成字符串,再设置为"camelBody"。 |
|
copyVariablesFromHeader |
额外把camel头部以相同名称复制成Activiti变量 |
上面两个表是关于流程变量的设置。那么我们在规则里面去实验这几个变量启用会有什么效果:
我们设置了两个变量,一个字符串变量input和一个集合变量OutPutMap
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("input", "Hello");
Map<String, String> outputMap = new HashMap<String, String>();
variables.put("outputMap", outputMap);
from("activiti:CamelProcess:MyCamelCall").transform().simple("${property.input} ,jesai");
Camel变量传递给Activiti
1)copyCamelBodyToBody
from("activiti:CamelProcess:MyCamelCall?copyCamelBodyToBody=true").transform().simple("${property.input} ,jesai");
可以看到camelBody只把名为"camelBody"Activiti变量复制成camel的消息体
2)copyVariablesToProperties
from("activiti:CamelProcess:MyCamelCall?copyVariablesToProperties=true").transform().simple("${property.input} ,jesai");
这里,已经把Activiti变量Input的Hello复制作为camelBody的消息
把Activiti变量复制为Camel属性
3)copyVariablesToBodyAsMap
from("activiti:CamelProcess:MyCamelCall?copyVariablesToBodyAsMap=true").transform().simple("${property.input} ,jesai");
变量放到OutPutMap里面去了。
Activiti变量传递给Camel
1)默认
如果Camel消息体是一个map,把每个元素复制成activiti的变量,否则把整个camel消息体作为activiti的"camelBody"变量。
2)copyVariablesFromProperties
from("activiti:CamelProcess:MyCamelCall?copyVariablesFromProperties=true").transform().simple("${property.input} ,jesai");
将Camel属性以相同名称复制为Activiti变量
3)copyCamelBodyToBodyAsString
from("activiti:CamelProcess:MyCamelCall?copyCamelBodyToBodyAsString=true").transform().simple("${property.input} ,jesai");
和默认一样,但是如果camel消息体不是map时,先把它转换成字符串,再设置为"camelBody"。
4)copyVariablesFromHeader
from("activiti:CamelProcess:MyCamelCall?copyVariablesFromHeader=true").transform().simple("${property.input} ,jesai");
额外把camel头部以相同名称复制成Activiti变量
注:路由规则是在启动部署系统的时候就已经初始化好了的。不支持热更改。而且系统一单部署完成,运行流程实例的时候,不会执行路由配置。
实验:我们在路由的配置类里面打印一个控制台:
启动系统就会看到:
你会发现,它是在系统启动的时候执行的。并不是在流程执行的时候。
Activiti交流QQ群:634320089