Building a Spark Streaming sample application

Time: 2021-01-30 20:52:32

I am trying to build a simple Spark job that reads from a Kafka cluster, counts the words, and stores the counts in HBase.

The code I am using is based on the example from here:

Importing data in Hbase using Spark and Kafka

This is the Scala code:

package org.example.main

import java.util.Properties
import org.apache.hadoop.hbase.{ HBaseConfiguration, HColumnDescriptor, HTableDescriptor }
import org.apache.hadoop.hbase.client.{ HBaseAdmin, Put }
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{ PairRDDFunctions, RDD }
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object scalaConsumer {
  def main(args : Array[String]) {

    // Point the HBase client at the local ZooKeeper quorum
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost:2181")

    // Create the target table with a single column family if it does not exist yet
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable("testTable")) {
      val tableDesc = new HTableDescriptor("testTable")
      tableDesc.addFamily(new HColumnDescriptor("metric"))
      admin.createTable(tableDesc)
    }

    // Set up the streaming context with a 2-second batch interval
    val ssc = new StreamingContext("master", "MetricAggregatorTest", Seconds(2),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
    ssc.checkpoint("checkpoint")
    // Consume the "test" topic; key each value by (Kafka key, minute bucket).
    // Values are expected to be integer strings, so value.toInt will throw otherwise.
    val topics = "test"
    val numThreads = 2
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val zkQuorum = "localhost:2181"
    val lines = KafkaUtils.createStream(ssc, zkQuorum, "consumer-group", topicMap)
      .map { case (key, value) => ((key, Math.floor(System.currentTimeMillis() / 60000).toLong * 60), value.toInt) }

    // Sum the counts over non-overlapping one-minute windows, then write each batch to HBase
    val aggr = lines.reduceByKeyAndWindow(add _, Minutes(1), Minutes(1), 2)
    val tableName = "testTable"
    aggr.foreachRDD(rdd => saveToHBase(rdd, zkQuorum, tableName))

    ssc.start

    ssc.awaitTermination
  }

  // Associative add used by reduceByKeyAndWindow
  def add(a : Int, b : Int) = { (a + b) }

  // Writes one micro-batch of ((metricId, timestamp), value) records to HBase
  // through the old mapred TableOutputFormat API
  def saveToHBase(rdd : RDD[((String, Long), Int)], zkQuorum : String, tableName : String) = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkQuorum)

    val jobConfig = new JobConf(conf)
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])

    new PairRDDFunctions(rdd.map { case ((metricId, timestamp), value) => createHBaseRow(metricId, timestamp, value) }).saveAsHadoopDataset(jobConfig)
  }

  // Builds a Put keyed by "metricId~timestamp", storing the value in the metric:col cell
  def createHBaseRow(metricId : String, timestamp : Long, value : Int) = {
    val record = new Put(Bytes.toBytes(metricId + "~" + timestamp))

    record.add(Bytes.toBytes("metric"), Bytes.toBytes("col"), Bytes.toBytes(value.toString))

    (new ImmutableBytesWritable, record)
  }

}

and the pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test.scalConsumer</groupId>
    <artifactId>scalConsumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>"Spark Test"</name>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <repository>
            <id>maven-hadoop</id>
            <name>Hadoop Releases</name>
            <url>https://repository.cloudera.com/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>cloudera-repos</id>
            <name>Cloudera Repos</name>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>org.example.main.scalaConsumer</mainClass>
                        </manifest>
                    </archive>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>  
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>0.90.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.5-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.98.5-hadoop2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.0.0-cdh5.1.0</version>
        </dependency>
    </dependencies>
</project>

I am building the jar file using Maven:

mvn package

and running with this command:

~/Desktop/spark/bin/spark-submit --class org.example.main.scalaConsumer scalConsumer-0.0.1-SNAPSHOT.jar 
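
(Note: spark-submit ships only the application jar, and a plain mvn package builds a jar containing just the project's own classes. The HBase jars therefore have to reach the driver and executors some other way, e.g. via --jars or by building an uber-jar with the maven-shade-plugin. A sketch using --jars, where the jar paths are assumptions:)

~/Desktop/spark/bin/spark-submit --class org.example.main.scalaConsumer \
  --jars /path/to/hbase-client-0.98.5-hadoop2.jar,/path/to/hbase-common-0.98.5-hadoop2.jar,/path/to/hbase-server-0.98.5-hadoop2.jar \
  scalConsumer-0.0.1-SNAPSHOT.jar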

The error, which I am assuming is a linking error due to mismatched versions (this is my first time using Maven and Scala), is this:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

From searching I've seen that this is a common occurrence, but I have failed to find a solution. Am I missing something in my dependencies?
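
(One way to sanity-check which HBase artifacts and versions Maven actually resolves is the dependency tree; this is only a diagnostic, not a fix:)

mvn dependency:tree -Dincludes=org.apache.hbase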

1 Solution

#1

Perhaps it has to do with this in your pom file:

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.90.3</version>
    </dependency>

You are referring to version 0.90.3, while in all other cases you are referring to 0.98.5-hadoop2:

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.5-hadoop2</version>
    </dependency>

0.90 is actually a terribly old version of HBase (2011)!
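
A minimal sketch of aligned HBase coordinates (an assumption: standardizing on the 0.98.5-hadoop2 line and dropping the monolithic hbase artifact, which that release splits into hbase-client, hbase-common, hbase-server, etc.):

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.5-hadoop2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.5-hadoop2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.5-hadoop2</version>
    </dependency>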
