com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year

Time: 2021-08-28 20:48:47

Below is my code.


directKafkaStream.foreachRDD(rdd ->
{
    rdd.foreach(record ->
    {
        messages1.add(record._2);
    });

    JavaRDD<String> lines = sc.parallelize(messages1);
    JavaPairRDD<Integer, String> data = lines.mapToPair(new PairFunction<String, Integer, String>()
    {
        @Override
        public Tuple2<Integer, String> call(String a)
        {
            String[] tokens = StringUtil.split(a, '%');
            return new Tuple2<Integer, String>(Integer.getInteger(tokens[3]), tokens[2]);
        }
    }); // map to get year and name of the movie

    Function2<String, String, String> reduceSumFunc = (accum, n) -> (accum.concat(n)); // function for reduce
    JavaPairRDD<Integer, String> yearCount = data.reduceByKey(reduceSumFunc); // reduceByKey to count

    javaFunctions(yearCount)
        .writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class))
        .withColumnSelector(someColumns("year", "list_of_movies"))
        .saveToCassandra(); // this is the error line
});

Here is the error I am getting.


com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year
    at com.datastax.spark.connector.writer.RoutingKeyGenerator$$anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.fillRoutingKey(RoutingKeyGenerator.scala:47)
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.apply(RoutingKeyGenerator.scala:56)
    at com.datastax.spark.connector.writer.TableWriter.batchRoutingKey(TableWriter.scala:126)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:107)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)        

Description:

1) I am trying to connect Kafka and Cassandra using Spark.


2) I am able to store a JavaRDD, but not a JavaPairRDD, into Cassandra.


3) I have marked the line where the error occurs with a comment in the code above.


1 Solution

#1



One of your values for year is null, which is not allowed. Check your data and look for whatever is generating a null integer.

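As an illustration of that check, here is a minimal sketch (it reuses the variable and column names from the question, such as data, reduceSumFunc and movie_keyspace, and is not code from the answer): it counts the pairs whose key is null and drops them before the write. Note also that in the posted mapToPair, Integer.getInteger(tokens[3]) looks up a JVM system property named by the token and returns null when no such property exists; Integer.parseInt(tokens[3]) is the usual way to parse a numeric string, so that call is one plausible source of the null keys.

    // Sketch only: count and drop pairs whose year key came out null before writing.
    // Variable names (data, reduceSumFunc) and column names are taken from the question.
    long nullYears = data.filter(t -> t._1() == null).count();
    System.out.println("pairs with a null year key: " + nullYears);

    JavaPairRDD<Integer, String> cleaned = data.filter(t -> t._1() != null);
    JavaPairRDD<Integer, String> yearCount = cleaned.reduceByKey(reduceSumFunc);

    javaFunctions(yearCount)
        .writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class))
        .withColumnSelector(someColumns("year", "list_of_movies"))
        .saveToCassandra();

Even with the parsing fixed, keeping the null filter (or skipping malformed tokens) avoids the same NullKeyColumnException when a record is missing its year field.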
