Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十一)定制一个arvo格式文件发送到kafka的topic,通过Structured Streaming读取kafka的数据

时间:2021-06-08 20:57:23

将arvo格式数据发送到kafka的topic

第一步:定制avro schema:

{  
      "type": "record",  
      "name": "userlog",  
      "fields": [
            {"name": "ip","type": "string"},
            {"name": "identity","type":"string"},
            {"name": "userid","type":"int"},
            {"name": "time","type": "string"},
            {"name": "requestinfo","type": "string"},
            {"name": "state","type": "int"},
            {"name": "responce","type": "string"},
            {"name": "referer","type": "string"},
            {"name": "useragent","type": "string"},
            {"name": "timestamp","type": "long"}
      ]  
}

定义一个avro的schema文件userlog.avsc,内容如上。

该schema包含字段:ip:string,identity:string,userid:int,time:string,requestinfo:string,state:int,responce:string,referer:string,useragent:string,timestamp:long。这些字段用来描述一个网络请求日志。

第二步:创建发送数据到topic的producer对象:

要实现发送数据到kafka上,我们必须通过kafka api生成一个producer对象,用于向kafka生产数据:

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        // 声明kafka broker
        props.put("bootstrap.servers", "192.168.0.121:9092");
        Producer<String, byte[]> procuder = new KafkaProducer<String, byte[]>(props);
        return procuder;
    }

此时需要引入kafka的开发jar包:kafka-clients-0.10.0.1.jar。

第三步:解析avro schema文件为Schema对象,并通过schema对象创建record对象(GenericRecord)

解析avro schema文件为Schema对象,需要依赖包:avro-1.7.5.jar

这里我们定义一个SchemaUtil.java类,该方法提供了一个getAvroSchemaFromHDFSFile方法用来实现从hdfs上读取avro文件,并把该avro文件解析为schema对象。

package com.dx.streaming.producer;

import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemaUtil {
    public static Schema getAvroSchemaFromHDFSFile(String hdfsAvroFile) throws Exception {
        InputStream inputStream;
        Path pt = new Path(hdfsAvroFile);        
        Schema schema = null;
        FileSystem fs =null;
        
        try {
            fs = FileSystem.get(new Configuration());
            if (!fs.exists(pt)) {
                throw new Exception(pt+" file is not exists");
            }
            
            inputStream = fs.open(pt);
            Schema.Parser parser = new Schema.Parser();
            schema = parser.parse(inputStream);
        } catch (IOException e) {
            e.printStackTrace();
            throw e;
        } finally {
            if(fs!=null){
                try {
                    fs.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        return schema;
    }
}

通过schema对象创建record对象(GenericRecord),该record存储了实际的生产数据。

            Random random = new Random();
            String ip = random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255);
            String identity = UUID.randomUUID().toString();
            int userid = random.nextInt(100);

            SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd ");
            Date date= new Date();
            String yyyyMMdd    =dfs.format(date);            
            String time = yyyyMMdd+ random.nextInt(24) + ":" + random.nextInt(60) + ":" + random.nextInt(60);
            String requestInfo = "....";
            int state = random.nextInt(600);
            String responce = "...";
            String referer = "...";
            String useragent = "...";
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            GenericRecord record = new GenericData.Record(schema);
            record.put("ip", ip);
            record.put("identity", identity);
            record.put("userid", userid);
            record.put("time", time);
            record.put("requestinfo", requestInfo);
            record.put("state", state);
            record.put("responce", responce);
            record.put("referer", referer);
            record.put("useragent", useragent);
            record.put("timestamp", format.parse(time).getTime());

备注:上边代码就是按照schema创建了一个GenericRecord对象,该GenericRecord对象用来存储了真是的数据。

而且record对象可以通过Injection<GenericRecord, byte[]>对象转化为byte[],更便于在生产数据过程中传输。

String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
byte[] bytes = recordInjection.apply(record);

实际上在consumer端,接收数据时:当consumer接收到数据时,可以通过Injection<GenericRecord, byte[]> recordInjection对象对接收到的byte[]数据进行avro解析,解析为一个GenericRecord对象。

        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.121:9092,192.168.0.122:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);

        consumer.subscribe(Collections.singletonList(“topic name”));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse("avro schema file path");
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    String info = String.format(String.format("topic = %s, partition = %s, offset = %d, customer = %s,country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), genericRecord.get("str1")));
                    logger.info(info);
                }
            }
        } finally {
            consumer.close();
        }

备注:具体更多详情请参考《Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析

第四步:通过producer发送数据到topic:

发送byte[]数据到kafka:需要先铜鼓kafka api生成一个producer对象,将上边的record数据转化为byte[]格式,调用producre的send方法发送数据。

