Add a column to a Spark DataFrame based on a parametric SQL query that depends on the values of some of the DataFrame's fields

Time: 2022-01-11 07:53:12

I have several Spark DataFrames (call them table A, table B, and so on). I want to add a column to table A only, based on the result of a query against one of the other tables. Which table is queried changes from row to row, depending on the value of one of table A's fields, so the query has to be parametric. The example below makes the problem clear:

Every table has an OID column and a TableName column holding the name of that table, plus other columns.

This is the fixed query to be performed on Tab A to add the new column:

    Select $ColumnName from $TableName where OID=$oids

    Tab A
    |   oids|TableName  |ColumnName | other fields|New Column: ValueOidDb
    ================================================================
    |    2  |  Book      | Title     |      x      |result query:harry potter
    |    8  |  Book      | Isbn      |      y      |result query: 556 
    |    1  |  Author    | Name      |      z      |result query:Tolkien
    |    4  |  Category  |Description|      b      |result query: Commedy


    Tab Book
    |   OID |TableName   |Title       |Isbn  |other fields|
    ================================================================
    |    2  |  Book      |harry potter| 123  | x          |
    |    8  |  Book      | hobbit     | 556  | y          | 
    |    21 |  Book      | etc        | 8942 | z          |
    |    5  |  Book      | etc2       | 984  | b          |

    Tab Author
    |   OID |TableName     |Name        |nationality |other fields|
    ================================================================
    |    5  |  Author      |J.Rowling   | eng        | x          |
    |    2  |  Author      |Geor. Martin| us         | y          |
    |    1  |  Author      | Tolkien    | eng        | z          |
    |    13 |  Author      | Dan Brown  | us         | b          |


    Tab Category
    |   OID | TableName    |Description |
    =====================================
    |    12 |  Category    | Fantasy    |
    |    4  |  Category    | Commedy    |
    |    9  |  Category    | Thriller   |
    |    7  |  Category    | Action     |

I tried this UDF:

    // Look up one column of one row; the table and column names come from the arguments.
    def setValueOid = (oid: Int, tableName: String, tableColumn: String) => {
      try {
        sqlContext.sql(s"Select $tableColumn from $tableName where OID = $oid").first().toString()
      } catch {
        case x: java.lang.NullPointerException => "error"
      }
    }
    sqlContext.udf.register("setValueOid", setValueOid)

    val FinalRtxf = sqlContext.sql("SELECT all the column of TAB A ,"
                  + " setValueOid(oid, Table,AttributeDatabaseColumn ) as     ValueOidDb"
                  + " FROM TAB A")

I wrapped the code in a try/catch because otherwise it throws a NullPointerException, but it still doesn't work: it always returns "error". If I call the function directly with hand-picked parameters, without going through a SQL query, it works perfectly:

          val test = setValueOid(8, "BOOK", "ISBN")
          test: String = [0977326403 ]

I read in "Trying to execute a spark sql query from a UDF" that it is not possible to run a query inside a UDF.

So how can I solve my problem? I don't know how to write a parametric join. I tried this:

       %sql
         Select  all attributes TAB A,    
         FROM TAB A  as a
         join (Select $AttributeDatabaseColumn ,TableName  from $Table where  OID=$oid) as b
         on a.Table=b.TableName 

but it gave me this exception:


    org.apache.spark.sql.AnalysisException: cannot recognize input near '$' 'AttributeDatabaseColumn' ',' in select clause; line 3 pos 1
        at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:318)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
        at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)

1 answer

#1



One option:

  • transform each of Book, Author, Category into a form like this:

    root
     |-- oid: integer (nullable = false)
     |-- tableName: string (nullable = true)
     |-- properties: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)
    

    For example, the first record in Book:

    val book = Seq((2, "Book",
      Map("title" -> "harry potter", "Isbn" -> "123", "other field" -> "x")
    )).toDF("oid", "tableName", "properties")
    
    +---+---------+---------------------------------------------------------+
    |oid|tableName|properties                                               |
    +---+---------+---------------------------------------------------------+
    |2  |Book     |Map(title -> harry potter, Isbn -> 123, other field -> x)|
    +---+---------+---------------------------------------------------------+
    
  • union Book, Author and Category into a single DataFrame, properties:

    val properties = book.union(author).union(category)
    
  • join with the base table (Tab A):

    // join on the shared key columns; `table` is Tab A with its keys named oid and tableName
    val comb = properties.join(table, Seq("oid", "tableName"))
    
  • use case when ... on tableName to add the new column from the properties field.
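
The steps above can be sketched end to end roughly as follows. This is only a minimal sketch under several assumptions that are not in the original posts: it uses Spark 2.x or later with a `SparkSession`; the helper names `toProperties` and `addLookupColumn` are hypothetical; Tab A's key columns are assumed to be named `oid` and `tableName`; and the last step swaps the suggested `case when` for a direct map lookup, `properties[ColumnName]`, which picks the value whose key equals the row's `ColumnName`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr, lit, map}

object ParametricLookupSketch {
  // Hypothetical helper: collapse every column except OID and TableName
  // into one map<string,string> column named `properties`.
  def toProperties(df: DataFrame): DataFrame = {
    val valueCols = df.columns.filterNot { c =>
      c.equalsIgnoreCase("OID") || c.equalsIgnoreCase("TableName")
    }
    // map() takes alternating key and value columns
    val keyValues = valueCols.flatMap(c => Seq(lit(c), col(c).cast("string")))
    df.select(
      col("OID").as("oid"),
      col("TableName").as("tableName"),
      map(keyValues: _*).as("properties"))
  }

  // Hypothetical helper: assumes tabA has columns oid, tableName, ColumnName.
  def addLookupColumn(tabA: DataFrame, tables: Seq[DataFrame]): DataFrame = {
    val properties = tables.map(toProperties).reduce(_ union _)
    tabA
      .join(properties, Seq("oid", "tableName"), "left")
      // map lookup instead of case when: the key is taken from each row
      .withColumn("ValueOidDb", expr("properties[ColumnName]"))
      .drop("properties")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("parametric-lookup-sketch").getOrCreate()
    import spark.implicits._

    // Small samples taken from the tables in the question
    val book = Seq((2, "Book", "harry potter", "123"), (8, "Book", "hobbit", "556"))
      .toDF("OID", "TableName", "Title", "Isbn")
    val author = Seq((1, "Author", "Tolkien", "eng"))
      .toDF("OID", "TableName", "Name", "nationality")
    val category = Seq((4, "Category", "Commedy"))
      .toDF("OID", "TableName", "Description")

    val tabA = Seq(
      (2, "Book", "Title"), (8, "Book", "Isbn"),
      (1, "Author", "Name"), (4, "Category", "Description")
    ).toDF("oid", "tableName", "ColumnName")

    addLookupColumn(tabA, Seq(book, author, category)).show(false)
    spark.stop()
  }
}
```

Map keys are compared case-sensitively, so the values in `ColumnName` have to match the source column names exactly ("Isbn", not "ISBN"). The left join keeps rows of Tab A that have no match in any lookup table.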

