更新HIVE表格的简单方法

时间:2024-03-26 18:35:04


第一部分,用于如何更容易地更新Hive表

从历史上看,在Apache Hive中保持数据最新,需要定制应用程序开发,这是复杂的,非高性能的,难以维护的。HDP 2.6通过在Hive中引入SQL MERGE从根本上简化了数据维护,补充了现有的INSERT,UPDATE和DELETE功能。

这个博客展示了如何解决常见的数据管理问题,包括:

  • Hive插件,将Hive数据与源RDBMS同步。
  • 更新数据在Hive中的分区。
  • 在Hive中选择性地屏蔽或清除数据。

在稍后的博客中,我们将展示如何使用Hive管理缓慢变化的维度(SCD)。

基本知识:SQL MERGE,UPDATE和DELETE。

这些SQL特性是保持Hadoop中的数据最新的基础,所以让我们快速看一下它们。

MERGE在SQL 2008中被标准化,是一个强大的SQL语句,允许在一个语句中插入,更新和删除数据。MERGE使保持两个系统的一致性变得容易。我们来看看MERGE的SQL规范(稍微简化一下):

合并到< 目标表>
  USING < 表参考> ON < 检索条件> < 合并子句> ...

 
WHEN MATCHED [ AND < search condition > ]  
THEN < merge merge update or delete specification >  
WHEN NOT MATCHED [ AND < search condition > ]  
THEN < merge insert specification >

MERGE功能非常强大,因为您可以根据需要指定许多WHEN MATCHED / WHEN NOT MATCHED子句。

在这个博客中,我们还将使用更熟悉的UPDATE语句,如下所示:

UPDATE <目标表> 
SET <设置子句列表> 
[WHERE <搜索条件>]

当你不需要在同一个语句中混合使用插入和更新时,更新就会发挥作用。

准备好保持数据新鲜

有了HDP 2.6,你需要做两件事来让你的表更新。

首先:您需要配置您的系统以允许Hive交易。在Ambari这只是意味着打开ACID Transactions设置。

更新HIVE表格的简单方法

第二:你的表必须是一个交易表。这意味着表必须被聚集,作为ORCFile数据存储,并有一个表示属性,说事务=真。这是一个例子:

create table customer_partitioned
(id int, name string, email string, state string)
 partitioned by (signup date)
 clustered by (id) into 2 buckets stored as orc
 tblproperties("transactional"="true");

使用案例1:配置单元UPSERTS

假设您有一个要加载到Hadoop中的源数据库来运行大规模分析。源RDBMS中的记录不断被添加和修改,而且没有日志可以帮助您了解哪些记录已更改。为了简单起见,您只需每24小时执行一次完全转储,并更新Hadoop端,使其成为源端的镜像。

更新HIVE表格的简单方法

让我们创建我们的托管表,如下所示:

create table customer_partitioned
(id int, name string, email string, state string)
 partitioned by (signup date)
 clustered by (id) into 2 buckets stored as orc
 tblproperties("transactional"="true");

假设我们在Time = 1的源数据如下所示:

更新HIVE表格的简单方法

时间= 2的刷新负载看起来像这样:

更新HIVE表格的简单方法

Upsert将更新和插入操作合并到一个操作中,因此您不必担心目标表中是否存在记录。MERGE设计时考虑到了这个用例,基本用法非常简单:

merge into customer_partitioned
 using all_updates on customer_partitioned.id = all_updates.id
 when matched then update set
   email=all_updates.email,
   state=all_updates.state
 when not matched then insert
   values(all_updates.id, all_updates.name, all_updates.email,
   all_updates.state, all_updates.signup);

请注意,我们分别使用“when matched”和“when not matched”条件来管理更新和插入。合并过程之后,托管表与T = 2中的分级表完全相同,并且所有记录都位于其各自的分区中。

使用案例2:更新HIVE分区。

Hive中的一个常见策略是按日期划分数据。这简化了数据加载并提高了性能。无论您的分区策略如何,您偶尔都会将数据分配到错误的分区中。例如,假设客户数据由第三方提供,并包含客户注册日期。如果提供商有软件错误,需要更改客户注册日期,突然记录是在错误的分区,需要清理。

假设我们的初始数据如下所示:

更新HIVE表格的简单方法

而我们的第二个负载看起来像这样:

更新HIVE表格的简单方法

请注意,ID 2在T = 1时注册日期错误,因此在Hive表中的错误分区中。这需要以某种方式进行更新,以便将ID 2从2017-01-08分区中删除并添加到2017-01-10。

在MERGE之前,几乎不可能管理这些分区**更改。Hive的MERGE语句本身不支持更新分区键,但是这里有一个简单的方法。我们引入一个删除标记,我们随时设置分区键和UNION,第二个查询会为这些不匹配的记录中的每一个生成一个额外的行。代码使这个过程更加明显:

merge into customer_partitioned
 using (
-- Updates with matching partitions or net new records.
select
  case
       when all_updates.signup <> customer_partitioned.signup then 1
       else 0
     end as delete_flag,
     all_updates.id as match_key,
     all_updates.* from
    all_updates left join customer_partitioned
   on all_updates.id = customer_partitioned.id
      union all

 -- Produce new records when partitions dont match.
     select 0, null, all_updates.*
     from all_updates, customer_partitioned where
     all_updates.id = customer_partitioned.id
     and all_updates.signup <> customer_partitioned.signup
 ) sub
on customer_partitioned.id = sub.match_key
 when matched and delete_flag=1 then delete
 when matched and delete_flag=0 then
   update set email=sub.email, state=sub.state
 when not matched then
   insert values(sub.id, sub.name, sub.email, sub.state, sub.signup);

在MERGE进程之后,即使记录需要更改分区,托管表和源表也是同步的。最重要的是在完全原子性和隔离的单一操作中完成的。

使用案例3:掩码或清除HIVE数据。

假设有一天您的安全办公室会告诉您,所有来自特定客户的数据都需要被屏蔽或清除。在过去,您可能花了数小时或数天重写受影响的分区。

假设我们的联系人表格如下所示:

更新HIVE表格的简单方法

我们创建了我们的表格如下:

create table contacts
 (id int, name string, customer string, phone string)
 clustered by (id) into 2 buckets stored as orc 
tblproperties("transactional"="true");

安全办公室接近我们,要求采取各种行动:

例如:屏蔽来自客户MaxLeads的所有联系电话号码

这里我们使用Hive的内置掩码功能(从HDP 2.5或Apache Hive 2.1开始)

update contacts set phone = mask(phone) where customer = 'MaxLeads';

例如:清除客户LeadMax的所有记录

delete from contacts where customer = 'LeadMax';  

示例:清除与给定的键列表匹配的记录。

假设安全办公室给了我们一个带有特定**的CSV,并要求我们删除与这些**匹配的记录。我们将安全办公室的CSV加载到表中,并使用子查询获取**列表。

更新HIVE表格的简单方法

delete from contacts where id in ( select id from purge_list );   

结论:

Hive的MERGE和ACID交易使Hive中的数据管理变得简单,强大,并与已经使用多年的现有EDW平台兼容。请继续关注本系列中的下一篇博客,其中展示了如何管理Hive中缓慢变化的维度。