Spark:不断读取Cassandra的数据

时间:2021-12-16 23:10:36

I have gone through Reading from Cassandra using Spark Streaming and through tutorial-1 and tutorial-2 links.

我使用Spark Streaming并通过tutorial-1和tutorial-2链接阅读了Cassandra的Reading。

Is it fair to say that Cassandra-Spark integration currently does not provide anything out of the box to continuously get the updates from Cassandra and stream them to other systems like HDFS?

可以公平地说,Cassandra-Spark集成目前没有提供任何开箱即用的功能,可以不断从Cassandra获取更新并将其流式传输到HDFS等其他系统吗?

By continuously, I mean getting only those rows in a table which have changed (inserted or updated) since the last fetch by Spark. If there are too many such rows, there should be an option to limit the number of rows and the subsequent spark fetch should begin from where it left off. At-least once guarantee is ok but exactly-once would be a huge welcome.

通过连续,我的意思是只获取自上次获取Spark以来已更改(插入或更新)的表中的那些行。如果有太多这样的行,应该有一个选项来限制行数,随后的spark fetch应该从它停止的地方开始。至少一次保证是好的,但确切一次将是一个巨大的欢迎。

If its not supported, one way to support it could be to have an auxiliary column updated_time in each cassandra-table that needs to be queried by storm and then use that column for queries. Or an auxiliary table per table that contains ID, timestamp of the rows being changed. Has anyone tried this before?

如果它不受支持,支持它的一种方法可能是在每个cassandra-table中有一个辅助列updated_time,需要通过storm查询,然后使用该列进行查询。或者每个表的辅助表,其中包含要更改的行的ID,时间戳。有没有人试过这个?

2 个解决方案

#1


0  

I don't think Apache Cassandra has this functionality out of the box. Internally [for some period of time] it stores all operations on data in sequential manner, but it's per node and it gets compacted eventually (to save space). Frankly, Cassandra's (as most other DB's) promise is to provide latest view of data (which by itself can be quite tricky in distributed environment), but not full history of how data was changing.

我不认为Apache Cassandra具有开箱即用的功能。在内部[在一段时间内]它以顺序方式将所有操作存储在数据上,但它是每个节点并最终被压缩(以节省空间)。坦率地说,Cassandra(与大多数其他数据库一样)的承诺是提供最新的数据视图(这在分布式环境中本身可能非常棘手),但不是数据如何变化的完整历史记录。

So if you still want to have such info in Cassandra (and process it in Spark), you'll have to do some additional work yourself: design dedicated table(s) (or add synthetic columns), take care of partitioning, save offset to keep track of progress, etc.

因此,如果您仍希望在Cassandra中获得此类信息(并在Spark中处理它),您将不得不自己做一些额外的工作:设计专用表(或添加合成列),处理分区,保存偏移跟踪进度等

Cassandra is ok for time series data, but in your case I would consider just using streaming solution (like Kafka) instead of inventing it.

Cassandra适用于时间序列数据,但在您的情况下,我会考虑使用流式解决方案(如Kafka)而不是发明它。

#2


0  

I agree with what Ralkie stated but wanted to propose one more solution if you're tied to C* with this use case. This solution assumes you have full control over the schema and ingest as well. This is not a streaming solution though it could awkwardly be shoehorned into one.

我同意Ralkie的观点,但是如果你在这个用例中与C *联系在一起,我想提出一个更多的解决方案。此解决方案假设您可以完全控制架构并进行摄取。这不是一个流媒体解决方案,虽然它可能会被笨拙地分成一个。

Have you considered using composite key composed of the timebucket along with a murmur_hash_of_one_or_more_clustering_columns % some_int_designed_limit_row_width? In this way, you could set your timebuckets to 1 minute, 5 minutes, 1 hour, etc depending on how "real-time" you need to analyze/archive your data. The murmur hash based off of one or more of the clustering columns is needed to help located data in the C* cluster (and is a terrible solution if you're often looking up specific clustering columns).

您是否考虑过使用由timebucket组成的复合键以及murmur_hash_of_one_or_more_clustering_columns%some_int_designed_limit_row_width?通过这种方式,您可以将时间设置为1分钟,5分钟,1小时等,具体取决于分析/存档数据所需的“实时”。需要基于一个或多个聚类列的杂音散列来帮助定位C *集群中的数据(如果您经常查找特定的聚类列,这是一个非常糟糕的解决方案)。

For example, take an IoT use case where sensors report in every minute and have some sensor reading that can be represented as an integer.

例如,采用IoT用例,其中传感器每分钟报告一次,并且有一些传感器读数可以表示为整数。

create table if not exists iottable {
  timebucket bigint,
  sensorbucket int,
  sensorid varchar,
  sensorvalue int,
  primary key ((timebucket, sensorbucket), sensorid)
} with caching = 'none'
   and compaction = { 'class': 'com.jeffjirsa.cassandra.db.compaction.TimeWindowedCompaction' };

Note the use of TimeWindowedCompaction. I'm not sure what version of C* you're using; but with the 2.x series, I'd stay away from DateTieredCompaction. I cannot speak to how well it performs in 3.x. Any any rate, you should test and benchmark extensively before settling on your schema and compaction strategy.

注意使用TimeWindowedCompaction。我不确定你使用的是什么版本的C *;但是对于2.x系列,我会远离DateTieredCompaction。我不能说它在3.x中的表现如何。无论如何,在确定模式和压缩策略之前,您应该进行广泛的测试和基准测试。

Also note that this schema could result in hotspotting as it is vulnerable to sensors that report more often than others. Again, not knowing the use case it's hard to provide a perfect solution -- it's just an example. If you don't care about ever reading C* for a specific sensor (or column), you don't have to use a clustering column at all and you can simply use a timeUUID or something random for the murmur hash bucketing.

另请注意,此架构可能会导致热点,因为它比其他传感器更容易报告。同样,不知道用例很难提供完美的解决方案 - 这只是一个例子。如果您不关心为特定传感器(或列)读取C *,则根本不需要使用聚类列,您只需使用timeUUID或随机的杂音哈希存储区。

Regardless of how you decide to partition the data, a schema like this would then allow you to use repartitionByCassandraReplica and joinWithCassandraTable to extract the data written during a given timebucket.

无论您决定如何对数据进行分区,这样的模式都可以让您使用repartitionByCassandraReplica和joinWithCassandraTable来提取在给定timebucket期间写入的数据。

#1


0  

I don't think Apache Cassandra has this functionality out of the box. Internally [for some period of time] it stores all operations on data in sequential manner, but it's per node and it gets compacted eventually (to save space). Frankly, Cassandra's (as most other DB's) promise is to provide latest view of data (which by itself can be quite tricky in distributed environment), but not full history of how data was changing.