Producer<String, byte[]> procuder = createProducer();
// 根据avro schema文件生成schema对象。
// 根据schema对象,生成record,并把数据存储到record中。
// 根据schema对象,生成record转化为byte[]的转化器Injection<GenericRecord, byte[]>。
try {
    byte[] bytes = recordInjection.apply(record);

    ProducerRecord<String, byte[]> msg = new ProducerRecord<String, byte[]>(topic, bytes);
    procuder.send(msg);
} catch (Exception e) {
    e.printStackTrace();
}

上边的四个步骤已经简单的介绍了如何把一个待生产的数据转化为record对象,并把record对象转化为byte[]类型,发送到kafka的几个重要步骤及其实现思路。下边的代码就是一个完整的实现:

package com.dx.streaming.producer;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class TestProducer {
    private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc";
    //private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
    private static final String topic = "t-my";

    public static void main(String[] args) throws Exception {
        int size = 0;
        String appName = "Test Avro";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(appName);
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        Producer<String, byte[]> procuder = createProducer();
        while (true) {
            Random random = new Random();
            String ip = random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255) + ":" + random.nextInt(255);
            String identity = UUID.randomUUID().toString();
            int userid = random.nextInt(100);

            SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd ");
            Date date= new Date();
            String yyyyMMdd    =dfs.format(date);            
            String time = yyyyMMdd+ random.nextInt(24) + ":" + random.nextInt(60) + ":" + random.nextInt(60);
            String requestInfo = "....";
            int state = random.nextInt(600);
            String responce = "...";
            String referer = "...";
            String useragent = "...";
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            GenericRecord record = new GenericData.Record(schema);
            record.put("ip", ip);
            record.put("identity", identity);
            record.put("userid", userid);
            record.put("time", time);
            record.put("requestinfo", requestInfo);
            record.put("state", state);
            record.put("responce", responce);
            record.put("referer", referer);
            record.put("useragent", useragent);
            record.put("timestamp", format.parse(time).getTime());

            System.out.println("ip:" + ip + ",identity:" + identity + ",userid:" + userid + ",time:" + time + ",timestamp:" + format.parse(time).getTime() + "\r\n");

            try {
                byte[] bytes = recordInjection.apply(record);

                ProducerRecord<String, byte[]> msg = new ProducerRecord<String, byte[]>(topic, bytes);
                procuder.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }

            size++;

            if (size % 100 == 0) {
                Thread.sleep(100);
                if (size > 1000) {
                    break;
                }
            }
        }

        // 列出topic的相关信息
        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
        partitions = procuder.partitionsFor(topic);
        for (PartitionInfo p : partitions) {
            System.out.println(p);
        }

        System.out.println("send message over.");
        procuder.close(100, java.util.concurrent.TimeUnit.MILLISECONDS);
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        // 声明kafka broker
        props.put("bootstrap.servers", "192.168.0.121:9092");
        Producer<String, byte[]> procuder = new KafkaProducer<String, byte[]>(props);
        return procuder;
    }
}

此时pom.xm配置如下:

        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.11</artifactId>
            <version>0.9.5</version>
        </dependency>

        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>3.2.0</version>
            <type>jar</type>
        </dependency>        
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

声明:若为了满足上边代码,这里的pom配置中个别dependency是多余的,但是下边的Structured Streaming端是需要的。

测试的打印结果:

ip:229:21:203:40,identity:ae6fde10-4687-4682-a760-d9076892eb45,userid:9,time:2018-07-12 12:57:24,timestamp:1531371444000 ip:105:224:103:61,identity:edef8c93-da4e-46d4-bfd3-551b74e6f4df,userid:1,time:2018-07-12 23:57:23,timestamp:1531411043000 ip:252:230:234:213,identity:80e00a81-f6dd-4bf6-93a1-95154babdd08,userid:59,time:2018-07-12 9:36:37,timestamp:1531359397000 ip:76:63:136:50,identity:630b66fb-95d7-4c63-a638-6f24396987d0,userid:33,time:2018-07-12 19:18:18,timestamp:1531394298000
Partition(topic = t-my, partition = 0, leader = 0, replicas = [0,], isr = [0,] send message over.

通过Structured Streaming读取kafka的数据

注意事项:

下边是采用structured streaming方式来编程,而非spark streaming方式来编程;

它们的差别在于使用的API不同,原理上也不尽相同,需要开发人员自己清楚自己使用的是什么技术;

当使用structured streaming编程,且使用kafka+spark时,你需要引入的maven依赖如下:

<!-- spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- spark-sql -->
        
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- spark-core -->
        
        <!-- Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html -->

        <!-- Spark Streaming Programming Guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide -->
        <!--
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        -->
        <!-- Spark Streaming Programming Guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide -->

        <!-- Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html -->
        <!-- 
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        -->
        <!-- Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0  or higher) http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html -->

        <!-- kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <!-- kafka client -->

        <!-- avro -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>bijection-avro_2.10</artifactId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.4</version>
        </dependency>
        <!-- avro -->

