Saving struct-type data in Cassandra

Time: 2021-02-14 23:13:10

I have an RDD in Spark:

[((u'imdb.com/title/tt1389137', '2009-04-18'), 1),
 ((u'imdb.com/title/tt0829482', '2010-09-02'), 1),
 ((u'imdb.com/title/tt0167260', '2010-04-12'), 1),
 ((u'imdb.com/title/tt1682180', '2009-11-24'), 1),
 ((u'imdb.com/title/tt1124035', '2011-02-24'), 1),
 ((u'imdb.com/title/tt0056058', '2009-02-17'), 1),
 ((u'imdb.com/title/tt0308644', '2011-06-27'), 1),
...]

I converted it to a DataFrame. The schema:

root
 |-- url_date: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |-- count: long (nullable = true)

I would like to save this data into Cassandra. I created a table as:

create table movies_tweets_per_day (url_date frozen<set<text>>, count int, PRIMARY KEY (url_date));

And tried to write the data into the Cassandra table:

movies_tweets_per_day.select("url_date", "count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="movies_tweets_per_day", keyspace="mykeyspace")\
.save(mode="overwrite")

Here is the error I got:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-24-5ff2c90cdbe3> in <module>()
      6 movies_tweets_per_day.printSchema()
      7 
----> 8 movies_tweets_per_day.select("url_date", "count").write.format("org.apache.spark.sql.cassandra").options(table="movies_tweets_per_day", keyspace="assignment2").save(mode="overwrite")

/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
    546             self.format(format)
    547         if path is None:
--> 548             self._jwrite.save()
    549         else:
    550             self._jwrite.save(path)

/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/lib/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o686.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 64.0 failed 1 times, most recent failure: Lost task 2.0 in stage 64.0 (TID 296, localhost, executor driver): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [imdb.com/title/tt1772341,2010-12-14] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to Set[AnyRef].
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$30.applyOrElse(TypeConverter.scala:608)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:596)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$31.applyOrElse(TypeConverter.scala:812)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
    at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:65)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [imdb.com/title/tt1772341,2010-12-14] of type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema to Set[AnyRef].
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$30.applyOrElse(TypeConverter.scala:608)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:596)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$31.applyOrElse(TypeConverter.scala:812)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
    at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:795)
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:26)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:24)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

I also tried creating the table in Cassandra with:

create table movies_tweets_per_day (url_date frozen<map<text,text>>, count int, PRIMARY KEY (url_date));

create table movies_tweets_per_day (url_date list<frozen<text>>, count int, PRIMARY KEY (url_date));

Neither of them works. How can I solve the problem? Thanks in advance!

1 Answer

#1

Convert the struct to ArrayType; the connector cannot map a struct column (GenericRowWithSchema) to a CQL collection, which is exactly what the TypeConversionException above says. Example:

val df = sc.sparkContext.parallelize(List((List("imdb.com/title/tt1389137", "2009-04-18"), 1),
      (List("imdb.com/title/tt0829482", "2010-09-02"), 1),
      (List("imdb.com/title/tt0167260", "2010-04-12"), 1),
      (List("imdb.com/title/tt1682180", "2009-11-24"), 1),
      (List("imdb.com/title/tt1124035", "2011-02-24"), 1),
      (List("imdb.com/title/tt0056058", "2009-02-17"), 1),
      (List("imdb.com/title/tt0308644", "2011-06-27"), 1))).toDF("url_date", "count")

The above gives the following schema:

root
 |-- url_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- count: integer (nullable = true)

This can easily be saved into a Cassandra table. Note that a collection column used as a primary key must be frozen as a whole, so the matching table definition would use url_date frozen<list<text>> rather than list<frozen<text>>.

For the conversion you can use the map method on the RDD, as sketched below.

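Since the question uses PySpark, here is a minimal sketch of that map conversion, assuming rdd holds the ((url, date), count) pairs from the question and a SparkSession already exists:

# Hypothetical setup: `rdd` is the ((url, date), count) pair RDD from the
# question; RDD.toDF is available because a SparkSession has been created.
# Turn the (url, date) tuple into a list so the column becomes array<string>.
df = rdd.map(lambda kv: (list(kv[0]), kv[1])).toDF(["url_date", "count"])

# Write the array-typed column to the table, mirroring the question's options.
df.write.format("org.apache.spark.sql.cassandra")\
    .options(table="movies_tweets_per_day", keyspace="mykeyspace")\
    .save(mode="overwrite")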
