Add a column to a Spark DataFrame based on a parameterized SQL query that depends on field values of the DataFrame

Time: 2022-01-11 07:53:12

I have several Spark DataFrames (call them table A, table B, etc.). I want to add a column to table A only, based on the result of a query against one of the other tables. Which table is queried depends on the value of one of table A's fields, so it changes from row to row, and the query therefore has to be parameterized. The example below makes the problem clear:

Every table has the column OID and a column TableName with the name of the current table, plus other columns.


    This is the fixed query to be performed on Tab A to add new column:

    Select $ColumnName from $TableName where OID=$oids

    Tab A
    |   oids|TableName  |ColumnName | other fields|New Column: ValueOidDb
    |    2  |  Book      | Title     |      x      |result query:harry potter
    |    8  |  Book      | Isbn      |      y      |result query: 556 
    |    1  |  Author    | Name      |      z      |result query:Tolkien
    |    4  |  Category  |Description|      b      |result query: Commedy

    Tab Book
    |   OID |TableName   |Title       |Isbn  |other fields|
    |    2  |  Book      |harry potter| 123  | x          |
    |    8  |  Book      | hobbit     | 556  | y          | 
    |    21 |  Book      | etc        | 8942 | z          |
    |    5  |  Book      | etc2       | 984  | b          |

    Tab Author
    |   OID |TableName     |Name        |nationality |other fields|
    |    5  |  Author      |J.Rowling   | eng        | x          |
    |    2  |  Author      |Geor. Martin| us         | y          | 
    |    1  |  Author      | Tolkien    | eng        | z          |
    |    13 |  Author      | Dan Brown  | us         | b          |

    Tab Category
    |   OID | TableName    |Description |
    |    12 |  Category    | Fantasy    |
    |    4  |  Category    | Commedy    |
    |    9  |  Category    | Thriller   |
    |    7  |  Category    | Action     |

I tried this UDF:


    def setValueOid = (oid: Int, tableName: String, tableColumn: String) => {
      try {
        // Build the lookup query from the three per-row parameters
        sqlContext.sql(s"SELECT $tableColumn FROM $tableName WHERE OID = $oid").first().toString()
      } catch {
        case x: java.lang.NullPointerException => "error"
      }
    }

    sqlContext.udf.register("setValueOid", setValueOid)

    val FinalRtxf = sqlContext.sql("SELECT all the column of TAB A ,"
                  + " setValueOid(oid, Table,AttributeDatabaseColumn ) as     ValueOidDb"
                  + " FROM TAB A")

I wrapped the code in a try/catch because otherwise it throws a NullPointerException, but it still doesn't work: it always returns "error". If I call the function directly with manually supplied parameters, outside of a SQL query, it works perfectly:

          val res = setValueOid(8, "BOOK", "ISBN")
          res: String = [0977326403 ]

I read here that it is not possible to run a query inside a UDF: "Trying to execute a spark sql query from a UDF".

So how can I solve my problem? I don't know how to make a parametric join. I tried this:


         Select  all attributes TAB A,    
         FROM TAB A  as a
         join (Select $AttributeDatabaseColumn ,TableName  from $Table where  OID=$oid) as b
         on a.Table=b.TableName 

but it gave me this exception:


  org.apache.spark.sql.AnalysisException: cannot recognize input near  '$'   'AttributeDatabaseColumn' ',' in select clause; line 3 pos 1       at   org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:318)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)

1 solution



One option:

  • transform each of Book, Author, and Category into the form:


     |-- oid: integer (nullable = false)
     |-- tableName: string (nullable = true)
     |-- properties: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)

    For example first record in Book:


    val book = Seq((2, "Book",
      Map("title" -> "harry potter", "Isbn" -> "123", "other field" -> "x")
    )).toDF("oid", "tableName", "properties")

    |oid|tableName|properties                                               |
    |2  |Book     |Map(title -> harry potter, Isbn -> 123, other field -> x)|
  • union Book, Author, Category as properties.

    val properties = book.union(author).union(category)
  • join with base table:


    val comb = properties.join(table, Seq("oid", "tableName"))
  • use case when ... based on tableName to add the new column from the properties field.
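
The steps above can be put together as a minimal sketch. It rests on several assumptions: the object name `ParametricLookup` and the helpers `toProperties` and `addLookupColumn` are hypothetical, the sample rows are taken from the question, and the map keys are the original column names so that they match the values in `ColumnName`. Instead of a case when on tableName, the sketch indexes the map directly with the `ColumnName` column, which should give the same result as long as the key exists in the map.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, map}

object ParametricLookup {
  // Hypothetical helper: keep OID and TableName as keys and pack every
  // other column into a map<string,string> keyed by the column name.
  def toProperties(df: DataFrame): DataFrame = {
    val keys = Set("OID", "TableName")
    val kv = df.columns.filterNot(keys.contains)
      .flatMap(c => Seq(lit(c), col(c).cast("string")))
    df.select(col("OID").as("oid"), col("TableName").as("tableName"),
      map(kv: _*).as("properties"))
  }

  // Hypothetical helper: union the lookup tables, join them to table A on
  // (oid, tableName), then read the value stored under ColumnName.
  def addLookupColumn(tabA: DataFrame, tables: Seq[DataFrame]): DataFrame = {
    val properties = tables.map(toProperties).reduce(_ union _)
    tabA.join(properties, Seq("oid", "tableName"), "left")
      .withColumn("ValueOidDb", col("properties")(col("ColumnName")))
      .drop("properties")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("parametric-lookup").getOrCreate()
    import spark.implicits._

    // Sample rows copied from the question (other fields omitted)
    val book = Seq((2, "Book", "harry potter", "123"), (8, "Book", "hobbit", "556"))
      .toDF("OID", "TableName", "Title", "Isbn")
    val author = Seq((1, "Author", "Tolkien", "eng"))
      .toDF("OID", "TableName", "Name", "nationality")
    val category = Seq((4, "Category", "Commedy"))
      .toDF("OID", "TableName", "Description")
    val tabA = Seq((2, "Book", "Title"), (8, "Book", "Isbn"),
      (1, "Author", "Name"), (4, "Category", "Description"))
      .toDF("oid", "tableName", "ColumnName")

    addLookupColumn(tabA, Seq(book, author, category)).show(false)
    spark.stop()
  }
}
```

The left join keeps rows of table A that have no match in any lookup table; for those rows ValueOidDb is null. All values are cast to string because a map column needs a single value type.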