既然是读取kafka的avro的record的byte[]格式记录,这里就需要对其进行byte[]进行解析(解析为行:这里先将byte[]转化为record,再将record解析为了object[],之后通过RowFactory.create(object[])转化为Row的格式),解析函数独立定义了一个udf对象来处理:

package com.dx.streaming.producer;

import java.text.SimpleDateFormat;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class AvroParserUDF implements UDF1<byte[], Row> {
    private static final long serialVersionUID = -2369806025607566774L;
    private String avroSchemaFilePath=null;
    private transient Schema schema = null;
    private transient Injection<GenericRecord, byte[]> recordInjection = null;

    public AvroParserUDF(String avroSchemaFilePath) {
        this.avroSchemaFilePath=avroSchemaFilePath;
    }

    public Row call(byte[] data) throws Exception {
        if(this.recordInjection==null){            
            this.schema = SchemaUtil.getAvroSchemaFromHDFSFile(this.avroSchemaFilePath);        
            this.recordInjection = GenericAvroCodecs.toBinary(schema);        
        }
        
        GenericRecord record = this.recordInjection.invert(data).get();

        int timeIndex = record.getSchema().getFields().indexOf(record.getSchema().getField("time"));

        int iColumns = record.getSchema().getFields().size();
        Object[] values = new Object[iColumns];
        for (int i = 0; i < iColumns; i++) {
            values[i] = record.get(i);
            if (values[i] instanceof org.apache.avro.util.Utf8) {
                values[i] = values[i].toString();
            }
        }
//        SimpleDateFormat dfs=new SimpleDateFormat("yyyy-MM-dd HH:MM:SS");
//        SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd 00:00:00");
//        System.out.println(df.format(dfs.parse("2018-07-03 21:23:58")));
//        output 2018-07-03 00:00:00
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:MM:SS");
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
        values[timeIndex] = df.format(sdf.parse((String) values[timeIndex]));

        return RowFactory.create(values);
    }
}

实现思路:使用sparkSession.readStream().format("kafka")方式读取kafka指定的topic,对kafka的byte[]格式数据转化(转化为Row),对Row进行操作。

package com.dx.streaming.producer;

import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import com.databricks.spark.avro.SchemaConverters;

public class TestConsumer {
    //private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc";
    private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
    private static final String topic = "t-my";

    public static void main(String[] args) throws Exception {
        String appName = "Test Avro";
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(appName);
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        Map<String, String> kafkaOptions = new HashMap<String, String>();
        kafkaOptions.put("kafka.bootstrap.servers", "192.168.0.121:9092");
        
        Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);    
        AvroParserUDF udf = new AvroParserUDF(avroFilePath);
        StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();        
        sparkSession.udf().register("deserialize", udf, DataTypes.createStructType(type.fields()));

        Dataset<Row> stream = sparkSession.readStream().format("kafka").options(kafkaOptions).option("subscribe", topic).option("startingOffsets", "earliest").load().select("value").as(Encoders.BINARY())
                .selectExpr("deserialize(value) as row").select("row.*");

        stream.printSchema();

        // Print new data to console
        StreamingQuery query = stream.writeStream().format("console").start();
        
