How can we achieve higher throughput for a large-scale migration to DynamoDB?

Time: 2023-01-15 09:45:58

I have been running a test for a large data migration to DynamoDB that we intend to do in our prod account this summer. I ran a test to batch write about 3.2 billion documents to our DynamoDB table, which has a hash key and a range key plus two partial indexes. Each document is small, less than 1KB. While we succeeded in getting the items written in about 3 days, we were disappointed with the DynamoDB performance we saw and are looking for suggestions on how we might improve things.


In order to do this migration, we are using 2 EC2 instances (c4.8xlarge). Each runs up to 10 processes of our migration program; we've split the work among the processes by some internal parameters and know that some processes will run longer than others. Each process queries our RDS database for 100,000 records. We then split these into batches of 25 and use a thread pool of 10 threads to call the DynamoDB Java SDK's batchSave() method. Each call to batchSave() sends only 25 documents of less than 1KB each, so we expect each to make a single HTTP call out to AWS. This means that at any given time we can have as many as 100 threads on a server, each making batchSave calls with 25 records. Our RDS instance handled the query load just fine during this time, and so did our 2 EC2 instances: on the EC2 side we did not max out CPU, memory, network in, or network out. Our writes are not grouped by hash key, as we know that grouping them can slow down DynamoDB writes; in general, a group of 100,000 records is spread across about 88,000 distinct hash keys. I created the table with 30,000 write throughput initially and scaled it up to 40,000 at one point during the test, so our understanding is that there are at least 40 partitions on the DynamoDB side to handle this.

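For reference, the write path described above looks roughly like the sketch below. MigrationWorker and Document are placeholder names for our own code; only DynamoDBMapper.batchSave() and Guava's Lists.partition() are real library calls.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
import com.google.common.collect.Lists;

public class MigrationWorker {

    private static final int BATCH_SIZE = 25; // one BatchWriteItem HTTP call per sublist
    private static final int THREADS = 10;    // per-process thread pool

    private final DynamoDBMapper mapper;

    public MigrationWorker(DynamoDBMapper mapper) {
        this.mapper = mapper;
    }

    // 'records' is one 100,000-row chunk fetched from RDS; Document is our
    // annotated DynamoDBMapper item class (hypothetical name).
    public void migrate(List<Document> records) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        for (List<Document> batch : Lists.partition(records, BATCH_SIZE)) {
            pool.submit(() -> mapper.batchSave(batch));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}
```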

We saw highly variable response times in our calls to batchSave() throughout this period. For one 20-minute span while I was running 100 threads per EC2 instance, the average call time was 0.636 seconds, but the median was only 0.374 seconds, so a lot of calls were taking more than a second. I'd expect much more consistency in the time it takes to make these calls from an EC2 instance to DynamoDB. Our table seems to have plenty of throughput configured, the EC2 instances are below 10% CPU, and network in and out look healthy but are nowhere close to maxed out. The CloudWatch graphs in the console (which are fairly terrible...) didn't show any throttling of write requests.


After I took these sample times, some of our processes finished their work, so we were running fewer threads on our EC2 instances. When that happened, we saw dramatically improved response times in our calls to DynamoDB; e.g., when we were running 40 threads instead of 100 on an EC2 instance, each making calls to batchSave, response times improved more than 5x. However, we did NOT see improved write throughput even with the better response times. It seems that no matter what we configured our write throughput to be, we never saw the actual throughput exceed 15,000.


We'd like some advice on how best to achieve better performance on a DynamoDB migration like this. Our production migration this summer will of course be time-sensitive, and by then we'll be looking to migrate about 4 billion records. Does anyone have advice on how we can achieve a higher overall throughput rate? If we're willing to pay for 30,000 units of write throughput for our main index during the migration, how can we actually achieve performance close to that?


1 Answer

#1


One component of BatchWrite latency is the Put request that takes the longest in the Batch. Considering that you have to loop over the List of DynamoDBMapper.FailedBatch until it is empty, you might not be making progress fast enough. Consider running multiple parallel DynamoDBMapper.save() calls instead of batchSave so that you can make progress independently for each item you write.

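A minimal sketch of both options, assuming SDK v1 and a placeholder Document item class: draining the FailedBatch leftovers through the low-level client, and the parallel save() alternative.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper.FailedBatch;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;

public class WriteStrategies {

    // Option A: batchSave() does NOT retry unprocessed items for you. Each
    // FailedBatch exposes the raw WriteRequests, which you must resubmit
    // (ideally with exponential backoff) until nothing is left over.
    static void drainFailedBatches(AmazonDynamoDB client, List<FailedBatch> failed) {
        for (FailedBatch fb : failed) {
            Map<String, List<WriteRequest>> unprocessed = fb.getUnprocessedItems();
            while (unprocessed != null && !unprocessed.isEmpty()) {
                unprocessed = client.batchWriteItem(unprocessed).getUnprocessedItems();
            }
        }
    }

    // Option B: individual PutItem calls progress and fail independently, so
    // one slow or throttled item no longer holds up the other 24 in its batch.
    static void parallelSave(DynamoDBMapper mapper, List<Document> items, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Document item : items) {
            pool.submit(() -> mapper.save(item));
        }
        pool.shutdown();
    }
}
```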

Also, CloudWatch metrics are 1-minute metrics, so you may have peaks of consumption and throttling that are masked by the 1-minute window. This is compounded by the fact that the SDK, by default, retries throttled calls 10 times before exposing the ProvisionedThroughputExceededException to the client, making it difficult to pinpoint when and where the actual throttling is happening. To improve your understanding, try reducing the number of SDK retries, requesting ConsumedCapacity=TOTAL, self-throttling your writes using a Guava RateLimiter as described in the rate-limited scan blog post, and logging throttled primary keys to see if any patterns emerge.

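A sketch of those knobs, assuming SDK v1 and Guava; the retry count and target rate are illustrative numbers, not recommendations:

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity;
import com.google.common.util.concurrent.RateLimiter;

public class ThrottleDiagnostics {

    // Fewer internal retries, so ProvisionedThroughputExceededException reaches
    // your code instead of being absorbed by up to 10 silent SDK retries.
    static AmazonDynamoDB buildClient() {
        ClientConfiguration config = new ClientConfiguration().withMaxErrorRetry(2);
        return AmazonDynamoDBClientBuilder.standard()
                .withClientConfiguration(config)
                .build();
    }

    // Self-throttle: with items under 1KB, a 25-item batch costs ~25 WCU, so a
    // 30,000 WCU/s target works out to roughly 1,200 batches per second.
    static final RateLimiter LIMITER = RateLimiter.create(30000.0 / 25);

    static BatchWriteItemRequest prepare(BatchWriteItemRequest request) {
        LIMITER.acquire(); // blocks until we are back under the target rate
        // ConsumedCapacity=TOTAL makes each response report what it actually used
        return request.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    }
}
```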

Finally, the number of partitions of a table is driven not only by the amount of read and write capacity you provision on the table, but also by the amount of data you store in it. Generally, a partition stores up to 10GB of data and then splits. So if you just write to your table without deleting old entries, the number of partitions will grow without bound. This causes IOPS starvation: even if you provision 40,000 WCU, if you already have 80 partitions due to the amount of data, those 40,000 WCU are distributed among 80 partitions for an average of 500 WCU per partition. To control the amount of stale data in your table, you can run a rate-limited cleanup process that scans and removes old entries, or use rolling time-series tables (slides 84-95) and delete/migrate entire tables of data as they become less relevant. Rolling time-series tables are less expensive than rate-limited cleanup because a DeleteTable operation consumes no WCU, while each DeleteItem call consumes at least 1 WCU.

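A rough sketch of that arithmetic, assuming the commonly cited legacy sizing guidance (~3,000 RCU, ~1,000 WCU, and ~10GB per partition; the actual partitioning scheme is internal to DynamoDB):

```java
public class PartitionEstimate {

    // partitions ~= max(ceil(RCU/3000 + WCU/1000), ceil(sizeGB/10))
    static long estimatePartitions(long rcu, long wcu, double sizeGB) {
        long byThroughput = (long) Math.ceil(rcu / 3000.0 + wcu / 1000.0);
        long bySize = (long) Math.ceil(sizeGB / 10.0);
        return Math.max(byThroughput, bySize);
    }

    public static void main(String[] args) {
        // The example above: ~800GB of data forces ~80 partitions even though
        // 40,000 WCU alone would only need 40, leaving ~500 WCU per partition.
        long partitions = estimatePartitions(0, 40_000, 800);
        System.out.println(partitions + " partitions, "
                + 40_000 / partitions + " WCU each");
    }
}
```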
