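This article builds on the demo from this series. Link: http://blog.csdn.net/column/details/17004.html
The Elasticsearch (ES) integration that ships with Storm is too outdated, so connecting through its ES client throws a node-unavailable exception. The ES cluster here runs version 2.4.1, so no matter how the connection is configured, the nodes are reported as unavailable. The fix is to modify the Storm source code.
The changes are mainly in the part of the source that connects to ES and builds the cluster client. Four files are changed, as shown in the figure.
Modified source code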
NewEsConfig
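// Needs java.net.InetAddress, java.net.UnknownHostException, and
// org.elasticsearch.common.transport.{TransportAddress, InetSocketTransportAddress}.
// Builds one transport address per configured node; every node uses transport port 9300.
TransportAddress[] getTransportAddresses() {
    String[] ns = nodes;
    TransportAddress[] addressArr = new TransportAddress[ns.length];
    for (int i = 0; i < ns.length; i++) {
        try {
            addressArr[i] = new InetSocketTransportAddress(InetAddress.getByName(ns[i]), 9300);
        } catch (UnknownHostException e) {
            // Fail fast instead of leaving a null entry in the array.
            throw new IllegalArgumentException("Cannot resolve ES node: " + ns[i], e);
        }
    }
    return addressArr;
}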
// Minimal client settings: the cluster name plus compression on the transport layer.
Settings toBasicSettings() {
    return Settings.settingsBuilder()
            .put("cluster.name", clusterName)
            .put("transport.tcp.compress", true)
            .build();
}
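The address-building step above can be tried out without any ES dependency. The following is only a minimal sketch using plain JDK types: the class name EsNodeAddresses and the method resolve are hypothetical and not part of Storm or Elasticsearch, and it assumes, as the code above does, that every node listens on port 9300.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Storm or Elasticsearch: it mirrors the
// loop in getTransportAddresses() using only JDK types.
final class EsNodeAddresses {
    // Assumption: every node listens on the transport port 9300.
    static final int TRANSPORT_PORT = 9300;

    static InetSocketAddress[] resolve(String[] nodes) {
        InetSocketAddress[] addresses = new InetSocketAddress[nodes.length];
        for (int i = 0; i < nodes.length; i++) {
            try {
                addresses[i] = new InetSocketAddress(InetAddress.getByName(nodes[i]), TRANSPORT_PORT);
            } catch (UnknownHostException e) {
                // Fail fast so the array never contains a null entry.
                throw new IllegalArgumentException("Cannot resolve ES node: " + nodes[i], e);
            }
        }
        return addresses;
    }
}
```

Calling EsNodeAddresses.resolve(new String[]{"10.2.4.15", "10.2.4.42"}) should give two socket addresses on port 9300. In the real NewEsConfig each resolved address is wrapped in an InetSocketTransportAddress instead.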
NewStormElasticSearchClient
// Builds an ES 2.x TransportClient from the settings and node addresses in esConfig.
public Client construct() {
    Settings settings = esConfig.toBasicSettings();
    TransportClient transportClient = TransportClient.builder()
            .settings(settings)
            .build()
            .addTransportAddresses(esConfig.getTransportAddresses());
    return transportClient;
}
Building the topology
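/**
 * Build the ES index bolt and attach it to the topology.
 */
private static void builtEsIndexBolt(TopologyBuilder builder) {
    // Cluster name and node IPs; NewEsConfig connects to each node on port 9300.
    NewEsConfig esConfig = new NewEsConfig("ES-CLS", new String[]{"10.2.4.15", "10.2.4.42", "10.2.4.43"});
    EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
    NewEsIndexBolt indexBolt = new NewEsIndexBolt(esConfig, tupleMapper);
    // One executor, fed by shuffle grouping from the upstream bolt "BoltA".
    builder.setBolt("es-bolt", indexBolt, 1).shuffleGrouping("BoltA");
}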
How the upstream bolt emits
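// The bolt feeding "es-bolt" declares these four fields, in this order,
// so that the tuple mapper can look them up by name.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("source", "index", "type", "id"));
}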
JSON document written to ES
{"id":1,"ide":"eclipse","name":"Java"}
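To connect the declared fields with the JSON above, here is a rough sketch of the values an upstream bolt could emit. The class EsTupleValues is hypothetical and not part of Storm, and the index and type names used with it ("demo-index", "demo-type") are placeholders of mine, not values from the demo. All four values are passed as strings, on the assumption that the default tuple mapper reads each field as a string.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Storm: shows the value order an upstream
// bolt would emit to match Fields("source", "index", "type", "id").
final class EsTupleValues {
    static List<Object> of(String sourceJson, String index, String type, String id) {
        // The order must match the declared fields: source, index, type, id.
        return Arrays.<Object>asList(sourceJson, index, type, id);
    }
}
```

Inside the bolt, a call such as EsTupleValues.of("{\"id\":1,\"ide\":\"eclipse\",\"name\":\"Java\"}", "demo-index", "demo-type", "1") would produce the list handed to the collector's emit call, with the JSON document as the source field.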