Vertica系列:Vertica和Hadoop的互操作性

时间:2021-07-03 12:10:58

Vertica 8和 Hadoop 集群的互操作性已经很不错的, 但这块肯定是Vertica研发的重点, 将来可能还有较大的变动.

Vertica 集群 和 Hadoop 集群的两种布局方式

集群布局 描述 许可证 支持Vertica for SQL on Hadoop特性
共享集群 Vertica 节点部署在 Hadoop 所有节点或部分节点上 Vertica for SQL on Hadoop 许可证 支持
分离集群 Vertica 集群和 Hadoop 分属不同的集群 社区版或Premium版许可证 不支持

从目前情况下, 还是推荐使用分离模式, 主要原因有:

  • 共存模式需要为 yarn 和 Vertica 分别分配合适的计算资源, 操作难度较大.
  • 存储方面 Vertica 推荐采用 Raid10 或 Raid50, 而 HDFS 不推荐使用 Raid.
  • Hadoop Balancer功能很可能会影响Vertica的执行效率(通常情况下Vertica仅仅部署在集群中的几台).

但长期看, 共享集群应该是Vertica重点的发展方向, HAWQ 就是这个思路. 从目前看到的资料, Vertica 9将有很大的进展.

Vertica 和 Hadoop 互操作方式汇总

方法 描述 推荐 效率 必须共享集群吗
Reading Directly from HDFS 以External table 或 bulk load的方式读取HDFS上的格式化文件 Recommended 无需
HCatalog Connector 使用Vertica SQL的方式访问 Hive 的数据表 中低 无需
HDFS Connector 以External table 或 bulk load的方式读取HDFS上的格式化文件 Deprecated 无需
ROS Containers on HDFS 在HDFS上存放Vertica ROS文件 必须
MapReduce Connector 使用MapReduce或Pig来访问 HDFS 上的数据 中/低 ?
Export to Parquet 导出Parquet文件到本地或HDFS上 无需
ParallelExport 导出文件到本地或HDFS上 无需

读取原生 Hadoop 文件格式

在官网中好像叫做Reading Directly from HDFS 方式 或 HDFS Schema 方式.
该方法是用来替代 HDFS Connector 方法. 在使用该方法之前, Vertica 节点应能访问 Hadoop 配置文件:

  • 如果共享集群, Vertica 自然能访问到这些配置文件, 只需要在 Vertica DB 中验证 HadoopConfDir 环境变量即可.
  • 如果分离集群, Vertica 必须将所需文件复制到这些节点, 但一个更好的做法是, 将 Vertica 节点配置为 Hadoop 的边缘节点(英文为 edge node). 从 Hadoop 的角度来看,Vertica 就是一款客户端应用程序. 可以使用 Hortonworks Ambari 或 Cloudera Manager 配置边缘节点.

在 Vertica DB 中设置 HadoopConfDir 参数, 其路径应该包括一个含有 core-site.xml 和 hdfs-site.xml 文件的目录.
ALTER DATABASE mydb SET HadoopConfDir = '/hadoop/hcat/conf';

验证配置是否正确:
select VERIFY_HADOOP_CONF_DIR( )

该访问方式效率较高,但需要注意的是,

  • ORC 或 Parquet 文件不得使用复杂数据类型.
  • 由 Hive 或 Impala 压缩的文件需要进行 Zlib (GZIP) 或 Snappy 压缩, 不支持 LZO 压缩.
  • 定义外部表时,您必须定义文件中的所有列.

使用 hdfs 方案不会消除对 webHDFS 服务访问权限的需求, hdfs 方案并不适用于所有的文件. 如果 hdfs 不可用,那么 Vertica 会自动改用 webhdfs. 可以直接使用 webhdfs:// 前缀,并指定主机名、端口和文件路径, 读取不受 Kerberos 保护的文件时,使用 hdfs 方案可能会提供更好的性能.

[推荐] 使用 Hadoop NameService 的方式访问 hdfs 上的数据, 注意是两个 slash 的写法, hadoopNS 这个name service 已经在 hdfs-site.xml 定义好了.
CREATE EXTERNAL TABLE tt (a1 INT, a2 VARCHAR(20))
AS COPY FROM 'hdfs://hadoopNS/data/file.csv';

[不推荐] 如果没有在hdfs-site.xml中定义name service, 需要使用 hdfs:/// 这样的写法.
CREATE EXTERNAL TABLE tt (a1 INT, a2 VARCHAR(20))
AS COPY FROM 'hdfs:///opt/data/file.csv';

直接支持 ORC/Parquet 格式,
=> CREATE EXTERNAL TABLE tableName (columns)
AS COPY FROM path ORC;
=> CREATE EXTERNAL TABLE tableName (columns)
AS COPY FROM path PARQUET;
=> CREATE EXTERNAL TABLE t (id int, name varchar(50),
created date, region varchar(50))
AS COPY FROM 'hdfs:///path///*'
PARQUET(hive_partition_cols='created,region');

