报错就是找不到kafka的类
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
先贴代码:
在读取kafka 多个topic的时候自定义输出类:
集群打包运行报错:
java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/ConsumerRecord
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1614)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1589)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:646)
at com.sjb.testing.OdsCanalEtlData_testing$.main(OdsCanalEtlData_testing.scala:65)
at com.sjb.testing.OdsCanalEtlData_testing.main(OdsCanalEtlData_testing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 38 more
解压包之后发现,kafka依赖没打进去,重写打包,把打成依赖包:
集群继续运行:
java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:288)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:194)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:140)
at com.sjb.testing.OdsCanalEtlData_testing$.main(OdsCanalEtlData_testing.scala:60)
at com.sjb.testing.OdsCanalEtlData_testing.main(OdsCanalEtlData_testing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 more
POM文件 build
<build> <plugins> <plugin > <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <!-- 所有的编译都依照JDK1.8来搞 --> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.1</version> <executions> <execution> <id>compile-scala</id> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <!-- 声明绑定到maven的compile阶段 --> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <!-- 用于项目的打包插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
打成的jar里面是有包的, 所以猜测可能不是打包的问题,使用scala 重写 类 TopicAndValueDeserializationSchema
class TopicAndValueDeserializationSchema extends KafkaDeserializationSchema[TopicAndValue] { override def isEndOfStream(t: TopicAndValue): Boolean = false override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): TopicAndValue = new TopicAndValue(consumerRecord.topic, new String(consumerRecord.value, "UTF8")) override def getProducedType: TypeInformation[TopicAndValue] = TypeInformation.of(new TypeHint[TopicAndValue]() {}) }
再打包集群运行,不报错.....