java spark-streaming接收TCP/Kafka数据

时间:2023-01-29 20:56:15

 本文将展示

1、如何使用spark-streaming接入TCP数据并进行过滤;

2、如何使用spark-streaming接入Kafka数据并进行wordcount;

内容如下:

准备工作:使用maven,先解决pom依赖

<!-- Spark 1.6.0 artifacts built for Scala 2.10; keep the Scala suffix and version identical across all of them. -->
<!-- Kafka integration for Spark Streaming; supplies org.apache.spark.streaming.kafka.KafkaUtils used by the Kafka example. -->
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <!-- Spark Streaming itself (JavaStreamingContext, JavaDStream, Durations). -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <!-- Spark core (SparkConf, JavaSparkContext). Scope "provided": needed to compile, but not packaged into the application jar. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spark Hive support, scope "provided". NOTE(review): not referenced by the two examples shown here. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spark SQL, scope "provided". NOTE(review): not referenced by the two examples shown here. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>

 

1、接收TCP数据并过滤,打印含有error的行

package com.xiaoju.dqa.realtime_streaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;


//nc -lk 9999
/**
 * Minimal Spark Streaming job: reads text lines from a TCP socket and, for every
 * one-second batch, prints the lines that contain the substring "error".
 *
 * <p>A TCP server must be listening on the configured host/port (for example
 * {@code nc -lk 9999}) so that the socket receiver has something to connect to.
 */
public class SparkStreamingTCP {
    /**
     * Builds the streaming pipeline, starts it and blocks until it is terminated.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // The socket receiver permanently occupies one thread. With a plain "local"
        // master (a single thread) nothing would be left to process the received
        // batches, so at least two local threads are required.
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        // Batch interval: one second.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaDStream<String> lines = jssc.socketTextStream("10.93.21.21", 9999);
        // Keep only the lines that mention "error" (case-sensitive substring match).
        JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("error");
            }
        });
        errorLines.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

执行方法

$ spark-submit realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
# 另起一个窗口
$ nc -lk 9999
# 输入数据

2、接收Kafka数据并进行计数(WordCount)

package com.xiaoju.dqa.realtime_streaming;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

import scala.Tuple2;

// bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
/**
 * Spark Streaming word count over Kafka messages.
 *
 * <p>Messages are consumed through the receiver-based {@code KafkaUtils.createStream}
 * API (ZooKeeper quorum + consumer group). Every ten-second batch is split into
 * space-separated words and the number of occurrences per word is printed.
 */
public class SparkStreamingKafka {
    /**
     * Builds the Kafka word-count pipeline, starts it and blocks until termination.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException declared for callers; kept from the original signature
     */
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("yarn-client").setAppName("streaming word count");
        //String topic = "offline_log_metrics";
        // One or more topic names separated by ';'.
        String topic = "test";
        // Number of Kafka receiver streams to create; they are unioned below.
        int part = 1;
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

        // topic name -> value passed to createStream as the per-topic thread count.
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = topic.split(";");
        for (int i = 0; i < topics.length; i++) {
            topicMap.put(topics[i], 1);
        }

        List<JavaPairReceiverInputDStream<String, String>> list = new ArrayList<JavaPairReceiverInputDStream<String, String>>();
        for (int i = 0; i < part; i++) {
            list.add(KafkaUtils.createStream(jssc,
                    "10.93.21.21:2181",
                    "bigdata_qa",
                    topicMap));
        }

        // Merge all receiver streams into a single stream of (key, message) pairs.
        JavaPairDStream<String, String> wordCountLines = list.get(0);
        for (int i = 1; i < list.size(); i++) {
            wordCountLines = wordCountLines.union(list.get(i));
        }

        JavaPairDStream<String, Integer> counts = wordCountLines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterable<String> call(Tuple2<String, String> stringStringTuple2) {
                String message = stringStringTuple2._2;
                if (message == null || "".equals(message)) {
                    // Returning null here would make Spark fail while iterating the
                    // result, so an empty message simply contributes no words.
                    System.out.println("_2 is null");
                    return Collections.<String>emptyList();
                }
                return Arrays.asList(message.split(" "));
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                // Empty tokens (e.g. produced by consecutive spaces) keep their key but
                // carry a weight of 0, so they never inflate a count.
                int weight = (s == null || "".equals(s)) ? 0 : 1;
                return new Tuple2<String, Integer>(s, weight);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
        counts.print();

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // Always release the streaming context, even if waiting failed.
            jssc.close();
        }
    }
}

执行方法

 $ spark-submit --queue=root.XXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
# 另开一个窗口,启动kafka生产者
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 输入数据