从并行的mysql中读取数据

时间:2021-04-24 23:09:44

Im trying to read data from mysql and write it back to parquet file in s3 with specific partitions as follows:

我试图从mysql读取数据并将其写回s3中具有特定分区的镶木地板文件,如下所示:

df=sqlContext.read.format('jdbc')\
   .options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
         dbtable='tbl',
         numPartitions=4 )\
   .load()


df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])

My problem is that it open only one connection to mysql (instead of 4) and it doesn't write to parquert until it fetches all the data from mysql, because my table in mysql is huge (100M rows) the process failed on OutOfMemory.

我的问题是它只打开一个与mysql的连接(而不是4),并且它不会写入parquert,直到它从mysql中获取所有数据,因为我在mysql中的表很大(100M行),在OutOfMemory上进程失败。

Is there a way to configure Spark to open more than one connection to mysql and to write partial data to parquet?

有没有办法配置Spark来打开多个与mysql的连接并将部分数据写入拼接?

2 个解决方案

#1


5  

You should set these properties:

您应该设置以下属性:

partitionColumn, 
lowerBound, 
upperBound, 
numPartitions

as it is documented here: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

正如它在此处记录的那样:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

#2


3  

For Spar >= 2.0 I've created a class with next methods:

对于Spar> = 2.0,我创建了一个包含下一个方法的类:

...
private val dbUrl =
s"""jdbc:mysql://${host}:${port}/${db_name}
    |?zeroDateTimeBehavior=convertToNull
    |&read_buffer_size=100M""".stripMargin.replace("\n", "")

def run(sqlQuery: String): DataFrame = {
println(sqlQuery)
Datapipeline.spark.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", dbUrl)
  .option("user", user)
  .option("password", pass)
  .option("dbtable", s"($sqlQuery) as tmp")
  .load()
}
...
def getBounds(table: String, whereClause: String, partitionColumn: String): Array[Int] = {
val sql = s"select min($partitionColumn) as min, max($partitionColumn) as max from $table${
  if (whereClause.length > 0) s" where $whereClause"
}"
val df = run(sql).collect()(0)

Array(df.get(0).asInstanceOf[Int], df.get(1).asInstanceOf[Int])
}

def getTableFields(table: String): String = {
val sql =
  s"""
     |SELECT *
     |FROM information_schema.COLUMNS
     |WHERE table_name LIKE '$table'
     |  AND TABLE_SCHEMA LIKE '${db_name}'
     |ORDER BY ORDINAL_POSITION
   """.stripMargin
run(sql).collect().map(r => r.getAs[String]("COLUMN_NAME")).mkString(", ")
}

/**
* Returns DataFrame partitioned by <partritionColumn> to number of partitions provided in
* <numPartitions> for a <table> with WHERE clause
* @param table - a table name
* @param whereClause - WHERE clause without "WHERE" key word
* @param partitionColumn - column name used for partitioning, should be numeric
* @param numPartitions - number of partitions
* @return - a DataFrame
*/
def run(table: String, whereClause: String, partitionColumn: String, numPartitions: Int): DataFrame = {
val bounds = getBounds(table, whereClause, partitionColumn)

val fields = getTableFields(table)
val dfs: Array[DataFrame] = new Array[DataFrame](numPartitions)

val lowerBound = bounds(0)
val partitionRange: Int = ((bounds(1) - bounds(0)) / numPartitions)

for (i <- 0 to numPartitions - 2) {
  dfs(i) = run(
    s"""select $fields from $table
        | where $partitionColumn >= ${lowerBound + (partitionRange * i)} and $partitionColumn < ${lowerBound + (partitionRange * (i + 1))}${
      if (whereClause.length > 0)
        s" and $whereClause"
    }
     """.stripMargin.replace("\n", ""))
}

dfs(numPartitions - 1) = run(s"select $fields from $table where $partitionColumn >= ${lowerBound + (partitionRange * (numPartitions - 1))}${
  if (whereClause.length > 0)
    s" and $whereClause"
}".replace("\n", ""))

dfs.reduceLeft((res, df) => res.union(df))

}