        try {
            query.awaitTermination();
            sparkSession.streams().awaitAnyTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}

打包,提交用spark-submit:

[spark@master work]$ more submit.sh 
#! /bin/bash
jars=""

for file in `ls /home/spark/work/jars/*.jar`
do
        jars=$file,$jars
        #echo $jars
done


echo "------------------------------------"
echo $jars
echo "------------------------------------"

/opt/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
--jars $jars \
--master yarn \
--verbose \
--driver-java-options "-XX:+TraceClassPaths" \
--num-executors 2 \
--executor-memory 1G \
--executor-cores 1 \
--driver-memory 1G \
--class com.dx.streaming.producer.TestConsumer \
/home/spark/work/kafka-streaming-test.jar

#--properties-file /home/spark/work/conf/spark-properties.conf \

jars:

[spark@master work]$ cd jars
[spark@master jars]$ ls
bijection-avro_2.11-0.9.5.jar  kafka-clients-0.10.0.1.jar  spark-sql_2.11-2.2.0.jar             spark-streaming_2.11-2.2.0.jar
bijection-core_2.11-0.9.5.jar  spark-avro_2.11-3.2.0.jar   spark-sql-kafka-0-10_2.11-2.2.0.jar  spark-streaming-kafka-0-10_2.11-2.2.0.jar

打印结果(备注这里是使用spark-submit提交方式):

+--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
|            ip|            identity|userid|               time|requestinfo|state|responce|referer|useragent|    timestamp|
+--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
|36:177:233:179|27be47c9-bcbc-4cd...|    27|2019-11-03 00:00:00|       ....|   88|     ...|    ...|      ...|1530624238000|
|251:92:177:212|d711ca29-e2a7-4fb...|    24|2020-04-03 00:00:00|       ....|  129|     ...|    ...|      ...|1530570507000|
|26:177:105:119|a98020dd-4fcb-4a0...|     4|2018-11-03 00:00:00|       ....|  322|     ...|    ...|      ...|1530619861000|
|161:25:246:252|11bd7af7-b9db-428...|     3|2021-10-03 00:00:00|       ....|  249|     ...|    ...|      ...|1530582412000|
| 48:131:47:112|c519b7cb-0265-4db...|     6|2021-09-03 00:00:00|       ....|  234|     ...|    ...|      ...|1530578717000|
|  43:74:113:73|e5888022-97ad-425...|    99|2019-02-03 00:00:00|       ....|  406|     ...|    ...|      ...|1530584052000|
|230:162:238:87|ae9ecc0d-6df5-418...|    55|2022-09-03 00:00:00|       ....|  128|     ...|    ...|      ...|1530561467000|
|  0:138:183:88|2565b673-baed-4c9...|    85|2019-03-03 00:00:00|       ....|  460|     ...|    ...|      ...|1530548103000|
|210:30:157:209|59a0f81c-7dfc-444...|    31|2021-07-03 00:00:00|       ....|  179|     ...|    ...|      ...|1530632595000|
| 129:251:8:241|5483365c-79ef-429...|    96|2022-03-03 00:00:00|       ....|  368|     ...|    ...|      ...|1530600670000|
|  32:70:106:42|d1dfa208-2a3f-4fe...|    40|2020-01-03 00:00:00|       ....|  184|     ...|    ...|      ...|1530559512000|
|95:109:238:129|709eebbc-13fc-4e9...|    11|2019-02-03 00:00:00|       ....|  463|     ...|    ...|      ...|1530623652000|
|123:171:142:15|0a4cc7d1-bdac-442...|    79|2022-08-03 00:00:00|       ....|  417|     ...|    ...|      ...|1530590205000|
| 72:141:54:221|b94d268a-a464-4d7...|    94|2021-07-03 00:00:00|       ....|    1|     ...|    ...|      ...|1530567806000|
|201:79:234:119|f1ca2db5-1688-459...|    66|2018-07-03 00:00:00|       ....|  531|     ...|    ...|      ...|1530565671000|
|188:41:197:190|fe3d9faf-5376-4bb...|    86|2022-08-03 00:00:00|       ....|  522|     ...|    ...|      ...|1530568567000|
| 197:115:58:51|1c9494e2-5dcc-4a4...|    73|2018-11-03 00:00:00|       ....|  214|     ...|    ...|      ...|1530630682000|
| 213:242:0:177|e06cd131-da6d-499...|    11|2022-05-03 00:00:00|       ....|  530|     ...|    ...|      ...|1530604390000|
| 70:109:32:120|37c95b44-d692-48e...|    66|2018-07-03 00:00:00|       ....|    7|     ...|    ...|      ...|1530576459000|
|100:203:217:78|cff08213-b679-4b2...|    51|2020-04-03 00:00:00|       ....|  128|     ...|    ...|      ...|1530548883000|
+--------------+--------------------+------+-------------------+-----------+-----+--------+-------+---------+-------------+
only showing top 20 rows

18/07/13 05:58:36 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "efd34a20-36ae-48a5-89c3-2107bab3cbca",
  "runId" : "a73386c3-34cf-43ec-abe8-904671e269c8",
  "name" : null,
  "timestamp" : "2018-07-12T21:58:31.590Z",
  "numInputRows" : 19800,
  "processedRowsPerSecond" : 3887.6889848812093,
  "durationMs" : {
    "addBatch" : 3595,
    "getBatch" : 252,
    "getOffset" : 612,
    "queryPlanning" : 122,
    "triggerExecution" : 5092,
    "walCommit" : 487
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[t-my]]",
    "startOffset" : null,
    "endOffset" : {
      "t-my" : {
        "0" : 19800
      }
    },
    "numInputRows" : 19800,
    "processedRowsPerSecond" : 3887.6889848812093
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@20bb170f"
  }
}

参考:

在Spark结构化流readStream、writeStream 输入输出,及过程ETL

Spark Structured Streaming入门编程指南

Structured Streaming 实现思路与实现概述

Spark结构式流编程指南

Kafka 如何读取offset topic内容 (__consumer_offsets)