STORM入门之(集成ElasticSearch)

时间:2021-01-23 19:59:30

本片文章基于本专题Demo进行 传送门:http://blog.csdn.net/column/details/17004.html

由于Storm集成ES过于陈旧,所以会照成连接ES客户端抛出node不可用异常,ES集群搭建为2.4.1版本 所以无论如何连接都是node不可用,解决方法修改Storm源码。

主要修改源码的连接ES部分,构建集群客户端,修正文件4个 如图:

STORM入门之(集成ElasticSearch)


修改后的源码

NewEsConfig

    TransportAddress[] getTransportAddresses() {
String[] ns = nodes;
TransportAddress[] addressArr = new TransportAddress[ns.length];
for (int i = 0; i < ns.length; i++) {
try {
addressArr[i] = new InetSocketTransportAddress(InetAddress.getByName(ns[i]), 9300);
} catch (UnknownHostException e) {
e.printStackTrace();
}
}

return addressArr;
}

Settings toBasicSettings() {
return Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("transport.tcp.compress", true)
.build();
}

NewStormElasticSearchClient

   public Client construct() {
Settings settings = esConfig.toBasicSettings();
TransportClient transportClient = TransportClient.builder().settings(settings).build().addTransportAddresses(esConfig.getTransportAddresses());
return transportClient;
}

Topology构建

 /**
* 构建ElasticBolt
*/
private static void builtEsIndexBolt(TopologyBuilder builder){
NewEsConfig esConfig = new NewEsConfig("ES-CLS", new String[]{"10.2.4.15","10.2.4.42","10.2.4.43"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
NewEsIndexBolt indexBolt = new NewEsIndexBolt(esConfig, tupleMapper);
builder.setBolt("es-bolt",indexBolt,1).shuffleGrouping("BoltA");
}

Bolt发送方式

 public void declareOutputFields(OutputFieldsDeclarer arg0) {
arg0.declare(new Fields("source", "index","type","id"));
}

入库Json

{"id":1,"ide":"eclipse","name":"Java"}

结果

STORM入门之(集成ElasticSearch)