Connecting to Cloud SQL from a Dataflow Job

Time: 2022-09-01 15:34:25

I'm struggling to use JdbcIO with Apache Beam 2.0 (Java) to connect to a Cloud SQL instance from Dataflow within the same project.

I'm getting the following error:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
  • According to the documentation, the Dataflow service account *@dataflow-service-producer-prod.iam.gserviceaccount.com should have access to all resources within the same project if it has "Editor" permissions.

  • When I run the same Dataflow job with DirectRunner everything works fine.

This is the code I'm using:

private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";

PCollection<KV<String, Double>> exchangeRates = p.apply(JdbcIO.<KV<String, Double>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
        .withUsername(JDBC_USER).withPassword(JDBC_PW))
    .withQuery("SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
    .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<String, Double>>() {
        @Override
        public KV<String, Double> mapRow(ResultSet resultSet) throws Exception {
            return KV.of(resultSet.getString(1), resultSet.getDouble(2));
        }
    }));

EDIT:

Using the following approach outside of Beam, within another Dataflow job, seems to work fine with DataflowRunner, which tells me that the database might not be the problem.

java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);

4 Answers

#1


2  

I think this approach may work better: please try com.mysql.jdbc.GoogleDriver, and use the Maven dependencies listed here:

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database

Related question: Where i find and download this jar file com.mysql.jdbc.GoogleDriver?

#2


2  

Following these instructions on how to connect to Cloud SQL from Java:

https://cloud.google.com/sql/docs/mysql/connect-external-app#java

I managed to make it work.

This is what the code looks like (you must replace MYDBNAME, MYSQLINSTANCE, USER and PASSWORD with your values).

Heads up: MYSQLINSTANCE format is project:region:instancename.

And I'm using a custom class (Customer) to store the values for each row, instead of key-value pairs.

p.apply(JdbcIO.<Customer>read()
    .withDataSourceConfiguration(
        JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver",
            "jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=USER&password=PASSWORD&useUnicode=true&characterEncoding=UTF-8"
        )
    )
    .withQuery("SELECT CustomerId, Name, Location, Email FROM Customers")
    .withCoder(AvroCoder.of(Customer.class))
    .withRowMapper(
        new JdbcIO.RowMapper<Customer>()
        {
            @Override
            public Customer mapRow(java.sql.ResultSet resultSet) throws Exception
            {
                final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
                LOG.info(resultSet.getString(2));
                // Columns follow the SELECT order: CustomerId, Name, Location, Email.
                Customer customer = new Customer(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4));
                return customer;
            }
        }
    )
);
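The Customer class itself is not shown in this answer. A minimal sketch of what it might look like follows. The field names and types are assumptions inferred from the SELECT columns and the four-argument constructor call, and the no-argument constructor is included on the assumption that reflection-based Avro encoding needs one.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical row holder; the real Customer class is not part of the original post.
// In a real project this would normally be a public class in its own file.
final class Customer implements Serializable {
    private static final long serialVersionUID = 1L;

    // Fields are non-final so that a reflection-based coder can populate them.
    private int id;
    private String name;
    private String location;
    private String email;

    // No-argument constructor, assumed to be required by reflection-based serializers.
    private Customer() {}

    Customer(int id, String name, String location, String email) {
        this.id = id;
        this.name = name;
        this.location = location;
        this.email = email;
    }

    int getId() { return id; }
    String getName() { return name; }
    String getLocation() { return location; }
    String getEmail() { return email; }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Customer)) {
            return false;
        }
        Customer that = (Customer) other;
        return id == that.id
            && Objects.equals(name, that.name)
            && Objects.equals(location, that.location)
            && Objects.equals(email, that.email);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name, location, email);
    }
}
```

Defining equals and hashCode is optional for the pipeline itself, but it makes the rows easy to compare in unit tests.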

I hope this helps.

#3


1  

Hi, it worked for me the way you did it. Additionally, I removed the withUsername and withPassword methods from the DB configuration and put the credentials in the JDBC URL instead. My pipeline configuration looks like this:

PCollection<KV<Double, Double>> exchangeRates = p.apply(JdbcIO.<KV<Double, Double>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8"))
    .withQuery("SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE")
    .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Double, Double>>() {
        @Override
        public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception {
            LOG.info(resultSet.getDouble(1) + "Came");
            return KV.of(resultSet.getDouble(1), resultSet.getDouble(2));
        }
    }));
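With the credentials placed in the URL as above, a password containing characters such as & or = would break the query string unless it is percent-encoded. Here is a minimal sketch of a helper that assembles such a URL. JdbcUrlBuilder and mysqlUrl are hypothetical names, not part of Beam or the MySQL driver, and the sketch assumes the driver URL-decodes property values, which I believe MySQL Connector/J does.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Beam or MySQL Connector/J.
final class JdbcUrlBuilder {
    private JdbcUrlBuilder() {}

    // Builds jdbc:mysql://<hostAndPort>/<database>?k1=v1&k2=v2 with encoded values.
    // Parameters are appended in the iteration order of the map.
    static String mysqlUrl(String hostAndPort, String database, Map<String, String> params) {
        StringBuilder url = new StringBuilder("jdbc:mysql://")
            .append(hostAndPort)
            .append('/')
            .append(database);
        char separator = '?';
        for (Map.Entry<String, String> param : params.entrySet()) {
            url.append(separator)
                .append(param.getKey())
                .append('=')
                .append(encode(param.getValue()));
            separator = '&';
        }
        return url.toString();
    }

    // Percent-encodes a single value so that '&' and '=' cannot split the query string.
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("user", "root");
        params.put("password", "p&ss=1");
        params.put("useUnicode", "true");
        params.put("characterEncoding", "UTF-8");
        System.out.println(mysqlUrl("ip:3306", "dbname", params));
    }
}
```

The resulting string can be passed as the second argument to JdbcIO.DataSourceConfiguration.create in place of the hard-coded URL.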

Hope this will help

#4


0  

I am new to Cloud and just wanted to know if there is any way to make the connection somewhere else instead of making it here:

   withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.jdbc.Driver",
                        "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8"));

I need an answer urgently, as I don't want the connection to be merged with the actual processing; instead, I just want to start the connection here.
