Error when indexing to Elasticsearch from Spark Streaming with a Scala case class that needs more than 22 parameters

Time: 2022-03-09 20:52:17

I am using Spark Streaming with Scala to write log data to Elasticsearch.


I am not able to create a case class with the more than 22 arguments that my data requires, since that is not supported in Scala 2.10. So I am using the approach below to create a plain class instead of a case class.

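For context, the restriction being worked around is the Scala 2.10 limit on case class parameters; the snippet below (with placeholder field names, not from the real code) shows the kind of declaration that fails to compile:

// In Scala 2.10, a case class with more than 22 parameters is rejected with an
// "implementation restriction" compile error; Scala 2.11+ lifts this limit.
// f1..f23 are placeholder fields for illustration only.
case class TooWide(
  f1: Int,  f2: Int,  f3: Int,  f4: Int,  f5: Int,  f6: Int,  f7: Int,  f8: Int,
  f9: Int,  f10: Int, f11: Int, f12: Int, f13: Int, f14: Int, f15: Int, f16: Int,
  f17: Int, f18: Int, f19: Int, f20: Int, f21: Int, f22: Int, f23: Int)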

Scala class

class FactUsage(
    d_EVENT_TYPE_NR: Long, EVENT_GRP_DESC: String, EVENT_DESC: String, CUST_TYPE_CD: Long, TICKET_RATING_CD: Long, BUS_UNIT_DESC: String, CUST_MKT_SEGM_DESC: String,
    EVENT_DTTM: String, EVENT_DTNR: Long, SERVED_PARTY_IMEI_NUM: String, SERVED_PARTY_IMSI_NUM: String, SERVED_PARTY_PHONE_NUM: Long, OTHER_PARTY_ID: String, EVENT_DURATION_QTY: Long,
    EVENT_VOLUME_DOWN_QTY: Long, EVENT_VOLUME_TOTAL_QTY: Long, EVENT_VOLUME_UP_QTY: Long, ACCESS_POINT_ID: String, d_CELL_NR: Long, d_CONTRACT_NR: Long, d_CUSTOMER_NR: Long,
    d_CUSTOMER_TOP_PARENT_NR: String, d_DEVICE_NR: Long, d_ORIGIN_DESTINATION_NR: Long, d_DIRECTION_NR: Long, d_OTHER_OPER_NR: Long, d_OTHER_SUBSCR_OPER_NR: Long, d_ROAMING_NR: Long,
    d_SALES_AGENT_NR: String, d_SERVED_OPER_NR: Long, d_SERVED_SUBSCR_OPER_NR: Long, d_TARIFF_MODEL_NR: Long, d_TERMINATION_NR: Long, d_USAGE_SERVICE_NR: Long, RUN_ID: String)
  extends Product with Serializable {

  def canEqual(that: Any) = that.isInstanceOf[FactUsage]
  def productArity = 35 // Number of columns

  def productElement(idx: Int) = idx match {
    case 0 => d_EVENT_TYPE_NR; case 1 => EVENT_GRP_DESC; case 2 => EVENT_DESC; case 3 => CUST_TYPE_CD; case 4 => TICKET_RATING_CD
    case 5 => BUS_UNIT_DESC; case 6 => CUST_MKT_SEGM_DESC; case 7 => EVENT_DTTM; case 8 => EVENT_DTNR; case 9 => SERVED_PARTY_IMEI_NUM
    case 10 => SERVED_PARTY_IMSI_NUM; case 11 => SERVED_PARTY_PHONE_NUM; case 12 => OTHER_PARTY_ID; case 13 => EVENT_DURATION_QTY; case 14 => EVENT_VOLUME_DOWN_QTY
    case 15 => EVENT_VOLUME_TOTAL_QTY; case 16 => EVENT_VOLUME_UP_QTY; case 17 => ACCESS_POINT_ID; case 18 => d_CELL_NR; case 19 => d_CONTRACT_NR
    case 20 => d_CUSTOMER_NR; case 21 => d_CUSTOMER_TOP_PARENT_NR; case 22 => d_DEVICE_NR; case 23 => d_ORIGIN_DESTINATION_NR; case 24 => d_DIRECTION_NR
    case 25 => d_OTHER_OPER_NR; case 26 => d_OTHER_SUBSCR_OPER_NR; case 27 => d_ROAMING_NR; case 28 => d_SALES_AGENT_NR; case 29 => d_SERVED_OPER_NR
    case 30 => d_SERVED_SUBSCR_OPER_NR; case 31 => d_TARIFF_MODEL_NR; case 32 => d_TERMINATION_NR; case 33 => d_USAGE_SERVICE_NR; case 34 => RUN_ID
  }
}

Spark Streaming Code to Write to Elasticsearch


val rddAbcServerLog = lines.filter(x => x.toString.contains("abc_server_logs"))
EsSparkStreaming.saveToEs(rddAbcServerLog.map(line => parser.formatDelimeted(line)).map(p => parser.runES(p.toString)), esindex + "/" + estype)
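
(As a side note, one hedged way to see exactly what is handed to saveToEs is to print a few transformed records per batch before saving; the sketch below just reuses rddAbcServerLog, parser, esindex and estype from above and is not part of the original job.)

// Debugging sketch: sample the transformed records from each micro-batch so the
// payload that will be bulk-indexed can be inspected before the save.
val docs = rddAbcServerLog
  .map(line => parser.formatDelimeted(line))
  .map(p => parser.runES(p.toString))

docs.foreachRDD(rdd => rdd.take(5).foreach(println))

EsSparkStreaming.saveToEs(docs, esindex + "/" + estype)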

I have debugged this, and there are no issues with the functions used in the lambda expressions; the error occurs while writing to Elasticsearch.


Error

17/04/15 11:34:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xx:10200] returned Bad Request(400) - failed to parse; Bailing out..
        at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250)
        at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202)
        at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
        at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
        at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182)
        at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
17/04/15 11:34:05 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [xx.xxx.xx.xx:10200] returned Bad Request(400) - failed to parse; Bailing out..
        at org.elasticsearch.hadoop.rest.RestClient.processBulkResponse(RestClient.java:250)
        at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:202)
        at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:220)
        at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:242)
        at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:182)
        at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:159)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

17/04/15 11:34:05 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
17/04/15 11:34:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/04/15 11:34:05 INFO TaskSchedulerImpl: Cancelling stage 0

Note: The code may have odd naming conventions and masked IPs; I modified it for posting to a public forum.


1 Answer

#1



What you're doing is cumbersome and error-prone. Instead, use multiple case classes.


case class Group(grpDesc: String, eventDesc: String)
case class Event(dttm: String, dtnr: String)

...and so on


Then when you've grouped all related items into their own case classes:


case class FactUsage(group: Group, event: Event, ...)

You should pass an instance of FactUsage to saveToEs.

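For example, a minimal sketch of that suggestion, assuming a '|'-delimited input line and a hypothetical buildFactUsage helper (neither comes from the original code); only the Group and Event fields shown above are kept:

import org.elasticsearch.spark.streaming.EsSparkStreaming

case class Group(grpDesc: String, eventDesc: String)
case class Event(dttm: String, dtnr: String)
case class FactUsage(group: Group, event: Event) // remaining field groups omitted here

// Hypothetical helper: map the delimited fields into the nested case classes.
def buildFactUsage(fields: Array[String]): FactUsage =
  FactUsage(Group(fields(1), fields(2)), Event(fields(7), fields(8)))

val factUsages = rddAbcServerLog.map(line => buildFactUsage(line.toString.split('|')))
EsSparkStreaming.saveToEs(factUsages, esindex + "/" + estype)

Note that documents written this way are nested objects (group.grpDesc, event.dttm, and so on), so the index mapping has to accept that structure.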
