Camel的数据转换
在做系统集成的时候,必不可少的任务就是将数据从一种格式转换为另一种格式,再把转换后的格式发到目标系统:
Camel提供的Message translator可以分为:
■ Using a Processor
■ Using beans
■ Using <transform>
1,利用processor的方式在Apache Camel框架入门示例 已经有个介绍.blog.csdn.net/kkdelta/article/details/7231640
通过实现Processor的方法process(Exchange exchange),通常要做的步骤是,从inbound消息里得到数据,转换,再放到outbound的消息里.
XXX data = exchange.getIn().getBody(XXX.class);
//转换
exchange.getOut().setBody(yyyObject);
2,利用bean的方式,写一个java bean如下:
public class OrderToCsvBean {
public String map(String custom) {
String id = custom.substring(0, 9);
String customerId = custom.substring(10, 19);
String date = custom.substring(20, 29);
String items = custom.substring(30);
String[] itemIds = items.split("@");
StringBuilder csv = new StringBuilder();
csv.append(id.trim());
csv.append(",").append(date.trim());
csv.append(",").append(customerId.trim());
for (String item : itemIds) {
csv.append(",").append(item.trim());
}
return csv.toString();
}
}
在route里的代码如下:
from("file:d:/temp/inbox1/?delay=30000").bean(new OrderToCsvBean()).to("file:d:/temp/outbox1");
Camel有一套算法来选择调用bean里的什么方法(比如说message的header里是否通过CamelBeanMethodName设置了方法名称,bean是否只有一个方法,某一个方法是否加了@Handler注解等等),这里的bean只有一个map方法,Camel会调用这个方法.
3,用route中已经定义好的transform方法,一般用的较少.
from("direct:start").transform(body().regexReplaceAll("\n", "<br/>")).to("mock:result");
4,Camel集成对许多常用的数据格式的解析:通过调用unmarshal方法可以得到解析后的结果.
CSV:
from("file:d:/temp/inbox1/?delay=30000").unmarshal().csv().process(processor).to("file:d:/temp/outbox1");
processor里可以直接得到解析后的数据:
public class CSVProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("process...");
List csvData = (List) exchange.getIn().getBody();
StringBuffer strbf = new StringBuffer();
if (csvData.get(0).getClass().equals(String.class)) {//single line CSV
strbf.append(parseOrder(csvData));
} else {//multiple line CSV
for (List<String> csvOrder : (List<List<String>>) csvData) {
strbf.append(parseOrder(csvOrder));
}
}
exchange.getOut().setBody(strbf.toString());
}
private String parseOrder(List<String> csvData) {
System.out.println(csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2));
return csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2);
}
}
Apache Camel框架之HTTP路由
继介绍完Camel如何处理FTP,JMS接口之后,今天介绍一下系统集成的时候经常遇到的另一个接口,HTTP,一个示例需求如下图所示:(图片来源于Camel in Action)
本文给出一个简单的代码示例如何用Camel来实现这样一个应用:
1,在一个JAVA类里配置如下路由:这里只示例了HTTP的部分,其他功能实现可以参见Apache Camel框架系列的其他博客.
- public class HttpPollWithQuartzCamel {
- public static void main(String args[]) throws Exception {
- CamelContext context = new DefaultCamelContext();
- context.addRoutes(new RouteBuilder() {
- public void configure() {
- from("quartz://report?cron=10 * * * * ?&stateful=true")
- .to("http://localhost:8080/prjWeb/test.camelreq")
- .to("file:d:/temp/outbox?fileName=http.csv");
- );
- }
- });
- context.start();
- boolean loop = true;
- while (loop) {
- Thread.sleep(25000);
- }
- context.stop();
- }
- }
对上面代码的简单解释: from("quartz://report?cron=10 * * * * ?&stateful=true"),配置一个quartz Job,每隔10秒发送一个HTTP request,将收到的内容保存为文件.
这里的http url可以是任何可以访问的http url,如果在http访问时候需要代理可以这么配置:"http://www.baidu.com?proxyHost=proxy.xxx.com&proxyPort=8080"
这个例子需要用到quartz,和httpclient等jar包,可以从这里下载: http://download.csdn.net/detail/kkdelta/4051072
Apache Camel框架集成Spring
Apache Camel提供了和Spring的集成,通过Spring容器(ApplicationContext)来管理Camel的CamelContext,这样的话,就不需要写代码来控制CamelContext的初始化,启动和停止了.Camel会随着Spring的启动而启动起来.
本文将Apache Camel框架入门示例(http://blog.csdn.net/kkdelta/article/details/7231640)中的例子集成到Spring中,下面简单介绍一下集成的基本步骤.
1,新建一个Eclipse工程,将Spring3的jar包,和Camel的jar包配置到工程的classpath.
2,Route类要继承RouteBuilde,如下
- public class FileProcessWithCamelSpring extends RouteBuilder {
- @Override
- public void configure() throws Exception {
- FileConvertProcessor processor = new FileConvertProcessor();
- from("file:d:/temp/inbox?delay=30000").process(processor).to("file:d:/temp/outbox");
- }
- }
3,Processor仍然和和入门示例的代码相同.
- public class FileConvertProcessor implements Processor{
- @Override
- public void process(Exchange exchange) throws Exception {
- try {
- InputStream body = exchange.getIn().getBody(InputStream.class);
- BufferedReader in = new BufferedReader(new InputStreamReader(body));
- StringBuffer strbf = new StringBuffer("");
- String str = null;
- str = in.readLine();
- while (str != null) {
- System.out.println(str);
- strbf.append(str + " ");
- str = in.readLine();
- }
- exchange.getOut().setHeader(Exchange.FILE_NAME, "converted.txt");
- // set the output to the file
- exchange.getOut().setBody(strbf.toString());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
4,创建一个Spring的配置文件如***意要将camel的xmlns加入文件中
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:camel="http://camel.apache.org/schema/spring"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"
- default-autowire="byName" default-init-method="init">
- <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">
- <package>com.test.camel</package>
- </camelContext>
- </beans>
5,启动Spring容器,Camel会自动启动,不用像入门示例那样CamelContext context = new DefaultCamelContext(), context.addRoutes(..); context.start();
ApplicationContext ac = new ClassPathXmlApplicationContext("config/cameltest.xml");
while (true) {
Thread.sleep(2000);
}
可见,Camel可以很容易的和Spring集成.
Camel还提供了"Spring DSL"来在XML中配置Route规则,不需要用JAVA类(如上面的FileProcessWithCamelSpring )来实现route.
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:camel="http://camel.apache.org/schema/spring"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"
- default-autowire="byName" default-init-method="init">
- <bean id="fileConverter" class="com.test.camel.FileConvertProcessor"/>
- <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="file:d:/temp/inbox?delay=30000"/>
- <process ref="fileConverter"/>
- <to uri="file:d:/temp/outbox"/>
- </route>
- </camelContext>
- </beans>
与第五步一样启动Spring容器,Camel会每隔30秒轮询一下看d:/temp/inbox是否有文件,有的话则进行处理.
Apache Camel Route节点的消息载体Exchange
Exchange ID 如果不指定,Camel会默认设置一个,可以用来标识一个route的一次执行.
MEP message exchange pattern,有InOnly和InOut方式.
Exception 但route出异常的时候,抛出的异常赋值给这个变量(但在示例中似乎不是这样?).
In message 上一个节点传入的内容,是mandatory.的.
Out message 当MEP 是InOut的时候才用,不是mandatory.的.
Headers 键值对<String Object>,在下一个节点可以再取出来.
Attachments 在调用web service或者发邮件的时候放附件.
Body 消息内容,java对象.
Exchange的结构如下图所示:
Exchange可以直接作为参数在route用到的方法中使用,如果route中的方法不是Exchange,Camel会根据一套规则将Exchange中的Body转换成该方法的参数类型.
这个结构里的各部分内容可以像下面的代码示例的方式进行访问:
- from("file:d:/temp/inbox?delay=3000")
- .bean(new ExchangeManipulateBean(),"implicitConvert")
- .bean(new ExchangeManipulateBean(),"traverseExchange")
- .to("file:d:/temp/outbox");
- public class ExchangeManipulateBean {
- private static Logger log = Logger.getLogger(ExchangeManipulateBean.class);
- public void implicitConvert(String msg){
- log.debug("implicitConvert: " + msg);
- }
- public void traverseExchange(Exchange exchange){
- log.debug("exchange.getExchangeId() " + exchange.getExchangeId());
- log.debug("exchange.getPattern() " + exchange.getPattern());
- log.debug("exchange.getException() " + exchange.getException());
- Map<String, Object> props = exchange.getProperties();
- for(String key:props.keySet()){
- log.debug("props: " + key + " " + props.get(key));
- }
- Message in = exchange.getIn();//in always null,but can be accessed without exception
- log.debug("exchange.getIn() " + in);
- log.debug("in.getMessageId() " + in.getMessageId() );
- log.debug("in.getClass().getName() " + in.getClass().getName());
- Map<String, Object> headers = in.getHeaders();
- for(String key:headers.keySet()){
- log.debug("headers: " + key + " " + headers.get(key));
- }
- Object body = in.getBody();
- log.debug("body " + body + " " + body.getClass().getName());
- log.debug("in.getAttachmentNames() " + in.getAttachmentNames());
- //Message out = exchange.getOut();//whenever out is touched,information in headers are lost
2* exchange.getException()不能得到Exception,需要通过这样的方式: Exception exception = (Exception) exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
3* exchange.getOut()被调用后,还需要将 "In Message"的header里的内容在copy到 "Out message"中去.通常都绕过out往下一个节点传值,可以用这种方式:exchange.getIn().setBody("$contents"),
在下一个节点仍然可以得到内容,同时前面节点set在header中的内容也存在.
4*本人用的是Camel 2.7.5,不知道后续的版本会不会把这些"pitfall"给填上.
Camel的数据转换
在做系统集成的时候,必不可少的任务就是将数据从一种格式转换为另一种格式,再把转换后的格式发到目标系统:
Camel提供的Message translator可以分为:
■ Using a Processor
■ Using beans
■ Using <transform>
1,利用processor的方式在Apache Camel框架入门示例 已经有个介绍.blog.csdn.net/kkdelta/article/details/7231640
通过实现Processor的方法process(Exchange exchange),通常要做的步骤是,从inbound消息里得到数据,转换,再放到outbound的消息里.
XXX data = exchange.getIn().getBody(XXX.class);
//转换
exchange.getOut().setBody(yyyObject);
2,利用bean的方式,写一个java bean如下:
public class OrderToCsvBean {
public String map(String custom) {
String id = custom.substring(0, 9);
String customerId = custom.substring(10, 19);
String date = custom.substring(20, 29);
String items = custom.substring(30);
String[] itemIds = items.split("@");
StringBuilder csv = new StringBuilder();
csv.append(id.trim());
csv.append(",").append(date.trim());
csv.append(",").append(customerId.trim());
for (String item : itemIds) {
csv.append(",").append(item.trim());
}
return csv.toString();
}
}
在route里的代码如下:
from("file:d:/temp/inbox1/?delay=30000").bean(new OrderToCsvBean()).to("file:d:/temp/outbox1");
Camel有一套算法来选择调用bean里的什么方法(比如说message的header里是否通过CamelBeanMethodName设置了方法名称,bean是否只有一个方法,某一个方法是否加了@Handler注解等等),这里的bean只有一个map方法,Camel会调用这个方法.
3,用route中已经定义好的transform方法,一般用的较少.
from("direct:start").transform(body().regexReplaceAll("\n", "<br/>")).to("mock:result");
4,Camel集成对许多常用的数据格式的解析:通过调用unmarshal方法可以得到解析后的结果.
CSV:
from("file:d:/temp/inbox1/?delay=30000").unmarshal().csv().process(processor).to("file:d:/temp/outbox1");
processor里可以直接得到解析后的数据:
public class CSVProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("process...");
List csvData = (List) exchange.getIn().getBody();
StringBuffer strbf = new StringBuffer();
if (csvData.get(0).getClass().equals(String.class)) {//single line CSV
strbf.append(parseOrder(csvData));
} else {//multiple line CSV
for (List<String> csvOrder : (List<List<String>>) csvData) {
strbf.append(parseOrder(csvOrder));
}
}
exchange.getOut().setBody(strbf.toString());
}
private String parseOrder(List<String> csvData) {
System.out.println(csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2));
return csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2);
}
}
Apache Camel提供了和Spring的集成,通过Spring容器(ApplicationContext)来管理Camel的CamelContext,这样的话,就不需要写代码来控制CamelContext的初始化,启动和停止了.Camel会随着Spring的启动而启动起来.
本文将Apache Camel框架入门示例(http://blog.csdn.net/kkdelta/article/details/7231640)中的例子集成到Spring中,下面简单介绍一下集成的基本步骤.
1,新建一个Eclipse工程,将Spring3的jar包,和Camel的jar包配置到工程的classpath.
2,Route类要继承RouteBuilde,如下
- public class FileProcessWithCamelSpring extends RouteBuilder {
- @Override
- public void configure() throws Exception {
- FileConvertProcessor processor = new FileConvertProcessor();
- from("file:d:/temp/inbox?delay=30000").process(processor).to("file:d:/temp/outbox");
- }
- }
3,Processor仍然和和入门示例的代码相同.
- public class FileConvertProcessor implements Processor{
- @Override
- public void process(Exchange exchange) throws Exception {
- try {
- InputStream body = exchange.getIn().getBody(InputStream.class);
- BufferedReader in = new BufferedReader(new InputStreamReader(body));
- StringBuffer strbf = new StringBuffer("");
- String str = null;
- str = in.readLine();
- while (str != null) {
- System.out.println(str);
- strbf.append(str + " ");
- str = in.readLine();
- }
- exchange.getOut().setHeader(Exchange.FILE_NAME, "converted.txt");
- // set the output to the file
- exchange.getOut().setBody(strbf.toString());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
4,创建一个Spring的配置文件如***意要将camel的xmlns加入文件中
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:camel="http://camel.apache.org/schema/spring"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"
- default-autowire="byName" default-init-method="init">
- <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">
- <package>com.test.camel</package>
- </camelContext>
- </beans>
5,启动Spring容器,Camel会自动启动,不用像入门示例那样CamelContext context = new DefaultCamelContext(), context.addRoutes(..); context.start();
ApplicationContext ac = new ClassPathXmlApplicationContext("config/cameltest.xml");
while (true) {
Thread.sleep(2000);
}
可见,Camel可以很容易的和Spring集成.
Camel还提供了"Spring DSL"来在XML中配置Route规则,不需要用JAVA类(如上面的FileProcessWithCamelSpring )来实现route.
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:camel="http://camel.apache.org/schema/spring"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"
- default-autowire="byName" default-init-method="init">
- <bean id="fileConverter" class="com.test.camel.FileConvertProcessor"/>
- <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="file:d:/temp/inbox?delay=30000"/>
- <process ref="fileConverter"/>
- <to uri="file:d:/temp/outbox"/>
- </route>
- </camelContext>
- </beans>
与第五步一样启动Spring容器,Camel会每隔30秒轮询一下看d:/temp/inbox是否有文件,有的话则进行处理.