【尚硅谷Java版】Flink中DataStream API篇之读取数据源
- 使用4种方式读取数据源
- 方式一:从文件中读取数据
- 方式二:从集合中读取数据
- 方式三:从元素中读取数据
- 方式四:从socket文本流中读取
- 测试
- 测试结果
- 方式五:从kafka中读取数据
flink可以从各种来源获取数据,然后构建DataStream进行转换处理,一般把数据的输入称为数据源,读取数据的算子就称为源算子
(在代码里面调用的那个API)
在代码中读取数据源的方式有以下三种:
- 从文件中读取数据
- 从集合中读取数据
- 从元素中读取数据
- 从kafka中读取数据
在使用以上4种方式读取数据源之前先创建一个名称为Event的类
注意:
这里我们需要注意以下几点:
*1、类必须是公有的
*2、所有属性都是公有的
*3、所有属性的类型都是可以序列化的
package .chapter05;
import ;
/**
* @author potential
*/
public class Event {
/**
* 这里我们需要注意以下几点:
* 1、类必须是公有的
* 2、所有属性都是公有的
* 3、所有属性的类型都是可以序列化的
*/
public String user;
public String url;
public Long timestamp;
public String getUser() {
return user;
}
public void setUser(String user) {
= user;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
= url;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
= timestamp;
}
public Event() {
}
public Event(String user, String url, Long timestamp) {
= user;
= url;
= timestamp;
}
@Override
public String toString() {
return "Event{" +
"user='" + user + '\'' +
", url='" + url + '\'' +
", timestamp=" + new Timestamp(timestamp) +
'}';
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
使用4种方式读取数据源
方式一:从文件中读取数据
(1)创建一个input
包,在里面创建文件
(2)并在文件中添加如下内容(可随意添加,也可直接复制我这里的直接使用即可):
Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10, 9000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
(3)编写测试类SourceTest
,在测试类中使用方式一:从文件中读取数据源
//(1)从文件中读取数据 批量处理 常用
DataStreamSource<String> stream1 = env.readTextFile("input/");
- 1
- 2
方式二:从集合中读取数据
在测试类中使用方式二:从集合中读取数据源
ArrayList<Integer> nums=new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numStream = env.fromCollection(nums);
ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Mary","./home",1000L));
events.add(new Event("Bob","./cart",2000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
方式三:从元素中读取数据
DataStreamSource<Event> stream3 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
- 1
- 2
- 3
- 4
方式四:从socket文本流中读取
//(4)、从socket文本流中读取
DataStreamSource<String> stream4 = env.socketTextStream("hostname", 7777);
- 1
- 2
测试过程可以参考:/junR_980218/article/details/125375722
测试
将上面的三种方式同时写入一个测试类SourceTest
当中,如下代码所示。进行测试
package .chapter05;
import ;
import ;
import ;
/**
* @author potential
*/
public class SourceTest {
public static void main(String[] args) throws Exception {
//1、创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//方便调试,对于全局的并行度设为1
env.setParallelism(1);
/**
* 2、从不同的来源读取数据
*/
//(1)从文件中读取数据 批量处理 常用
DataStreamSource<String> stream1 = env.readTextFile("input/");
//(2)从集合中读取数据 常用于测试
ArrayList<Integer> nums=new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numStream = env.fromCollection(nums);
ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Mary","./home",1000L));
events.add(new Event("Bob","./cart",2000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);
//(3)、从元素读取数据 常用于测试
DataStreamSource<Event> stream3 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
//(4)、从socket文本流中读取
DataStreamSource<String> stream4 = env.socketTextStream("hostname", 7777);
stream1.print("1");
numStream.print("nums");
stream2.print("2");
stream3.print("3");
stream4.print("4");
env.execute();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
测试结果
方式五:从kafka中读取数据
//(5)从kafka中读取数据——即是消费数据 消费者模型
Properties properties = new Properties();
//"hadoop102:9092":指的是 虚拟机的名称和端口号
properties.setProperty("","hadoop102:9092");
properties.setProperty("","consumer-group");
properties.setProperty("","");
properties.setProperty("","");
properties.setProperty("","latest");
DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
kafkaStream.print();
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
测试的话,在虚拟机中开启zookeeper、kafka、并且创建一个生产者、创建一个主题,然后输入数据进行测试