There are some examples of using SQL over Spark Streaming in foreachRDD(). But what if I want to use SQL in transform():
case class AlertMsg(host: String, count: Int, sum: Double)

val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString, r(1).toString.toInt, r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()
I got this error:
[error] /Users/raochenlin/Downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/LogStash.scala:52: no type parameters for method transform: (transformFunc: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[U])(implicit evidence$5: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U] exist so that it can be applied to arguments (org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable]
[error]  required: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[?U]
[error] lines.transform( rdd => {
[error]                  ^
[error] one error found
[error] (compile:compile) Compilation failed
It seems it only compiles if I use sqlreport.map(r => r.toString). Is that the correct usage?
1 Answer
#1
dstream.transform takes a function transformFunc: (RDD[T]) ⇒ RDD[U]. In this case, both branches of the if must result in the same type, which is not the case here:
if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]
In this case, remove the if rdd.count ... optimization so that you have a single transformation path.
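A minimal sketch of that single-path version, assuming the ssc, sqc (SQLContext), and AlertMsg definitions from the question and the Spark 1.2-era jsonRDD/SchemaRDD API. With the guard gone, the lambda always returns RDD[AlertMsg], so the type parameter U is inferred unambiguously:

lines.transform { rdd =>
  // Every batch takes the same path, so the result type is always RDD[AlertMsg].
  val t = sqc.jsonRDD(rdd)
  t.registerTempTable("logstash")
  val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
  sqlreport.map(r => AlertMsg(r(0).toString, r(1).toString.toInt, r(2).toString.toDouble))
}.print()

If you really do need to skip empty batches, the else branch must also produce an RDD[AlertMsg], for example ssc.sparkContext.emptyRDD[AlertMsg], so that both branches agree on the element type.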