【尚硅谷Java版】Flink中DataStream API篇之读取数据源

时间:2024-10-30 08:06:17

尚硅谷Java版】Flink中DataStream API篇之读取数据源

  • 使用4种方式读取数据源
    • 方式一:从文件中读取数据
    • 方式二:从集合中读取数据
    • 方式三:从元素中读取数据
    • 方式四:从socket文本流中读取
  • 测试
    • 测试结果
    • 方式五:从kafka中读取数据

        flink可以从各种来源获取数据,然后构建DataStream进行转换处理,一般把数据的输入称为数据源,读取数据的算子就称为源算子(在代码里面调用的那个API)
        在代码中读取数据源的方式有以下三种:

  1. 从文件中读取数据
  2. 从集合中读取数据
  3. 从元素中读取数据
  4. 从kafka中读取数据

在使用以上4种方式读取数据源之前先创建一个名称为Event的类

注意:
这里我们需要注意以下几点:
*1、类必须是公有的
*2、所有属性都是公有的
*3、所有属性的类型都是可以序列化的

package .chapter05;

import ;

/**
 * @author potential
 */
public class Event {
    /**
     * 这里我们需要注意以下几点:
     *      1、类必须是公有的
     *      2、所有属性都是公有的
     *      3、所有属性的类型都是可以序列化的
     */
    public String user;
    public String url;
    public Long timestamp;

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
         = user;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
         = url;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
         = timestamp;
    }

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
         = user;
         = url;
         = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

使用4种方式读取数据源

方式一:从文件中读取数据

(1)创建一个input包,在里面创建文件
在这里插入图片描述
(2)并在文件中添加如下内容(可随意添加,也可直接复制我这里的直接使用即可):

 Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10, 9000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

(3)编写测试类SourceTest,在测试类中使用方式一:从文件中读取数据源

   //(1)从文件中读取数据 批量处理   常用
        DataStreamSource<String> stream1 = env.readTextFile("input/");
  • 1
  • 2

方式二:从集合中读取数据

在测试类中使用方式二:从集合中读取数据源

 ArrayList<Integer> nums=new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource<Integer> numStream = env.fromCollection(nums);

        ArrayList<Event> events = new ArrayList<>();
        events.add(new Event("Mary","./home",1000L));
        events.add(new Event("Bob","./cart",2000L));
        DataStreamSource<Event> stream2 = env.fromCollection(events);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

方式三:从元素中读取数据

  DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
  • 1
  • 2
  • 3
  • 4

方式四:从socket文本流中读取

 //(4)、从socket文本流中读取
        DataStreamSource<String> stream4  = env.socketTextStream("hostname", 7777);
  • 1
  • 2

测试过程可以参考:/junR_980218/article/details/125375722

测试

将上面的三种方式同时写入一个测试类SourceTest当中,如下代码所示。进行测试

package .chapter05;

import ;
import ;

import ;

/**
 * @author potential
 */
public class SourceTest {
    public static void main(String[] args) throws Exception {
        //1、创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //方便调试,对于全局的并行度设为1
        env.setParallelism(1);
        /**
         * 2、从不同的来源读取数据
         */
        //(1)从文件中读取数据 批量处理   常用
        DataStreamSource<String> stream1 = env.readTextFile("input/");

        //(2)从集合中读取数据  常用于测试
        ArrayList<Integer> nums=new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource<Integer> numStream = env.fromCollection(nums);

        ArrayList<Event> events = new ArrayList<>();
        events.add(new Event("Mary","./home",1000L));
        events.add(new Event("Bob","./cart",2000L));
        DataStreamSource<Event> stream2 = env.fromCollection(events);

        //(3)、从元素读取数据  常用于测试
        DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
          //(4)、从socket文本流中读取
        DataStreamSource<String> stream4  = env.socketTextStream("hostname", 7777);

        stream1.print("1");
        numStream.print("nums");
        stream2.print("2");
        stream3.print("3");
        stream4.print("4");

        env.execute();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

测试结果

在这里插入图片描述

方式五:从kafka中读取数据

//(5)从kafka中读取数据——即是消费数据 消费者模型
Properties properties = new Properties();
//"hadoop102:9092":指的是 虚拟机的名称和端口号
properties.setProperty("","hadoop102:9092");
properties.setProperty("","consumer-group");
properties.setProperty("","");
properties.setProperty("","");
properties.setProperty("","latest");

DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));

kafkaStream.print();
env.execute();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

测试的话,在虚拟机中开启zookeeper、kafka、并且创建一个生产者、创建一个主题,然后输入数据进行测试
在这里插入图片描述