Flink Sink: Writing Results to Kafka, Redis, Elasticsearch, and MySQL

Date: 2025-02-14 18:10:23
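The whole example is one `SinkTest` class that attaches four different sinks to the same `DataStream<SensorReading>`. The post never shows the `com.regotto.entity.SensorReading` POJO it imports; a minimal sketch, with fields inferred from the getters the sink code calls (id, timestamp, temperature), might look like this:

```java
// Hypothetical sketch of the SensorReading POJO; the original
// com.regotto.entity.SensorReading is not shown in the post, so the
// fields are inferred from the getters used by the sink code below.
public class SensorReading {
    private String id;          // sensor id, e.g. "sensor_1"
    private Long timestamp;     // event time as a long
    private Double temperature; // temperature reading

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }

    @Override
    public String toString() {
        return id + "," + timestamp + "," + temperature;
    }
}
```

Given the `split(",")` parsing in `main`, the input file is expected to hold one comma-separated reading per line, e.g. something like `sensor_1,1547718199,35.8` (illustrative values).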
The class opens with the imports and the Redis, Kafka, and Elasticsearch sinks:

```java
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;

/**
 * @author regotto
 */
public class SinkTest {

    private static void saveToRedis(DataStream<SensorReading> dataStream) {
        FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
        builder.setHost("localhost");
        // RedisSink implements the SinkFunction interface; its core method is invoke
        dataStream.addSink(new RedisSink<>(builder.build(), new RedisMapper<SensorReading>() {
            /**
             * Store the temperature data in Redis as an id -> temperature hash.
             */
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.HSET, "sensor");
            }

            @Override
            public String getKeyFromData(SensorReading sensorReading) {
                return sensorReading.getId();
            }

            @Override
            public String getValueFromData(SensorReading sensorReading) {
                return sensorReading.getTemperature().toString();
            }
        }));
    }

    private static void saveToKafka(DataStream<SensorReading> dataStream) {
        // Write the stream to Kafka: serialize each record to a string first
        dataStream.map((MapFunction<SensorReading, String>) value -> value.toString())
                .addSink(new FlinkKafkaProducer011<>("localhost:9092", "test", new SimpleStringSchema()));
    }

    private static void saveToEs(DataStream<SensorReading> dataStream) {
        // Write the stream to Elasticsearch
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));
        // The actual SinkFunction is the ElasticsearchSink built via its Builder;
        // the ElasticsearchSinkFunction only defines how each record is indexed
        dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts,
                (ElasticsearchSinkFunction<SensorReading>) (sensorReading, runtimeContext, requestIndexer) -> {
                    HashMap<String, String> source = new HashMap<>();
                    source.put("id", sensorReading.getId());
                    source.put("temp", sensorReading.getTemperature().toString());
                    source.put("time", sensorReading.getTimestamp().toString());
                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("sensor")
                            .type("readingData")
                            .source(source);
                    requestIndexer.add(indexRequest);
                }).build());
    }
```
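A version note: mapping types were removed in Elasticsearch 7, so the `.type("readingData")` call above only works with the 6.x connector used here. With the `flink-connector-elasticsearch7` artifact (which replaces, not complements, the 6.x dependency) the builder API is the same and the type simply disappears. A minimal sketch under that assumption; `saveToEs7` is a hypothetical helper name:

```java
    // Sketch assuming the flink-connector-elasticsearch7 artifact:
    // same Builder API, but the index request takes no mapping type.
    private static void saveToEs7(DataStream<SensorReading> dataStream) {
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));
        dataStream.addSink(new org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink.Builder<>(
                httpHosts,
                (ElasticsearchSinkFunction<SensorReading>) (reading, ctx, indexer) -> {
                    HashMap<String, String> source = new HashMap<>();
                    source.put("id", reading.getId());
                    source.put("temp", reading.getTemperature().toString());
                    source.put("time", reading.getTimestamp().toString());
                    indexer.add(Requests.indexRequest()
                            .index("sensor") // ES 7 has no mapping types, so no .type(...)
                            .source(source));
                }).build());
    }
```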
The class ends with the custom MySQL sink and the driver `main`:

```java
    /* For performance reasons this Flink version ships no official MySQL sink,
       so writing to MySQL means a custom sink. JDBC needs connection handling,
       so we extend RichSinkFunction and use its open/close lifecycle methods. */
    private static void saveToMysql(DataStream<SensorReading> dataStream) {
        dataStream.addSink(new RichSinkFunction<SensorReading>() {

            Connection connection = null;
            PreparedStatement insertStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Explicit driver loading is optional with JDBC 4+ drivers; the class name
                // is "com.mysql.jdbc.Driver" for Connector/J 5.x, "com.mysql.cj.jdbc.Driver" for 8.x
                Class.forName("com.mysql.cj.jdbc.Driver");
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
                insertStatement = connection.prepareStatement(
                        "insert into sensorreading (id, timestamp, temperature) values (?, ?, ?)");
            }

            @Override
            public void invoke(SensorReading value, Context context) throws Exception {
                insertStatement.setString(1, value.getId());
                insertStatement.setLong(2, value.getTimestamp());
                insertStatement.setDouble(3, value.getTemperature());
                insertStatement.execute();
            }

            @Override
            public void close() throws Exception {
                insertStatement.close();
                connection.close();
            }
        });
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Path to the sensor data file (left blank in the original post)
        DataStream<String> input = env.readTextFile("");
        DataStream<SensorReading> dataStream = input.map((MapFunction<String, SensorReading>) value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });
        // Swap in saveToRedis / saveToKafka / saveToEs to test the other sinks
        saveToMysql(dataStream);
        env.execute();
    }
}
```
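The "no official MySQL sink" comment reflects the Flink version used here; since Flink 1.11 the `flink-connector-jdbc` artifact provides a generic `JdbcSink` that handles the connection lifecycle and batching for you. A sketch of the same insert with it, assuming Flink 1.11+ and a MySQL driver on the classpath (`saveToMysqlWithJdbcSink` is a hypothetical helper name):

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class JdbcSinkSketch {
    // Sketch: the official JdbcSink (Flink 1.11+) replaces the hand-rolled
    // RichSinkFunction above; open/close and batching are managed internally.
    static void saveToMysqlWithJdbcSink(DataStream<SensorReading> dataStream) {
        dataStream.addSink(JdbcSink.sink(
                "insert into sensorreading (id, timestamp, temperature) values (?, ?, ?)",
                (statement, reading) -> {
                    statement.setString(1, reading.getId());
                    statement.setLong(2, reading.getTimestamp());
                    statement.setDouble(3, reading.getTemperature());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()));
    }
}
```

An overloaded `JdbcSink.sink(...)` also accepts `JdbcExecutionOptions` to tune batch size and retries, which matters once the stream carries real volume.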