I've used Apache Spark with the PostgreSQL JDBC driver on my own Linux servers before without issues, but I can't get it to work on Amazon EMR doing it the same way.
我在自己的Linux服务器上使用了Apache Spark和PostgreSQL JDBC驱动程序,没有问题,但我无法让它在Amazon EMR上以同样的方式工作。
I first downloaded the Postgres driver and set up my pyspark classpath this way: Adding postgresql jar though spark-submit on amazon EMR
我首先下载了Postgres驱动程序,并以这种方式设置了pyspark类路径:在amazon EMR上添加postgresql jar。
I executed the following in pyspark
on an Amazon EMR instance set up with Spark, similarly to how I usually do it on my own server. "myhost" is the hostname of my Amazon RDS instance running PostgreSQL, which I am able to connect to from my EMR instance with psql
, so I know it should work:
我在Amazon EMR实例上的pyspark中执行了以下操作,该实例设置为Spark,类似于我通常在自己的服务器上执行的方式。“myhost”是我的Amazon RDS实例运行PostgreSQL的主机名,我可以从EMR实例与psql连接,所以我知道它应该工作:
# helper, gets RDD from database
def get_db_rdd(table, lower=0, upper=1000):
db_connection = {
"host": "myhost",
"port": 5432,
"database": "mydb",
"user": "postgres",
"password": "mypassword"
}
url = "jdbc:postgresql://{}:{}/{}?user={}".format(db_connection["host"],
db_connection["port"],
db_connection["database"],
db_connection["user"])
ret = sqlContext \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", table) \
.option("partitionColumn", "id") \
.option("numPartitions", 1024) \
.option("lowerBound", lower) \
.option("upperBound", upper) \
.option("password", db_connection["password"]) \
.load()
ret = ret.rdd
return ret
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = get_db_rdd("test", 0, 3) # table exists, has columns (`id bigserial, string text`)
I immediately get a crash with this exception:
我马上就会遇到这样的例外:
17/04/21 19:34:07 ERROR Schema: Failed initialising database.
Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@3aa157b0, see the next exception for details.
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
[...]
Looking around online, this has to do with Apache Hive... No idea why that's involved here, but I may be misunderstanding. I do see metastore_db
in my home dir. All the proposed solutions involve editing some Hive configuration that I don't even have on my instance or creating that dir I already have. My EMR instance has totally default settings. Could someone more familiar with this environment point me in the right direction?
看看网上,这与Apache Hive有关…不知道为什么会这样,但我可能是误会了。在我的家里,我确实看到了癌细胞转移。所有建议的解决方案都涉及编辑一些Hive配置,我甚至没有在实例上创建或者创建我已经拥有的。我的EMR实例有完全的默认设置。更熟悉这个环境的人能告诉我正确的方向吗?
Edit: I don't have the entire stack trace handy but have some left in my GNU screen. Here's more, mentions Derby:
编辑:我没有完整的堆栈跟踪,但有一些留在我的GNU屏幕。在这里,提到了德比:
Caused by: ERROR XJ040: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@3aa157b0, see the next exception for details.
at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
... 113 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /home/hadoop/metastore_db.
at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
at org.apache.derby.impl.store.raw.data.BaseDataFileFactory.privGetJBMSLockOnDB(Unknown Source)
Edit 2: Using other RDDs like the following works: sc.parallelize([1, 2, 3]).map(lambda r: r * 2).collect()
. The problem is only for RDDs connected to Postgres.
编辑2:使用其他RDDs,如下面的工作:sc.并行化([1,2,3])。map(lambda r: r * 2).collect()。问题只在于与Postgres连接的RDDs。
Edit 3:
编辑3:
>>> spark.range(5).show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
2 个解决方案
#1
1
The error message:
错误消息:
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /home/hadoop/metastore_db.
由:ERROR XSDB6引起:Derby的另一个实例可能已经启动了数据库/home/hadoop/metastore_db。
tells us that the embedded, one-thread Derby instance is already in use. I'm not very familiar with Hive, but is used when Spark boots Hive-enabled SparkSession
that you can see in your stack trace:
告诉我们,嵌入式的、单线程的Derby实例已经在使用中。我对Hive不是很熟悉,但是在Spark引导的SparkSession中使用,您可以在堆栈跟踪中看到:
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:192)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:366)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:270)
at org.apache.spark.sql.hive.HiveExternalCatalog.<init>(HiveExternalCatalog.scala:65)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.internal.SharedState$.org$apache$spark$sql$internal$SharedState$$reflect(SharedState.scala:166)
at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:86)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:101)
at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:100)
at org.apache.spark.sql.internal.SessionState.<init>(SessionState.scala:157)
at org.apache.spark.sql.hive.HiveSessionState.<init>(HiveSessionState.scala:32)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:978)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:110)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:516)
I copied the most relevant lines (to remove the noise).
我复制了最相关的线(消除噪音)。
Side note: You don't really need Hive features these days since Spark supports most natively (and in Spark 2.2 most Hive "infrastructure" will get away).
附注:这些天你根本不需要Hive功能,因为Spark支持的最多(而且在Spark 2.2中,大多数蜂巢的“基础设施”将会消失)。
As you can see in the stack trace, the multiple-threads-accessing-single-threaded-Derby exception will only be thrown when you use SparkSession
which is the entry point to Spark SQL.
正如您在堆栈跟踪中看到的那样,只有在使用SparkSession(这是触发SQL的入口点)时,才会抛出多个线程-访问-单线程- derby异常。
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:110)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:516)
That's why you don't see it when working with RDD API. The RDD API does not use Hive at all.
这就是为什么在使用RDD API时看不到它的原因。RDD API根本不使用Hive。
Read up Hive's official documentation at Local/Embedded Metastore Database (Derby).
在本地/嵌入式转移数据库(Derby)中读取Hive的官方文档。
#2
0
Thanks to suggestions from Jacek about the nature of my problem, I developed a hack workaround after some trial and error. Haven't been able to really solve the problem yet, but this works, and that's good enough for me. I'll report back if I run into problems later.
感谢Jacek关于我的问题的本质的建议,在一些尝试和错误之后,我开发了一个hack解决方案。我还没能真正解决这个问题,但这是可行的,这对我来说已经足够好了。如果我以后遇到问题,我会报告的。
-
Start pyspark with the Postgres driver as normal:
pyspark --driver-class-path=/home/hadoop/postgres_driver.jar --jars=/home/hadoop/postgres_driver.jar
启动pyspark与Postgres驱动程序正常:pyspark——驱动-类路径=/home/hadoop/postgres_driver。jar——罐子= / home / hadoop / postgres_driver.jar
-
While that's open (!), in a separate SSH session, cd to home and
mv metastore_db old_metastore_db
(or you can do this in pyspark withos.system()
). The point of this is to release the lock on the metastore that Spark creates by default; Spark will recreate the directory without a lock.在单独的SSH会话中,在一个单独的SSH会话中,cd到home和mv的转移(或者您可以在pyspark中使用os.system())。这样做的目的是为了释放被默认触发的转移的锁;Spark将重新创建没有锁的目录。
-
Try to create an RDD connected to Postgres the way I described in my question. It'll give an error about "no suitable driver." For some reason, the driver wasn't loaded. But after that error, it seems the driver is actually loaded now.
尝试创建一个与Postgres连接的RDD,这是我在问题中描述的方式。它会给出一个关于“没有合适的驱动程序”的错误。由于某种原因,司机没有被装载。但是在那个错误之后,现在看起来驱动程序实际上已经加载了。
-
mv metastore_db old_metastore_db2
, for similar reasons to above. I guess another Hive session is connected now and needs its lock to be cleared out.由于类似的原因,mv转移了old_metastore_db2。我想另一个Hive session现在已经连接起来了,需要清除它的锁。
-
Create the Postgres-connected RDD again, same way. Driver is loaded, and metastore is unlocked, and it seems to work. I can fetch from my tables, perform RDD operations, and
collect()
.再次创建与postgres连接的RDD,同样的方法。驱动程序被加载,并且转移被解锁,而且它似乎起作用。我可以从表中获取,执行RDD操作,并收集()。
Yes, I know this is very dirty. Use at your own risk.
是的,我知道这很脏。使用自己的风险。
#1
1
The error message:
错误消息:
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /home/hadoop/metastore_db.
由:ERROR XSDB6引起:Derby的另一个实例可能已经启动了数据库/home/hadoop/metastore_db。
tells us that the embedded, one-thread Derby instance is already in use. I'm not very familiar with Hive, but is used when Spark boots Hive-enabled SparkSession
that you can see in your stack trace:
告诉我们,嵌入式的、单线程的Derby实例已经在使用中。我对Hive不是很熟悉,但是在Spark引导的SparkSession中使用,您可以在堆栈跟踪中看到:
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:192)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:366)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:270)
at org.apache.spark.sql.hive.HiveExternalCatalog.<init>(HiveExternalCatalog.scala:65)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.internal.SharedState$.org$apache$spark$sql$internal$SharedState$$reflect(SharedState.scala:166)
at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:86)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:101)
at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:100)
at org.apache.spark.sql.internal.SessionState.<init>(SessionState.scala:157)
at org.apache.spark.sql.hive.HiveSessionState.<init>(HiveSessionState.scala:32)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$reflect(SparkSession.scala:978)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:110)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:516)
I copied the most relevant lines (to remove the noise).
我复制了最相关的线(消除噪音)。
Side note: You don't really need Hive features these days since Spark supports most natively (and in Spark 2.2 most Hive "infrastructure" will get away).
附注:这些天你根本不需要Hive功能,因为Spark支持的最多(而且在Spark 2.2中,大多数蜂巢的“基础设施”将会消失)。
As you can see in the stack trace, the multiple-threads-accessing-single-threaded-Derby exception will only be thrown when you use SparkSession
which is the entry point to Spark SQL.
正如您在堆栈跟踪中看到的那样,只有在使用SparkSession(这是触发SQL的入口点)时,才会抛出多个线程-访问-单线程- derby异常。
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:110)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:516)
That's why you don't see it when working with RDD API. The RDD API does not use Hive at all.
这就是为什么在使用RDD API时看不到它的原因。RDD API根本不使用Hive。
Read up Hive's official documentation at Local/Embedded Metastore Database (Derby).
在本地/嵌入式转移数据库(Derby)中读取Hive的官方文档。
#2
0
Thanks to suggestions from Jacek about the nature of my problem, I developed a hack workaround after some trial and error. Haven't been able to really solve the problem yet, but this works, and that's good enough for me. I'll report back if I run into problems later.
感谢Jacek关于我的问题的本质的建议,在一些尝试和错误之后,我开发了一个hack解决方案。我还没能真正解决这个问题,但这是可行的,这对我来说已经足够好了。如果我以后遇到问题,我会报告的。
-
Start pyspark with the Postgres driver as normal:
pyspark --driver-class-path=/home/hadoop/postgres_driver.jar --jars=/home/hadoop/postgres_driver.jar
启动pyspark与Postgres驱动程序正常:pyspark——驱动-类路径=/home/hadoop/postgres_driver。jar——罐子= / home / hadoop / postgres_driver.jar
-
While that's open (!), in a separate SSH session, cd to home and
mv metastore_db old_metastore_db
(or you can do this in pyspark withos.system()
). The point of this is to release the lock on the metastore that Spark creates by default; Spark will recreate the directory without a lock.在单独的SSH会话中,在一个单独的SSH会话中,cd到home和mv的转移(或者您可以在pyspark中使用os.system())。这样做的目的是为了释放被默认触发的转移的锁;Spark将重新创建没有锁的目录。
-
Try to create an RDD connected to Postgres the way I described in my question. It'll give an error about "no suitable driver." For some reason, the driver wasn't loaded. But after that error, it seems the driver is actually loaded now.
尝试创建一个与Postgres连接的RDD,这是我在问题中描述的方式。它会给出一个关于“没有合适的驱动程序”的错误。由于某种原因,司机没有被装载。但是在那个错误之后,现在看起来驱动程序实际上已经加载了。
-
mv metastore_db old_metastore_db2
, for similar reasons to above. I guess another Hive session is connected now and needs its lock to be cleared out.由于类似的原因,mv转移了old_metastore_db2。我想另一个Hive session现在已经连接起来了,需要清除它的锁。
-
Create the Postgres-connected RDD again, same way. Driver is loaded, and metastore is unlocked, and it seems to work. I can fetch from my tables, perform RDD operations, and
collect()
.再次创建与postgres连接的RDD,同样的方法。驱动程序被加载,并且转移被解锁,而且它似乎起作用。我可以从表中获取,执行RDD操作,并收集()。
Yes, I know this is very dirty. Use at your own risk.
是的,我知道这很脏。使用自己的风险。