Summary: migrating Streaming from Spark2X to Spark1.5

Date: 2022-11-29 00:03:07

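Loading the configuration files was one of the hard parts. It is trivial in local mode, but after spark-submit the job kept reporting that the file could not be found. The fix was to put the properties files in the same package as the class that loads them, so that they are packaged into the same jar.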


import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
import java.util.Properties

/**
  * Created by wulei on 2018/4/4.
  * Description: shared utility class for parameter initialization
  */
object InitPropertiesUtil extends Serializable {
  /**
    * get kafka's properties
    * @return java.util.Properties
    */
  def initKafkaPro: Properties = load("conf/kafka.properties")

  /**
    * get redis's properties
    * @return java.util.Properties
    */
  def initRedisPro: Properties = load("conf/redis.properties")

  // A path without a leading "/" is resolved relative to this object's package inside the jar.
  private def load(path: String): Properties = {
    val in: InputStream = getClass.getResourceAsStream(path)
    if (in == null) {
      // Fail fast with a clear message instead of hitting a NullPointerException in prop.load.
      throw new FileNotFoundException(s"ERROR : $path init failed, resource not found on the classpath")
    }
    val prop: Properties = new Properties
    try prop.load(new BufferedInputStream(in)) finally in.close()
    prop
  }
}
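
The lookup rule behind this fix can be checked in isolation. The following is a minimal sketch, not the code used in the job: `ResourceLookupSketch` and `tryLoad` are hypothetical names introduced here for illustration. It relies only on the documented behavior of `Class.getResourceAsStream`: a name without a leading "/" is resolved relative to the package of the class, a name with a leading "/" is resolved from the classpath root, and `null` is returned when nothing is found.

```scala
import java.io.InputStream
import java.util.Properties

// Hypothetical helper, for illustration only.
object ResourceLookupSketch {
  // Returns Some(props) when the resource exists on the classpath, None otherwise.
  def tryLoad(name: String): Option[Properties] = {
    // No leading "/": resolved relative to this object's package.
    // Leading "/": resolved from the root of the classpath (the jar root).
    val in: InputStream = getClass.getResourceAsStream(name)
    Option(in).map { stream =>
      val props = new Properties
      try props.load(stream) finally stream.close()
      props
    }
  }

  def main(args: Array[String]): Unit = {
    // Both names are examples; whether they resolve depends on how the jar is packaged.
    Seq("conf/kafka.properties", "/conf/kafka.properties").foreach { name =>
      tryLoad(name) match {
        case Some(p) => println(s"$name: loaded ${p.size} entries")
        case None    => println(s"$name: not found on the classpath")
      }
    }
  }
}
```

Running a check like this from the submitted jar shows quickly whether a properties file was actually packaged where the loading class expects it.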

 

 

Problem: ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory

java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
        at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
        at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
        at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
        at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
        at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
        at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
        at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
        at java.sql.DriverManager.getConnection(DriverManager.java:571)
        at java.sql.DriverManager.getConnection(DriverManager.java:233)
        at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
        ... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
        at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
        ... 36 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
        ... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
        at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
        ... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
        ... 46 more

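Tracing the ClientRpcControllerFactory class shows that it lives in phoenix-core-4.4.0-HBase-1.0.jar, and the error message confirms that it really is the HBase connection that fails. So my first step was to check whether phoenix-core-4.4.0-HBase-1.0.jar had been copied to the Spark Client. It had.

Second, I checked whether the jars argument of my spark-submit command referenced the package incorrectly, for example a wrong path or a misplaced comma. The error was not there either. At that point I stopped guessing, since this was beyond my understanding, and quickly found this native bug described on *: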

I had the same problem. The reason is that Phoenix uses a custom, Phoenix-specific RPC controller factory to configure the priorities for index and system catalog tables on the cluster side. It is called ClientRpcControllerFactory.

Sometimes Phoenix-enabled clusters are used from pure-HBase client applications, which results in ClassNotFoundExceptions in application code or MapReduce jobs. Since the HBase configuration is shared between Phoenix clients and HBase clients, having different configurations on the client side is hard. That's why you get this exception. This problem is fixed by HBASE-14960. If your HBase version is older than 2.0.0, 1.2.0, 1.3.0, or 0.98.17, you can define your client-side RPC controller with this setting in hbase-site.xml:

  <property>
    <name>hbase.rpc.controllerfactory.class</name>
    <value>org.apache.hadoop.hbase.ipc.RpcControllerFactory</value>  
  </property>
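
Before changing hbase-site.xml, it can help to confirm which of the two factory classes is actually visible to the JVM that throws the exception. The following is a minimal diagnostic sketch under stated assumptions: `ClasspathCheck` and `isLoadable` are hypothetical names, and the check uses the thread context class loader, which may differ from the loader HBase uses in a given deployment. It only reports whether a class name can be resolved and does not touch HBase or Phoenix.

```scala
// Hypothetical diagnostic helper, for illustration only.
object ClasspathCheck {
  // True when the named class can be resolved by the current thread's context class loader.
  // The class is not initialized, so no static initializers run during the check.
  def isLoadable(className: String): Boolean =
    try {
      Class.forName(className, false, Thread.currentThread().getContextClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      "org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory", // Phoenix-specific factory
      "org.apache.hadoop.hbase.ipc.RpcControllerFactory"                   // plain HBase factory named in the setting above
    )
    candidates.foreach(c => println(s"$c loadable: ${isLoadable(c)}"))
  }
}
```

Calling `isLoadable` inside a task as well as on the driver shows whether the class is missing only on the executors, which is where the stack trace above originates.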