我不认为Apache Cassandra具有开箱即用的功能。在内部[在一段时间内]它以顺序方式将所有操作存储在数据上,但它是每个节点并最终被压缩(以节省空间)。坦率地说,Cassandra(与大多数其他数据库一样)的承诺是提供最新的数据视图(这在分布式环境中本身可能非常棘手),但不是数据如何变化的完整历史记录。

So if you still want to have such info in Cassandra (and process it in Spark), you'll have to do some additional work yourself: design dedicated table(s) (or add synthetic columns), take care of partitioning, save offset to keep track of progress, etc.

因此,如果您仍希望在Cassandra中获得此类信息(并在Spark中处理它),您将不得不自己做一些额外的工作:设计专用表(或添加合成列),处理分区,保存偏移跟踪进度等

Cassandra is ok for time series data, but in your case I would consider just using streaming solution (like Kafka) instead of inventing it.

Cassandra适用于时间序列数据,但在您的情况下,我会考虑使用流式解决方案(如Kafka)而不是发明它。

#2


0  

I agree with what Ralkie stated but wanted to propose one more solution if you're tied to C* with this use case. This solution assumes you have full control over the schema and ingest as well. This is not a streaming solution though it could awkwardly be shoehorned into one.

我同意Ralkie的观点,但是如果你在这个用例中与C *联系在一起,我想提出一个更多的解决方案。此解决方案假设您可以完全控制架构并进行摄取。这不是一个流媒体解决方案,虽然它可能会被笨拙地分成一个。

Have you considered using composite key composed of the timebucket along with a murmur_hash_of_one_or_more_clustering_columns % some_int_designed_limit_row_width? In this way, you could set your timebuckets to 1 minute, 5 minutes, 1 hour, etc depending on how "real-time" you need to analyze/archive your data. The murmur hash based off of one or more of the clustering columns is needed to help located data in the C* cluster (and is a terrible solution if you're often looking up specific clustering columns).

您是否考虑过使用由timebucket组成的复合键以及murmur_hash_of_one_or_more_clustering_columns%some_int_designed_limit_row_width?通过这种方式,您可以将时间设置为1分钟,5分钟,1小时等,具体取决于分析/存档数据所需的“实时”。需要基于一个或多个聚类列的杂音散列来帮助定位C *集群中的数据(如果您经常查找特定的聚类列,这是一个非常糟糕的解决方案)。

For example, take an IoT use case where sensors report in every minute and have some sensor reading that can be represented as an integer.

例如,采用IoT用例,其中传感器每分钟报告一次,并且有一些传感器读数可以表示为整数。

create table if not exists iottable {
  timebucket bigint,
  sensorbucket int,
  sensorid varchar,
  sensorvalue int,
  primary key ((timebucket, sensorbucket), sensorid)
} with caching = 'none'
   and compaction = { 'class': 'com.jeffjirsa.cassandra.db.compaction.TimeWindowedCompaction' };

Note the use of TimeWindowedCompaction. I'm not sure what version of C* you're using; but with the 2.x series, I'd stay away from DateTieredCompaction. I cannot speak to how well it performs in 3.x. Any any rate, you should test and benchmark extensively before settling on your schema and compaction strategy.

注意使用TimeWindowedCompaction。我不确定你使用的是什么版本的C *;但是对于2.x系列,我会远离DateTieredCompaction。我不能说它在3.x中的表现如何。无论如何,在确定模式和压缩策略之前,您应该进行广泛的测试和基准测试。

Also note that this schema could result in hotspotting as it is vulnerable to sensors that report more often than others. Again, not knowing the use case it's hard to provide a perfect solution -- it's just an example. If you don't care about ever reading C* for a specific sensor (or column), you don't have to use a clustering column at all and you can simply use a timeUUID or something random for the murmur hash bucketing.

另请注意,此架构可能会导致热点,因为它比其他传感器更容易报告。同样,不知道用例很难提供完美的解决方案 - 这只是一个例子。如果您不关心为特定传感器(或列)读取C *,则根本不需要使用聚类列,您只需使用timeUUID或随机的杂音哈希存储区。

Regardless of how you decide to partition the data, a schema like this would then allow you to use repartitionByCassandraReplica and joinWithCassandraTable to extract the data written during a given timebucket.

无论您决定如何对数据进行分区,这样的模式都可以让您使用repartitionByCassandraReplica和joinWithCassandraTable来提取在给定timebucket期间写入的数据。