5.5 Output Operators (Sinks)
5.5.1 Overview
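- print() is also an output operation; it is implemented by the PrintSinkFunction class
  print() creates a PrintSinkFunction and passes it as the argument to addSink()
  PrintSinkFunction extends the rich function class RichSinkFunction
- The RichSinkFunction class
  - extends the rich function base class AbstractRichFunction
    so it can use the rich function lifecycle methods such as open() and close(), access the runtime context, define state, and so on
  - also implements the SinkFunction interface, so it is ultimately a SinkFunction
  - the core method of the SinkFunction interface is invoke(), which receives the value and the current context
- To write a custom sink
  call DataStream's addSink() method
  and pass in your own SinkFunction implementation
- Flink also provides connectors for third-party systems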
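The call pattern described above can be illustrated without a Flink dependency. The following plain-Java sketch uses hypothetical stand-in types (`MiniSinkFunction`, `MiniRichSinkFunction`, `runSink`), not Flink's real classes, and assumes the usual rich-function contract: `open()` once per parallel instance, `invoke()` once per record, `close()` at the end. A `long` timestamp stands in for the `Context` argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkLifecycleSketch {

    // Hypothetical stand-in for SinkFunction: one method, called once per record
    public interface MiniSinkFunction<T> {
        void invoke(T value, long currentProcessingTime); // long stands in for Context
    }

    // Hypothetical stand-in for RichSinkFunction: adds lifecycle hooks with empty defaults
    public abstract static class MiniRichSinkFunction<T> implements MiniSinkFunction<T> {
        public void open() {}
        public void close() {}
    }

    // Hypothetical driver mimicking how a sink task drives the function
    @SuppressWarnings("unchecked")
    public static <T> void runSink(List<T> records, MiniSinkFunction<T> sink) {
        MiniRichSinkFunction<T> rich =
                (sink instanceof MiniRichSinkFunction) ? (MiniRichSinkFunction<T>) sink : null;
        if (rich != null) rich.open();                     // once, before any record
        for (T record : records) {
            sink.invoke(record, System.currentTimeMillis()); // once per record
        }
        if (rich != null) rich.close();                    // once, after the last record
    }

    // A rich sink that logs every lifecycle call
    public static class LoggingSink extends MiniRichSinkFunction<String> {
        public final List<String> calls = new ArrayList<>();
        @Override public void open() { calls.add("open"); }
        @Override public void invoke(String value, long ts) { calls.add("invoke:" + value); }
        @Override public void close() { calls.add("close"); }
    }

    public static void main(String[] args) {
        LoggingSink rich = new LoggingSink();
        runSink(Arrays.asList("Mary", "Bob"), rich);
        System.out.println(rich.calls); // [open, invoke:Mary, invoke:Bob, close]

        // A plain (non-rich) sink can be a lambda because the interface has a single method
        List<String> seen = new ArrayList<>();
        runSink(Arrays.asList("Alice"), (value, ts) -> seen.add(value));
        System.out.println(seen); // [Alice]
    }
}
```

A plain lambda is enough when no setup or cleanup is needed; anything that holds a resource (a file, a client connection) fits the rich variant.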
5.5.2 Output to Files
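- StreamingFileSink: the streaming file sink class
- Class hierarchy
  extends RichSinkFunction and implements CheckpointedFunction and CheckpointListener (for checkpointing)
- Under the hood
  data is written into buckets; inside each bucket it is stored as part files that are split by size, which supports distributed storage
- Creating an instance
  use a builder:
  RowFormatBuilder for row-encoded formats
  BulkFormatBuilder for bulk-encoded formats (e.g. columnar storage such as Parquet)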
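- Code
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

// Event is the user-defined POJO (user, url, timestamp) used throughout this chapter; not shown here
public class SinkToFileTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Note: StreamingFileSink finalizes part files only on successful checkpoints;
        // without env.enableCheckpointing(...), part files stay in the in-progress or pending state

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // 2. To obtain the SinkFunction to pass in, build a StreamingFileSink object.
        //    forRowFormat() returns a DefaultRowFormatBuilder (forBulkFormat() is the bulk-encoded counterpart).
        //    The call is prefixed with the type parameter <String> and takes an output directory and an Encoder.
        //    Writing to files requires serialization: the Encoder turns each record into bytes on the file stream.
        //    Finally, build() on the builder creates the instance.
        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                // Rolling policy: start a new part file based on time or file size
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()                                 // build the policy with its builder
                                .withMaxPartSize(1024 * 1024 * 1024)                   // roll at 1 GB
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // roll every 15 min (value in ms)
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(15)) // roll after 15 min without data (ms)
                                .build()
                )
                .build();

        // 1. To write to files, call addSink() and pass in a SinkFunction
        stream
                .map(data -> data.toString())  // convert Event to String
                .addSink(streamingFileSink);

        env.execute();
    }
}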
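The three rolling settings can be modeled in plain Java to see when a new part file starts. This is a simplified sketch assuming the conditions approximately as documented for `DefaultRollingPolicy`: roll when the part file exceeds the maximum size, when it is older than the rollover interval, or when no record has arrived within the inactivity interval. `PartFile` and `Policy` are hypothetical stand-ins, and the periodic timer that triggers the time checks is not modeled. It also shows why the interval arguments must be millisecond values: `TimeUnit.MINUTES.toMinutes(15)` is just `15`, which a millisecond parameter reads as 15 ms.

```java
import java.util.concurrent.TimeUnit;

public class RollingPolicySketch {

    // Hypothetical stand-in for the bookkeeping of one in-progress part file
    public static final class PartFile {
        public final long creationTime; // ms when the part file was opened
        public long lastUpdateTime;     // ms when the last record was written
        public long size;               // bytes written so far

        public PartFile(long now) {
            this.creationTime = now;
            this.lastUpdateTime = now;
        }

        public void write(long bytes, long now) {
            size += bytes;
            lastUpdateTime = now;
        }
    }

    // Hypothetical policy with the same three settings as DefaultRollingPolicy
    public static final class Policy {
        final long maxPartSize;          // bytes
        final long rolloverIntervalMs;   // maximum age of a part file
        final long inactivityIntervalMs; // maximum idle time of a part file

        public Policy(long maxPartSize, long rolloverIntervalMs, long inactivityIntervalMs) {
            this.maxPartSize = maxPartSize;
            this.rolloverIntervalMs = rolloverIntervalMs;
            this.inactivityIntervalMs = inactivityIntervalMs;
        }

        // Size condition, checked when a record is written
        public boolean shouldRollOnEvent(PartFile f) {
            return f.size > maxPartSize;
        }

        // Time conditions, checked periodically against processing time
        public boolean shouldRollOnProcessingTime(PartFile f, long now) {
            return now - f.creationTime >= rolloverIntervalMs
                    || now - f.lastUpdateTime >= inactivityIntervalMs;
        }
    }

    public static void main(String[] args) {
        System.out.println(TimeUnit.MINUTES.toMinutes(15)); // 15
        System.out.println(TimeUnit.MINUTES.toMillis(15));  // 900000

        long fifteenMin = TimeUnit.MINUTES.toMillis(15);
        Policy policy = new Policy(1024L * 1024 * 1024, fifteenMin, fifteenMin);

        PartFile file = new PartFile(0L);
        file.write(1_000L, 1_000L);
        System.out.println(policy.shouldRollOnEvent(file));                      // false
        System.out.println(policy.shouldRollOnProcessingTime(file, 60_000L));    // false
        System.out.println(policy.shouldRollOnProcessingTime(file, fifteenMin)); // true
    }
}
```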
5.5.3 Output to Kafka
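- Code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class SinkToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop2:9092");
        properties.setProperty("group.id", "consumer-group");
        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));

        // 2. Simple ETL in Flink: parse each "user, url, timestamp" line into an Event
        SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 3. Write the result to Kafka
        //    FlinkKafkaProducer arguments: brokerList, topicId, serialization schema
        result.addSink(new FlinkKafkaProducer<String>(
                "hadoop2:9092", "events", new SimpleStringSchema()));

        env.execute();
    }
}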
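The `map` step above assumes every Kafka message is a well-formed `user, url, timestamp` line; a message with too few fields or a non-numeric timestamp throws (`ArrayIndexOutOfBoundsException` or `NumberFormatException`) and fails the task. The plain-Java sketch below isolates that parsing as a pure, testable function that returns `Optional.empty()` for bad input instead of throwing. `ClickRecord` is a hypothetical stand-in for the tutorial's `Event` class, and skipping malformed lines is a design choice assumed here, not the behavior of the code above.

```java
import java.util.Optional;

public class ClickLineParser {

    // Hypothetical stand-in for the tutorial's Event POJO
    public static final class ClickRecord {
        public final String user;
        public final String url;
        public final long timestamp;

        public ClickRecord(String user, String url, long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "ClickRecord{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
        }
    }

    // Parses "user, url, timestamp"; returns empty instead of throwing on malformed input
    public static Optional<ClickRecord> parse(String line) {
        if (line == null) return Optional.empty();
        String[] fields = line.split(",");
        if (fields.length != 3) return Optional.empty();
        String user = fields[0].trim();
        String url = fields[1].trim();
        if (user.isEmpty() || url.isEmpty()) return Optional.empty();
        try {
            long ts = Long.parseLong(fields[2].trim());
            return Optional.of(new ClickRecord(user, url, ts));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("Mary, ./home, 1000"));
        // Optional[ClickRecord{user='Mary', url='./home', timestamp=1000}]
        System.out.println(parse("Bob, ./cart"));       // Optional.empty
        System.out.println(parse("Bob, ./cart, oops")); // Optional.empty
    }
}
```

Inside the job, a function like this could be called from a `FlatMapFunction<String, String>` that emits only successfully parsed records, so malformed messages are dropped rather than stopping the pipeline.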
5.5.4 Output to Redis
- Add the dependency
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
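- RedisSink extends RichSinkFunction
Call its constructor, passing in the Redis connection configuration (a FlinkJedisConfigBase) and a RedisMapper that defines the write command
FlinkJedisPoolConfig works fine here; it directly extends FlinkJedisConfigBase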
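- Code
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Input: ClickSource is a custom source
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 2. Create a Jedis connection configuration
        //    (FlinkJedisPoolConfig directly extends FlinkJedisConfigBase)
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop2")
                .build();

        // 3. Write to Redis
        stream.addSink(new RedisSink<>(config, new MyRedisMapper()));
        env.execute();
    }

    // 4. Custom class implementing the RedisMapper interface
    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // Write into the hash named "clicks" with HSET
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(Event data) {
            return data.user;  // hash field: user name
        }

        @Override
        public String getValueFromData(Event data) {
            return data.url;   // hash value: URL
        }
    }
}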
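With `RedisCommand.HSET` and the additional key `"clicks"`, each event becomes `HSET clicks <user> <url>`: the user name is the hash field and the URL is its value. Because HSET overwrites an existing field, the hash ends up holding only the latest URL per user, not a click history. The plain-Java sketch below models that effect with a `HashMap`; it is a hypothetical simulation, not Redis or Bahir code, and the sample pairs are taken from the events used elsewhere in this chapter.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class RedisHsetSketch {

    // Simulated Redis keyspace: key -> hash (field -> value)
    private final Map<String, Map<String, String>> keyspace = new HashMap<>();

    // Mimics "HSET key field value": creates the hash if needed, overwrites the field
    public void hset(String key, String field, String value) {
        keyspace.computeIfAbsent(key, k -> new LinkedHashMap<>()).put(field, value);
    }

    // Mimics "HGETALL key": an empty hash if the key does not exist
    public Map<String, String> hgetall(String key) {
        return keyspace.getOrDefault(key, new LinkedHashMap<>());
    }

    public static void main(String[] args) {
        RedisHsetSketch redis = new RedisHsetSketch();
        // (user, url) pairs as MyRedisMapper would extract them from Event objects
        String[][] clicks = {
                {"Mary", "./home"}, {"Bob", "./cart"}, {"Bob", "./prod?id=1"}, {"Bob", "./home"}
        };
        for (String[] c : clicks) {
            redis.hset("clicks", c[0], c[1]); // key = "clicks", field = user, value = url
        }
        System.out.println(redis.hgetall("clicks")); // {Mary=./home, Bob=./home}
    }
}
```

If a full history per user were needed instead, a list-type command would be the natural fit; which Redis data type to use is a design choice outside this example.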
5.5.5 Output to Elasticsearch
- Add the dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
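- Code
ElasticsearchSink.Builder takes a List<HttpHost> and an ElasticsearchSinkFunction<T>
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkToES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Input
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // 2. List of Elasticsearch hosts
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop", 9200));

        // 3. ElasticsearchSinkFunction<T> is an interface; override process()
        //    to build requests that send the data to Elasticsearch
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            // Arguments: the input element, the runtime context, and the indexer that sends requests
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> map = new HashMap<>();
                map.put(element.user, element.url);
                // Build an IndexRequest (Elasticsearch 6 still requires a document type)
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("types")
                        .source(map);
                indexer.add(request);
            }
        };

        // 4. Write to Elasticsearch
        stream.addSink(new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction).build());
        env.execute();
    }
}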
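In `process()` above, the map is built with the user name as the key, so the document for Mary's click has a field literally named `Mary`. With Elasticsearch's default dynamic mapping, each new user name therefore adds a new field to the index mapping. The plain-Java sketch below builds source maps in that shape and, for comparison, in an assumed alternative shape with fixed `user` and `url` fields, then counts the distinct field names each produces. Either kind of `Map` can be passed to `IndexRequest.source(Map)`; the helper names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class EsSourceMapSketch {

    // Same shape as the process() method above: field name = user, value = url
    public static Map<String, String> userKeyedSource(String user, String url) {
        Map<String, String> source = new HashMap<>();
        source.put(user, url);
        return source;
    }

    // Alternative assumed for comparison: fixed field names "user" and "url"
    public static Map<String, String> fixedFieldSource(String user, String url) {
        Map<String, String> source = new HashMap<>();
        source.put("user", user);
        source.put("url", url);
        return source;
    }

    // Distinct field names across documents, roughly what a dynamic mapping would have to hold
    public static int distinctFieldCount(List<Map<String, String>> docs) {
        Set<String> fields = new HashSet<>();
        for (Map<String, String> doc : docs) {
            fields.addAll(doc.keySet());
        }
        return fields.size();
    }

    public static void main(String[] args) {
        String[][] events = {{"Mary", "./home"}, {"Bob", "./cart"}, {"Alice", "./prod?id=100"}};
        List<Map<String, String>> keyed = new ArrayList<>();
        List<Map<String, String>> fixed = new ArrayList<>();
        for (String[] e : events) {
            keyed.add(userKeyedSource(e[0], e[1]));
            fixed.add(fixedFieldSource(e[0], e[1]));
        }
        System.out.println(keyed.get(0));              // {Mary=./home}
        System.out.println(distinctFieldCount(keyed)); // 3: one field per distinct user
        System.out.println(distinctFieldCount(fixed)); // 2: always "user" and "url"
    }
}
```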
5.5.6 Output to MySQL
- Add the dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
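- Code
JdbcSink.sink() takes three arguments: the SQL statement, a JdbcStatementBuilder that fills in the statement's placeholders, and JdbcConnectionOptions with the database connection settings
JdbcStatementBuilder has a single abstract method, so it can be written as a lambda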
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkToMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Input
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // 2. Write to MySQL: SQL statement, JdbcStatementBuilder, JdbcConnectionOptions
        stream.addSink(JdbcSink.sink(
                "INSERT INTO clicks (user, url) VALUES (?, ?)",
                // JdbcStatementBuilder: fill the ? placeholders from each Event
                (statement, event) -> {
                    statement.setString(1, event.user);
                    statement.setString(2, event.url);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test2")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        env.execute();
    }
}
- MySQL preparation
  - create the MySQL database test2
  - create the clicks table
mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);
Query OK, 0 rows affected (0.02 sec)
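`JdbcSink.sink()` does not necessarily run one `INSERT` the moment each record arrives: the sink fills the prepared statement per record and executes statements in JDBC batches governed by `JdbcExecutionOptions` (batch size, batch interval, max retries; the default batch size is 5000). Buffered records are flushed when a batch fills up, when the batch interval elapses (if one is set), and when the sink is closed. The plain-Java sketch below models only that buffering behavior with a hypothetical `BatchingWriterSketch`; a `Consumer` stands in for executing a batch, and no database is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingWriterSketch<T> {
    private final int batchSize;
    private final Consumer<List<T>> executeBatch; // stands in for running one JDBC batch
    private final List<T> buffer = new ArrayList<>();

    public BatchingWriterSketch(int batchSize, Consumer<List<T>> executeBatch) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.executeBatch = executeBatch;
    }

    // Called once per record, like invoke(): buffer, and execute when the batch is full
    public void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hand a copy of the buffered records to the executor, then reset the buffer
    public void flush() {
        if (buffer.isEmpty()) return;
        executeBatch.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Called when the sink shuts down: leftover records are written, not dropped
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingWriterSketch<String> writer =
                new BatchingWriterSketch<>(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 8; i++) {
            writer.add("row-" + i);
        }
        System.out.println(batchSizes); // [3, 3]  (2 rows still buffered)
        writer.close();
        System.out.println(batchSizes); // [3, 3, 2]
    }
}
```

With an unbounded source, rows may therefore reach the `clicks` table in bursts. `JdbcSink.sink()` also has an overload that takes a `JdbcExecutionOptions` argument (built with `JdbcExecutionOptions.builder().withBatchSize(...).withBatchIntervalMs(...)`) between the statement builder and the connection options.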
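5.5.7 Custom Sink Output
- Approach
Call DataStream's addSink() method and pass in a custom SinkFunction (implemented as a rich function class): override the key method invoke(), and override the rich function lifecycle methods open() and close()
- Add the dependency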
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
- Code
Omitted
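The approach described for a custom sink (acquire the external connection in `open()`, write in `invoke()`, release it in `close()`) can be sketched without HBase. In this plain-Java model, `FakeConnection` is a hypothetical stand-in for an HBase connection or table, and `ConnectionReusingSink` only mimics the shape of a `RichSinkFunction`; it is not the omitted original code. What it demonstrates is that one connection is opened per sink instance and reused for every record, instead of one connection per record.

```java
import java.util.ArrayList;
import java.util.List;

public class CustomSinkSketch {

    // Hypothetical stand-in for an external client (e.g. an HBase connection/table)
    public static final class FakeConnection {
        private final List<String> rows = new ArrayList<>();
        private boolean closed = false;

        void put(String rowKey, String value) {
            if (closed) {
                throw new IllegalStateException("connection already closed");
            }
            rows.add(rowKey + "=" + value);
        }

        void close() {
            closed = true;
        }
    }

    // Same shape as a RichSinkFunction: setup in open(), per-record work in invoke()
    public static final class ConnectionReusingSink {
        private FakeConnection conn;
        public int connectionsOpened = 0;

        public void open() {                              // once per parallel sink instance
            conn = new FakeConnection();
            connectionsOpened++;
        }

        public void invoke(String rowKey, String value) { // once per record, reuses conn
            conn.put(rowKey, value);
        }

        public void close() {                             // release the resource at the end
            if (conn != null) {
                conn.close();
            }
        }

        public List<String> writtenRows() {
            return conn.rows;
        }
    }

    public static void main(String[] args) {
        ConnectionReusingSink sink = new ConnectionReusingSink();
        sink.open();
        sink.invoke("Mary", "./home");
        sink.invoke("Bob", "./cart");
        sink.close();
        System.out.println(sink.connectionsOpened); // 1
        System.out.println(sink.writtenRows());     // [Mary=./home, Bob=./cart]
    }
}
```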