Last run method will create a number of necessary partitions. When you call an action method Spark will create as many parallel tasks as many partitions have been defined for the DataFrame returned by the run method.

上次运行方法将创建许多必要的分区。当您调用操作方法时,Spark将创建尽可能多的并行任务,因为已为run方法返回的DataFrame定义了许多分区。

Enjoy.

#1


5  

You should set these properties:

您应该设置以下属性:

partitionColumn, 
lowerBound, 
upperBound, 
numPartitions

as it is documented here: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

正如它在此处记录的那样:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

#2


3  

For Spar >= 2.0 I've created a class with next methods:

对于Spar> = 2.0,我创建了一个包含下一个方法的类:

...
private val dbUrl =
s"""jdbc:mysql://${host}:${port}/${db_name}
    |?zeroDateTimeBehavior=convertToNull
    |&read_buffer_size=100M""".stripMargin.replace("\n", "")

def run(sqlQuery: String): DataFrame = {
println(sqlQuery)
Datapipeline.spark.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", dbUrl)
  .option("user", user)
  .option("password", pass)
  .option("dbtable", s"($sqlQuery) as tmp")
  .load()
}
...
def getBounds(table: String, whereClause: String, partitionColumn: String): Array[Int] = {
val sql = s"select min($partitionColumn) as min, max($partitionColumn) as max from $table${
  if (whereClause.length > 0) s" where $whereClause"
}"
val df = run(sql).collect()(0)

Array(df.get(0).asInstanceOf[Int], df.get(1).asInstanceOf[Int])
}

def getTableFields(table: String): String = {
val sql =
  s"""
     |SELECT *
     |FROM information_schema.COLUMNS
     |WHERE table_name LIKE '$table'
     |  AND TABLE_SCHEMA LIKE '${db_name}'
     |ORDER BY ORDINAL_POSITION
   """.stripMargin
run(sql).collect().map(r => r.getAs[String]("COLUMN_NAME")).mkString(", ")
}

/**
* Returns DataFrame partitioned by <partritionColumn> to number of partitions provided in
* <numPartitions> for a <table> with WHERE clause
* @param table - a table name
* @param whereClause - WHERE clause without "WHERE" key word
* @param partitionColumn - column name used for partitioning, should be numeric
* @param numPartitions - number of partitions
* @return - a DataFrame
*/
def run(table: String, whereClause: String, partitionColumn: String, numPartitions: Int): DataFrame = {
val bounds = getBounds(table, whereClause, partitionColumn)

val fields = getTableFields(table)
val dfs: Array[DataFrame] = new Array[DataFrame](numPartitions)

val lowerBound = bounds(0)
val partitionRange: Int = ((bounds(1) - bounds(0)) / numPartitions)

for (i <- 0 to numPartitions - 2) {
  dfs(i) = run(
    s"""select $fields from $table
        | where $partitionColumn >= ${lowerBound + (partitionRange * i)} and $partitionColumn < ${lowerBound + (partitionRange * (i + 1))}${
      if (whereClause.length > 0)
        s" and $whereClause"
    }
     """.stripMargin.replace("\n", ""))
}

dfs(numPartitions - 1) = run(s"select $fields from $table where $partitionColumn >= ${lowerBound + (partitionRange * (numPartitions - 1))}${
  if (whereClause.length > 0)
    s" and $whereClause"
}".replace("\n", ""))

dfs.reduceLeft((res, df) => res.union(df))

}

Last run method will create a number of necessary partitions. When you call an action method Spark will create as many parallel tasks as many partitions have been defined for the DataFrame returned by the run method.

上次运行方法将创建许多必要的分区。当您调用操作方法时,Spark将创建尽可能多的并行任务,因为已为run方法返回的DataFrame定义了许多分区。

Enjoy.