How do I use MySQL with Apache Spark?

Date: 2022-03-14 15:20:28

I want to run my existing application with Apache Spark and MySQL.

10 solutions

#1 (score: 26)

From PySpark, this works for me (assuming the MySQL JDBC connector jar is already on the classpath, e.g. passed with --jars when launching PySpark):

dataframe_mysql = mySqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/my_db_name",
    driver="com.mysql.jdbc.Driver",
    dbtable="my_tablename",
    user="root",
    password="root").load()

#2 (score: 13)

Using Scala, this worked for me. Use the commands below:

sudo -u root spark-shell --jars /mnt/resource/lokeshtest/guava-12.0.1.jar,/mnt/resource/lokeshtest/hadoop-aws-2.6.0.jar,/mnt/resource/lokeshtest/aws-java-sdk-1.7.3.jar,/mnt/resource/lokeshtest/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar --packages com.databricks:spark-csv_2.10:1.2.0

import org.apache.spark.sql.SQLContext

val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

val dataframe_mysql = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://Public_IP:3306/DB_NAME").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "tblage").option("user", "sqluser").option("password", "sqluser").load()

dataframe_mysql.show()

#3 (score: 10)

For Scala, if you use sbt, this will also work.

In your build.sbt file:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.6.2",
    "org.apache.spark" %% "spark-sql" % "1.6.2",
    "org.apache.spark" %% "spark-mllib" % "1.6.2",
    "mysql" % "mysql-connector-java" % "5.1.12"
)

Then you just need to register the driver and use it:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

Class.forName("com.mysql.jdbc.Driver").newInstance

val conf = new SparkConf().setAppName("MY_APP_NAME").setMaster("MASTER")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val data = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<HOST>:3306/<database>")
  .option("user", "<USERNAME>")
  .option("password", "<PASSWORD>")
  .option("dbtable", "MYSQL_QUERY") // a table name, or a subquery in parentheses with an alias
  .load()
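
As a side note, the "dbtable" option can also take a parenthesized subquery instead of a bare table name, which pushes the query down to MySQL. A minimal sketch reusing the sqlContext above (the table and column names are hypothetical):

val filtered = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<HOST>:3306/<database>")
  .option("user", "<USERNAME>")
  .option("password", "<PASSWORD>")
  .option("dbtable", "(select id, name from my_table where id > 10) as t")
  .load()

filtered.show()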

#4 (score: 5)
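
Note: this answer uses the Spark 1.x Java API (SQLContext.jdbc was deprecated in Spark 1.4 in favor of read().format("jdbc")), and the example URL is a PostgreSQL one; for MySQL, use a jdbc:mysql://... URL and the MySQL driver.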

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public static void main(String[] args) {
    Map<String, String> options = new HashMap<String, String>();
    options.put("url", "jdbc:postgresql://<DBURL>:<PORT>/<Database>?user=<UserName>&password=<Password>");
    options.put("dbtable", "<TableName>");
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("DBConnection").setMaster("local[*]"));
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    // DataFrame jdbcDF = sqlContext.load("jdbc", options).cache();
    DataFrame jdbcDF = sqlContext.jdbc(options.get("url"), options.get("dbtable"));
    System.out.println("Data------------------->" + jdbcDF.toJSON().first());
    Row[] rows = jdbcDF.collect();
    System.out.println("Without Filter \n ------------------------------------------------- ");
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }
    System.out.println("Filter Data\n ------------------------------------------------- ");
    jdbcDF = jdbcDF.select("agency_id", "route_id").where(jdbcDF.col("route_id").$less$eq(3));
    rows = jdbcDF.collect();
    for (Row row2 : rows) {
        System.out.println(row2.toString());
    }
}

#5 (score: 5)

With Spark 2.0.x, you can use DataFrameReader and DataFrameWriter. Use SparkSession.read to access a DataFrameReader, and Dataset.write to access a DataFrameWriter.

Suppose you are using spark-shell.

read example

val prop=new java.util.Properties()
prop.put("user","username")
prop.put("password","yourpassword")
val url="jdbc:mysql://host:port/db_name"

val df=spark.read.jdbc(url,"table_name",prop) 
df.show()

read example 2

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbserver:3306/db_name")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

(from the Spark docs)

write example

import org.apache.spark.sql.SaveMode

val prop=new java.util.Properties()
prop.put("user","username")
prop.put("password","yourpassword")
val url="jdbc:mysql://host:port/db_name"
// df is a DataFrame containing the data you want to write
df.write.mode(SaveMode.Append).jdbc(url,"table_name",prop)
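
A usage note: with the JDBC writer, SaveMode.Append appends to the target table and creates it if it does not exist, while SaveMode.Overwrite drops and recreates the table by default (Spark 2.1+ can truncate instead via the truncate option).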

#6 (score: 4)

Based on this infoobjects article, try the following (this assumes Java or Scala; not sure how it would work with Python):

  • add the mysql-connector-java jar to the classpath of your Spark cluster
  • initialize the driver: Class.forName("com.mysql.jdbc.Driver")
  • create a JdbcRDD data source:

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val myRDD = new JdbcRDD( sc, () =>
                               DriverManager.getConnection(url, username, password),
                        "select first_name,last_name,gender from person limit ?, ?",
                        1, // lower bound
                        5, // upper bound
                        2, // number of partitions
                        r =>
                          r.getString("last_name") + ", " + r.getString("first_name"))
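
JdbcRDD predates the DataFrame API; on Spark 2.x the same partitioned read can be expressed through the JDBC data source. A minimal sketch, assuming a numeric person_id column to split on (the column name is hypothetical):

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partitioned-jdbc").master("local[*]").getOrCreate()

val props = new Properties()
props.put("user", "<USERNAME>")
props.put("password", "<PASSWORD>")
props.put("driver", "com.mysql.jdbc.Driver")

// Read "person" in 2 partitions, splitting the (hypothetical) numeric
// column person_id over [1, 5], mirroring the lower bound / upper bound /
// partition count arguments of the JdbcRDD above.
val people = spark.read.jdbc(
  "jdbc:mysql://<HOST>:3306/<database>",
  "person",
  "person_id", // partition column: must be numeric, date, or timestamp
  1L,          // lower bound
  5L,          // upper bound
  2,           // number of partitions
  props)

people.select("first_name", "last_name").show()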

#7 (score: 2)

For Spark 2.1.0 and Scala (on Windows 7), the code below works fine for me:

import org.apache.spark.sql.SparkSession

object MySQL {
  def main(args: Array[String]) {
    //First, create a SparkSession as the entry point of your app
    val spark:SparkSession = SparkSession
      .builder()
      .appName("JDBC")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "C:/Exp/")
      .getOrCreate();    

    val dataframe_mysql = spark.read.format("jdbc")
                          .option("url", "jdbc:mysql://localhost/feedback")
                          .option("driver", "com.mysql.jdbc.Driver")
                          .option("dbtable", "person") //replace with own
                          .option("user", "root") //replace with own 
                          .option("password", "vertrigo") // replace with own
                          .load()

    dataframe_mysql.show()
  }
}

#8 (score: 2)

For Java, this worked for me:

@Bean
public SparkConf sparkConf() {
    SparkConf sparkConf = new SparkConf()
            .setAppName(appName)
            .setSparkHome(sparkHome)
            .setMaster(masterUri);

    return sparkConf;
}

@Bean
public JavaSparkContext javaSparkContext() {
    return new JavaSparkContext(sparkConf());
}

@Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .sparkContext(javaSparkContext().sc())
            .appName("Java Spark SQL basic example")
            .getOrCreate();
}

Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "root");
properties.put("driver", "com.mysql.cj.jdbc.Driver");
sparkSession.read()
        .jdbc("jdbc:mysql://localhost:3306/books?useSSL=false", "(SELECT books.BOOK_ID as BOOK_ID, books.BOOK_TITLE as BOOK_TITLE, books.BOOK_AUTHOR as BOOK_AUTHOR, borrowers.BORR_NAME as BORR_NAME FROM books LEFT OUTER JOIN borrowers ON books.BOOK_ID = borrowers.BOOK_ID) as t", properties) // join example
        .show();

Of course, for MySQL, I needed the connector:

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>

And I get:

+-------+------------------+--------------+---------------+
|BOOK_ID|        BOOK_TITLE|   BOOK_AUTHOR|      BORR_NAME|
+-------+------------------+--------------+---------------+
|      1|        Gyűrű kúra|J.R.K. Tolkien|   Sára Sarolta|
|      2|     Kecske-eledel|     Mekk Elek|Maláta Melchior|
|      3|      Répás tészta| Vegán Eleazár|           null|
|      4|Krumpli és pityóka| Farmer Emília|           null|
+-------+------------------+--------------+---------------+

#9 (score: 2)

For Java (using Maven), add the Spark dependencies and the SQL driver dependency to your pom.xml file:

<properties>
    <java.version>1.8</java.version>
    <spark.version>1.6.3</spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Sample code, assuming your MySQL server runs locally, the database name is test, the user is root with password password, and the test database contains two tables, table1 and table2:

SparkConf sparkConf = new SparkConf();
SparkContext sc = new SparkContext("local", "spark-mysql-test", sparkConf);
SQLContext sqlContext = new SQLContext(sc);

// here you can run a sql query
String sql = "(select * from table1 join table2 on table1.id=table2.table1_id) as test_table";
// or use an existing table directly
// String sql = "table1";
DataFrame dataFrame = sqlContext
    .read()
    .format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true")
    .option("user", "root")
    .option("password", "password")
    .option("dbtable", sql)
    .load();

// continue with your own logic
......

#10 (score: 1)

val query: String =
  "select col1, col2 from schema.table_name where condition"

val url = "jdbc:mysql://<ip>:3306/<schema>"
val username = ""
val password = ""
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.load("jdbc", Map(
  "url" -> (url + "/?user=" + username + "&password=" + password),
  "dbtable" -> s"($query) as tbl",
  "driver" -> "com.mysql.jdbc.Driver"))

df.show()
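
Note that SQLContext.load(source, options) was deprecated in Spark 1.4; a sketch of the equivalent call through the DataFrameReader API, reusing the query, url, username, and password values defined above:

val df2 = sqlContext.read
  .format("jdbc")
  .option("url", url)
  .option("user", username)
  .option("password", password)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", s"($query) as tbl")
  .load()

df2.show()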
