import ;
import .*;
import ;
import ;
import ;
import ;
public class DemoTest {
public static void main(String[] args) throws Exception {
//创建Flink流处理执行环境
StreamExecutionEnvironment environment = ();
//设置并行度
(1);
//调用Flink自定义Source
DataStreamSource<Sick> source = (new DataSource());
//打印数据
();
//DataHub连接配置。
DatahubSinkFunction<RecordEntry> sinkFunction = new DatahubSinkFunction<>(
"***************************************",//EndPoint
"*************",//DataHub项目名称
"*************",//主题topic的名称
"**********",//accessID
"************"//accessKey
);
//序列化信息
DataStream<RecordEntry> operator = (new MapFunction<Sick, RecordEntry>() {
@Override
public RecordEntry map(Sick sick) throws Exception {
RecordEntry entry = getRecordEntry(sick);
return entry;
}
});
(sinkFunction);
//启动程序
();
}
//序列化方法
public static RecordEntry getRecordEntry(Sick sick) {
//注册Schema信息
RecordSchema recordSchema = new RecordSchema();
(new Field("name", ));
(new Field("sex", ));
(new Field("age", ));
(new Field("area", ));
(new Field("disease_status", ));
(new Field("date", ));
RecordEntry recordEntry = new RecordEntry();
TupleRecordData recordData = new TupleRecordData(recordSchema);
//将数据流里面的对象根据下标赋值
(0, ());
(1, ());
(2, ());
(3, ());
(4, sick.getDisease_status());
(5, ());
(recordData);
return recordEntry;
}
}