Spark Streaming: Using external data during stream transformation

Time: 2021-01-24 23:12:31

I have a situation where I have to filter data points in a stream based on a condition that references external data. I have loaded the external data into a DataFrame (so that I can query it using the SQL interface). However, when I try to query the DataFrame inside the transformation (filter) function, it does not seem to be accessible (sample code below):

    // DStream is created and a temp table called 'locations' is registered
    dStream.filter(dp => {
        val responseDf = sqlContext.sql("select location from locations where id='001'")
        responseDf.show()  // nothing is displayed
        // some condition evaluation using responseDf
        true
    })

Am I doing something wrong? If so, what would be a better approach to load external data in memory and query it during the stream transformation stage?

1 solution

#1

Using SparkSession instead of SQLContext solved the issue. Code below:

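    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.DStream

    // Driver side: load the reference data and register it as a temp view
    val sparkSession = SparkSession.builder().appName("APP").getOrCreate()
    val df = sparkSession.createDataFrame(locationRepo.getLocationInfo, classOf[LocationVO])
    df.createOrReplaceTempView("locations")

    val dStream: DStream[StreamDataPoint] = getdStream()

    dStream.filter(dp => {
        // getOrCreate() returns the already-active session when this closure
        // runs inside the driver JVM (e.g. in local mode)
        val sparkAppSession = SparkSession.builder().appName("APP").getOrCreate()
        val responseDf = sparkAppSession.sql("select location from locations where id='001'")
        responseDf.show()  // this prints the results (once per data point)
        // some condition evaluation using responseDf
        true
    })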
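A caveat on the approach above: `DStream.filter` closures run as tasks on executors. In local mode those tasks run inside the driver JVM, which is likely why calling `SparkSession.builder().getOrCreate()` inside the filter appears to work; on a cluster, DataFrames and `spark.sql` are generally not usable inside such closures. A common alternative for small reference data is to run the SQL query once on the driver, collect the result, and broadcast it. The sketch below assumes the `locations` view has string columns `id` and `location`, and uses a hypothetical `StreamDataPoint(locationId: String)` in place of the question's class; `broadcastLocations` and `filterKnownLocations` are illustrative names, not Spark APIs.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

object LocationFilterSketch {

  // Hypothetical stand-in for the question's StreamDataPoint (only the field used here).
  case class StreamDataPoint(locationId: String)

  /** Runs on the driver: query the "locations" view once and broadcast the result. */
  def broadcastLocations(spark: SparkSession): Broadcast[Map[String, String]] = {
    val lookup: Map[String, String] = spark
      .sql("select id, location from locations")
      .collect()                                        // brings the (small) table to the driver
      .map(row => row.getString(0) -> row.getString(1)) // assumes both columns are strings
      .toMap
    spark.sparkContext.broadcast(lookup)                // read-only copy shipped to executors
  }

  /** The filter closure only touches the broadcast value, which is serializable. */
  def filterKnownLocations(
      dStream: DStream[StreamDataPoint],
      locations: Broadcast[Map[String, String]]): DStream[StreamDataPoint] =
    dStream.filter(dp => locations.value.contains(dp.locationId))
}
```

Because the broadcast is built once, later changes to the external data are not picked up; refreshing it would require re-broadcasting on the driver, for example inside `transform`, whose function runs on the driver once per batch.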
