1、读Hive表数据
pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语句从hive里面查询需要的数据,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from pyspark.sql import HiveContext,SparkSession
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
hive_context = HiveContext(spark_session )
# 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}" . format (hive_database, hive_table)
# 通过SQL语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)
|
2 、将数据写入hive表
pyspark写hive表有两种方式:
(1)通过SQL语句生成表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from pyspark.sql import SparkSession, HiveContext
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
data = [
( 1 , "3" , "145" ),
( 1 , "4" , "146" ),
( 1 , "5" , "25" ),
( 1 , "6" , "26" ),
( 2 , "32" , "32" ),
( 2 , "8" , "134" ),
( 2 , "8" , "134" ),
( 2 , "9" , "137" )
]
df = spark.createDataFrame(data, [ 'id' , "test_id" , 'camera_id' ])
# method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registerTempTable( 'test_hive' )
sqlContext.sql( "create table default.write_test select * from test_hive" )
|
(2)saveastable的方式
1
2
3
4
5
|
# method two
# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
# mode("append")是在原有表的基础上进行添加数据
df.write. format ( "hive" ).mode( "overwrite" ).saveAsTable( 'default.write_test' )
|
tips:
spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:
spark-submit --conf spark.sql.catalogImplementation=hive test.py
补充知识:PySpark基于SHC框架读取HBase数据并转成DataFrame
一、首先需要将HBase目录lib下的jar包以及SHC的jar包复制到所有节点的Spark目录lib下
二、修改spark-defaults.conf 在spark.driver.extraClassPath和spark.executor.extraClassPath把上述jar包所在路径加进去
三、重启集群
四、代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#/usr/bin/python
#-*- coding:utf-8 –*-
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext,SparkSession
from pyspark.sql.types import Row,StringType,StructField,StringType,IntegerType
from pyspark.sql.dataframe import DataFrame
sc = SparkContext(appName = "pyspark_hbase" )
sql_sc = SQLContext(sc)
dep = "org.apache.spark.sql.execution.datasources.hbase"
#定义schema
catalog = """{
"table":{"namespace":"default", "name":"teacher"},
"rowkey":"key",
"columns":{
"id":{"cf":"rowkey", "col":"key", "type":"string"},
"name":{"cf":"teacherInfo", "col":"name", "type":"string"},
"age":{"cf":"teacherInfo", "col":"age", "type":"string"},
"gender":{"cf":"teacherInfo", "col":"gender","type":"string"},
"cat":{"cf":"teacherInfo", "col":"cat","type":"string"},
"tag":{"cf":"teacherInfo", "col":"tag", "type":"string"},
"level":{"cf":"teacherInfo", "col":"level","type":"string"} }
}"""
df = sql_sc.read.options(catalog = catalog). format (dep).load()
print ( '***************************************************************' )
print ( '***************************************************************' )
print ( '***************************************************************' )
df.show()
print ( '***************************************************************' )
print ( '***************************************************************' )
print ( '***************************************************************' )
sc.stop()
|
五、解释
数据来源参考请本人之前的文章,在此不做赘述
schema定义参考如图:
六、结果
以上这篇在python中使用pyspark读写Hive数据操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/u011412768/article/details/93426353