直接 HDFS 的 bulk load 示例:
COPY t FROM 'hdfs:///opt/data/file1.dat';

如果您在启动 Vertica 后更新了任何Hadoop的配置文件,使用下面的语句强制更新 Vertica 中作更新.
SELECT CLEAR_HDFS_CACHES ();

HCatalog Connector 访问方式

首先Vertica 集群需要安装 Hadoop and hive的 jar library, 同时能拿到正确的 Hadoop 和 Hive 的配置文件(hive-site.xml/core-site.xml/yarn-site.xml/webhcat-site.xml/hdfs-site.xml).
Vertica 专门提供了 hcatUtil 工具帮助我们完成这些配置, 该工具的位置是: /opt/vertica/packages/hcat/tools/
详细使用见 https://my.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/HadoopIntegrationGuide/HCatalogConnector/ConfiguringVerticaForHCatalog.htm

另外, 还可以配置是要使用 HiveServer2 还是 WebHCat
ALTER DATABASE mydb SET HCatalogConnectorUseHiveServer2 = 1; -- 使用 HiveServer2, 缺省模式.
ALTER DATABASE mydb SET HCatalogConnectorUseHiveServer2 = 0; -- 使用 WebHCat, 效率比 HiveServer2 好.

集群Linux 完成安装后, 还需要在 Vertica DB 中安装必要的Udx库, Vertica 8.1缺省未安装.
首先通过下面SQL验证 VHCatalogLib user_lib 是否已经安装?
select * from user_library_manifest where user_library_manifest.lib_name = 'VHCatalogLib';
如果没有安装的化, 需要先安装, vsql -f /opt/vertica/packages/hcat/ddl/install.sql

创建 HCatalog schema 名为 hcat
CREATE HCATALOG SCHEMA hcat WITH
hostname='hcathost'
HCATALOG_SCHEMA='default'
HCATALOG_USER='hcatuser';

可以使用 hcat schema直接访问 hive 表了.
SELECT
distinct ship_type,
ship_mode,
ship_carrier
FROM hcat.shipping_dimension
WHERE shipping_key >= 10627
GROUP BY ship_mode,
ship_carrier;

HDFS Connector 的外部表访问示例

首先通过下面SQL验证 HdfsSource user_lib 是否已经安装? Vertica 8.1缺省已经安装.
select * from user_library_manifest where user_library_manifest.lib_name = 'HdfsSource';
如果没有安装的化, 需要先安装, vsql -f /opt/vertica/packages/hdfsconnector/ddl/install.sql

CREATE EXTERNAL TABLE shipping_dimension
(shipping_key integer,
ship_type char(30),
ship_mode char(10),
ship_carrier char(20))
AS COPY FROM SOURCE
Hdfs(url='http://n01:50070/webhdfs/v1/dbadmin/shippingDimension/*');

HDFS Connector 的 bulk load 示例:
COPY testTable SOURCE Hdfs(url='http://hadoop:50070/webhdfs/v1/tmp/test.txt', username='hadoopUser');

ROS Containers on HDFS 方式

共享集群暂时不做过多研究, 需要创建 HDFS 存储位置
=> CREATE LOCATION 'webhdfs://hadoop:50070/user/dbadmin' ALL NODES SHARED USAGE 'data' LABEL 'coldstorage';
=> SELECT node_name,location_path,location_label FROM STORAGE_LOCATIONS;

-- Vertica Proprietary Format in HDFS
=> SELECT set_object_storage_policy('DBNAME','HDFS');

导出到HDFS

EXPORT TO PARQUET 可以支持本地导出, 也可以导出到HDFS上.
EXPORT TO PARQUET(directory='hdfs:///data/sales_data')
AS SELECT * FROM public.sales;

EXPORT TO PARQUET(directory='hdfs:///data/sales_by_region')
AS SELECT sale.price, sale.date, store.region
FROM public.sales sale
JOIN public.vendor store ON sale.distribID = store.ID;

EXPORT TO PARQUET(directory = 'hdfs:///data/export')
OVER(PARTITION BY date) AS SELECT date, price FROM public.sales;

EXPORT TO PARQUET(directory = 'hdfs:///data/export')
OVER(PARTITION BY date ORDER BY price) AS SELECT date, price FROM public.sales;

使用第三方库 ParallelExport 导出到HDFS

EXPORT TO PARQUET 的导出格式必须是 Parquet, 第三方函数 ParallelExport 支持很多中导出格式, 但效率比EXPORT TO PARQUET要差.
参考< http://www.dbjungle.com/exporting-vast-amounts-of-data-using-parallel-export-for-hpe-vertica/>

select ParallelExport(eutime,eid::varchar,logintype::varchar,sessiontime
using parameters cmd='hadoop fs -put - /user/etl/app/app_fact_event_base/${nodeName}', separator=' '
) over( partition auto ) from app.app_fact_event_base where date(sessiontime) in ('2016-12-13','2016-12-14','2017-02-12','2017-02-13') limit 1000;