// Flink-Sink: 将结果输出到 Kafka、Redis、ES、MySQL 中
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;
/**
 * Demonstrates writing a stream of {@link SensorReading} records to several external systems
 * (Redis, Kafka, Elasticsearch and MySQL) through Flink sinks.
 *
 * <p>{@link #main(String[])} reads comma-separated readings from a text file and wires only the
 * MySQL sink; the other {@code saveToXxx} helpers are kept as ready-to-use examples.
 *
 * @author regotto
 */
public class SinkTest {

    /**
     * Writes each reading into Redis with an {@code HSET} command, using the sensor id as the
     * field and the temperature as the value, under the additional key {@code "sensor"}.
     *
     * @param dataStream stream of readings to persist
     */
    private static void saveToRedis(DataStream<SensorReading> dataStream) {
        FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
        builder.setHost("localhost");
        // SinkFunction is the sink interface; its core method is invoke.
        dataStream.addSink(new RedisSink<>(builder.build(), new RedisMapper<SensorReading>() {
            /**
             * Stores the temperature data in Redis as an id -> temperature hash.
             *
             * @return the HSET command description with "sensor" as the additional key
             */
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.HSET, "sensor");
            }

            @Override
            public String getKeyFromData(SensorReading sensorReading) {
                return sensorReading.getId();
            }

            @Override
            public String getValueFromData(SensorReading sensorReading) {
                return sensorReading.getTemperature().toString();
            }
        }));
    }

    /**
     * Converts each reading to its {@code toString()} form and publishes it to the Kafka topic
     * {@code "test"} on the broker at {@code localhost:9092}.
     *
     * @param dataStream stream of readings to publish
     */
    private static void saveToKafka(DataStream<SensorReading> dataStream) {
        dataStream.map((MapFunction<SensorReading, String>) value -> value.toString())
                .addSink(new FlinkKafkaProducer011<String>("localhost:9092", "test", new SimpleStringSchema()));
    }

    /**
     * Indexes each reading into the Elasticsearch index {@code "sensor"} (type
     * {@code "readingData"}) on {@code localhost:9200}, with the fields {@code id},
     * {@code temp} and {@code time} stored as strings.
     *
     * @param dataStream stream of readings to index
     */
    private static void saveToEs(DataStream<SensorReading> dataStream) {
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));
        // The actual SinkFunction is ElasticsearchSink (created through its builder);
        // ElasticsearchSinkFunction only decides how each record is turned into a request.
        dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts,
                (ElasticsearchSinkFunction<SensorReading>) (sensorReading, runtimeContext, requestIndexer) -> {
                    HashMap<String, String> source = new HashMap<>();
                    source.put("id", sensorReading.getId());
                    source.put("temp", sensorReading.getTemperature().toString());
                    source.put("time", sensorReading.getTimestamp().toString());
                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("sensor")
                            .type("readingData")
                            .source(source);
                    requestIndexer.add(indexRequest);
                }).build());
    }

    /**
     * Inserts each reading into the MySQL table {@code sensorreading} through a custom sink.
     *
     * <p>A {@link RichSinkFunction} is used so the JDBC connection and prepared statement can be
     * created once in {@code open} and released in {@code close}. (The original author notes that
     * no official MySQL sink was available, citing performance concerns.)
     *
     * @param dataStream stream of readings to insert
     */
    private static void saveToMysql(DataStream<SensorReading> dataStream) {
        dataStream.addSink(new RichSinkFunction<SensorReading>() {
            // Runtime-only JDBC handles: created in open(), never part of the serialized function.
            private transient Connection connection;
            private transient PreparedStatement insertStatement;

            @Override
            public void open(Configuration parameters) throws Exception {
                loadMysqlDriver();
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
                insertStatement
                        = connection.prepareStatement("insert into sensorreading (id, timestamp, temperature)values(?,?,?)");
            }

            @Override
            public void invoke(SensorReading value, Context context) throws Exception {
                insertStatement.setString(1, value.getId());
                insertStatement.setLong(2, value.getTimestamp());
                insertStatement.setDouble(3, value.getTemperature());
                insertStatement.execute();
            }

            @Override
            public void close() throws Exception {
                // open() may have failed part-way, so either handle can still be null;
                // the finally block guarantees the connection is released even if
                // closing the statement throws.
                try {
                    if (insertStatement != null) {
                        insertStatement.close();
                    }
                } finally {
                    if (connection != null) {
                        connection.close();
                    }
                }
            }
        });
    }

    /**
     * Registers the MySQL JDBC driver, trying the Connector/J 8.x class name first and falling
     * back to the legacy 5.x class name.
     *
     * @throws ClassNotFoundException if neither driver class is on the classpath
     */
    private static void loadMysqlDriver() throws ClassNotFoundException {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException ignored) {
            // Older Connector/J versions only ship the legacy class name; if that is
            // missing too, the resulting ClassNotFoundException propagates to the caller.
            Class.forName("com.mysql.jdbc.Driver");
        }
    }

    /**
     * Parses one {@code id,timestamp,temperature} line into a {@link SensorReading}.
     *
     * @param line a comma-separated input line
     * @return the parsed reading
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException if the timestamp or temperature is not numeric
     */
    private static SensorReading parseSensorReading(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException("Expected 'id,timestamp,temperature' but got: " + line);
        }
        // Trim so that inputs written as "id, ts, temp" parse as well.
        return new SensorReading(
                fields[0].trim(),
                Long.valueOf(fields[1].trim()),
                Double.valueOf(fields[2].trim()));
    }

    /**
     * Reads sensor readings from the text file given as the first argument and writes them to
     * MySQL.
     *
     * @param args {@code args[0]} is the path of the input text file
     * @throws IllegalArgumentException if no input path is supplied
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 1 || args[0].trim().isEmpty()) {
            throw new IllegalArgumentException("Usage: SinkTest <input-file-path>");
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> input = env.readTextFile(args[0]);
        DataStream<SensorReading> dataStream =
                input.map((MapFunction<String, SensorReading>) SinkTest::parseSensorReading);
        saveToMysql(dataStream);
        env.execute();
    }
}