Bulk-updating existing rows in Redshift

Date: 2022-01-01 23:04:15

This seems like it should be easy, but isn't. I'm migrating a query of the following form from MySQL to Redshift:

INSERT INTO table
(...)
VALUES
(...)
ON DUPLICATE KEY UPDATE
  value = LEAST(value, VALUES(value))

Rows whose primary keys aren't already in the table are simply inserted. For primary keys that are already in the table, we update the row's values based on a condition that depends on both the existing and the incoming values.

http://docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html does not work, because in my case the filter_expression depends on the current entries in the table. I'm currently creating a staging table, populating it with a COPY statement, and trying to figure out the best way to merge the staging table into the real one.

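The direction I'm exploring looks like the sketch below (table and column names are placeholders; Redshift's LEAST plays the role of the MySQL idiom above):

begin;

-- for keys that already exist, keep the smaller of the existing and incoming values
update target_table
set value = least(target_table.value, s.value)
from staging s
where target_table.pk = s.pk;

-- for keys not present yet, a plain insert
insert into target_table
select s.*
from staging s
left outer join target_table t on t.pk = s.pk
where t.pk is null;

end;
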
3 Answers

#1


10 votes

I'm having to do exactly this for a project right now. The method I'm using involves 3 steps:

1.

Run an update that addresses changed fields (I'm updating whether or not the fields have changed, but you can certainly qualify that):

update table1
set col1 = s.col1, col2 = s.col2, ...
from stagetable s
where table1.primkey = s.primkey;

2.

Run an insert that addresses new records:

insert into table1
select s.* 
from stagetable s 
 left outer join table1 t on s.primkey=t.primkey
where t.primkey is null;

3.

Mark rows no longer in the source as inactive (our reporting tool uses views that filter inactive records):

update table1
set is_active_flag = 'N', last_updated = sysdate
where not exists (
  select 1 from stagetable s
  where s.primkey = table1.primkey
);
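
(An addition of my own, not part of the original answer: it's worth running the three statements inside a single transaction, so that other sessions never observe a half-merged table.)

begin;
-- 1: update matched rows; 2: insert new rows; 3: deactivate rows missing from the source
update table1 set col1 = s.col1, col2 = s.col2 from stagetable s where table1.primkey = s.primkey;
insert into table1 select s.* from stagetable s left outer join table1 t on s.primkey = t.primkey where t.primkey is null;
update table1 set is_active_flag = 'N', last_updated = sysdate where not exists (select 1 from stagetable s where s.primkey = table1.primkey);
end;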

#2


0 votes

It's possible to create a temp table, but in Redshift it's better to delete and re-insert the records. Check this doc:

http://docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html
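
For reference, the delete-and-insert pattern that doc describes looks roughly like this (a sketch; target_table, stagetable, and primkey are placeholder names):

begin transaction;

-- drop the rows that are about to be replaced
delete from target_table
using stagetable s
where target_table.primkey = s.primkey;

-- re-insert everything from the staging table
insert into target_table
select * from stagetable;

end transaction;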

#3


0 votes

Here is a fully working approach for Redshift.

Assumptions:

A. The data is available in S3, gzip-compressed, with '|'-separated columns; it may contain some garbage rows (see maxerror below).

B. A sales fact table with two dimension tables, TIME and SKU, to keep it simple (SKU may have many groups and categories).

C. You have a sales table like this:

CREATE TABLE sales (
 sku_id int encode zstd,
 date_id int encode zstd,
 quantity numeric(10,2) encode delta32k
);

1) Create a staging table; it should resemble the online table used by your app(s).

CREATE TABLE stg_sales_onetime (
 sku_number varchar(255) encode zstd,
 time varchar(255) encode zstd,
 qty_str varchar(20) encode zstd,
 quantity numeric(10,2) encode delta32k,
 sku_id int encode zstd,
 date_id int encode zstd
);

2) Copy the data from S3 (this could also be done over SSH).

copy stg_sales_onetime (sku_number,time,qty_str) from
  's3://<bucket_name>/<full_file_path>' CREDENTIALS 'aws_access_key_id=<your_key>;aws_secret_access_key=<your_secret>' delimiter '|' ignoreheader 1 maxerror as 1000 gzip;
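
(A side note of my own, not part of the original answer: on current clusters you would more commonly authorize COPY with an IAM role rather than embedded access keys. A sketch, with the role ARN as a placeholder:)

copy stg_sales_onetime (sku_number,time,qty_str) from
  's3://<bucket_name>/<full_file_path>'
  iam_role 'arn:aws:iam::<account_id>:role/<copy_role>'
  delimiter '|' ignoreheader 1 maxerror as 1000 gzip;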

3) This step is optional. If the data isn't well formatted, this is where you transform it as needed, e.g. converting the string quantity '12.555654' to the number 12.56:

update stg_sales_onetime set quantity = convert(decimal(10,2), qty_str);

4) Populate the correct IDs from the dimension tables:

update stg_sales_onetime set sku_id = d.sku_id from <your_sku_dimension_table> d where stg_sales_onetime.sku_number = d.sku_number;
update stg_sales_onetime set date_id = d.date_id from <your_time_dimension_table> d where stg_sales_onetime.time = d.time;

5) Finally, the data is ready to move from the staging table into the online sales table:

insert into sales (sku_id, date_id, quantity) select sku_id, date_id, quantity from stg_sales_onetime;
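
(A final housekeeping step, not in the original answer: the one-time staging table can be dropped afterwards, and re-analyzing the target keeps the planner statistics fresh.)

drop table stg_sales_onetime;
analyze sales;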
