A scalable way to visit every element of a ConcurrentHashMap exactly once

I have 32 machine threads and one ConcurrentHashMap<Key, Value> map, which contains a lot of keys. Key defines a public method visit(). I want to visit() every element of the map exactly once, using the processing power I have available and possibly some sort of thread pooling.
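
For concreteness, a minimal sketch of the setup and the single-threaded baseline. The Key and Value classes here are hypothetical placeholders, not the real ones:

import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;

public class Baseline {
    // Hypothetical stand-ins for the real Key/Value classes.
    static class Key {
        final long id;
        Key(long id) { this.id = id; }
        public void visit() { /* brief, CPU-light work per element */ }
        @Override public int hashCode() { return (int) (id ^ (id >>> 32)); }
        @Override public boolean equals(Object o) {
            return o instanceof Key && ((Key) o).id == id;
        }
    }
    static class Value { }

    public static void main(String[] args) {
        ConcurrentHashMap<Key, Value> map = new ConcurrentHashMap<Key, Value>();
        for (long i = 0; i < 2000000; i++) map.put(new Key(i), new Value());
        // The single-threaded baseline: one Enumeration, one thread doing every visit.
        for (Enumeration<Key> e = map.keys(); e.hasMoreElements(); ) {
            e.nextElement().visit();
        }
    }
}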

Things I could try:

  • I could use the method map.keys(). The resulting Enumeration<Key> could be iterated over using nextElement(), but since a call to key.visit() is very brief I won't manage to keep the threads busy. The Enumeration is inherently single-threaded.
  • I could use a synchronised HashSet<Key> instead, invoke toArray() and split the work on the array across all 32 threads (a rough sketch follows this list). I have serious doubts about this solution, since toArray() is likely to be a single-threaded bottleneck.
  • I could try to inherit from ConcurrentHashMap, get my hands on the instances of its inner Segment<K,V>, try to group them into 32 groups and work on each group separately. This sounds like a hardcore approach though.
  • Or similar magic with Enumeration<Key>.
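
To make the second option concrete, a rough sketch of the toArray()-and-split approach, reusing the hypothetical Key and Value classes from the sketch above. The single-threaded toArray() snapshot is exactly the suspected bottleneck:

import java.util.concurrent.*;

public class ToArraySplit {
    static void visitAll(ConcurrentHashMap<Baseline.Key, Baseline.Value> map, int nThreads)
            throws InterruptedException {
        // Single-threaded snapshot of the key set -- the suspected bottleneck.
        final Object[] keys = map.keySet().toArray();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        int chunk = (keys.length + nThreads - 1) / nThreads;
        for (int t = 0; t < nThreads; t++) {
            final int from = t * chunk;
            final int to = Math.min(keys.length, from + chunk);
            pool.execute(new Runnable() {
                public void run() {
                    // Each worker visits its own slice of the snapshot.
                    for (int i = from; i < to; i++) ((Baseline.Key) keys[i]).visit();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}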

Ideally:

  • Ideally a ConcurrentHashMap<Key, Value> would define a method keysEnumerator(int approximatePosition), which would give me an enumerator that skips approximately the first 1/32 of the elements, i.e. map.keysEnumerator(map.size()/32) (sketched below).
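
Purely to make that wish concrete, this is roughly how the 32 workers would use such a method; no such API exists on the real ConcurrentHashMap:

// Hypothetical API -- does NOT exist in java.util.concurrent.
// Worker w (0 <= w < 32) would grab an enumerator at its own offset:
//   Enumeration<Key> e = map.keysEnumerator(w * map.size() / 32);
// and visit elements until reaching the region claimed by worker w + 1,
// partitioning the key space without any single-threaded snapshot.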

Am I missing anything obvious? Has anybody run into a similar problem before?

EDIT

I've had a go at profiling to see whether this problem is actually going to affect performance in practice. As I don't have access to the cluster at the moment, I used my laptop and tried to extrapolate the results to a bigger dataset. On my machine I can create a ConcurrentHashMap with 2 million keys, and it takes about 1 second to iterate over it invoking the visit() method on every key. The program is supposed to scale to 85 million keys (and beyond). The cluster's processor is slightly faster, but it should still take about 40 seconds to iterate over the entire map. Now a few words about the logic flow of the program. The logic presented is sequential, i.e. no thread is allowed to proceed to the next step until all the threads have finished the previous step:

  1. Create the hash map, create the keys and populate the hash map.
  2. Iterate over the entire hash map, visiting all the keys.
  3. Do some data shuffling, i.e. parallel insertions and deletions.
  4. Repeat steps 2 and 3 a few hundred times.

That logic flow means that a 40-second iteration is going to be repeated a few hundred times, say 100, which adds up to a bit over an hour spent just visiting the nodes. With a set of 32 parallel iterators it could go down to just a few minutes, which is a significant performance improvement.

Now a few words on how ConcurrentHashMap works (or how I believe it works). Every ConcurrentHashMap consists of segments (16 by default). Every write to the hash map is synchronised on the relevant segment. So say we're trying to write two new keys k1 and k2 to the hash map, and that both resolve to the same segment, say s1. If they are written simultaneously, one of them will acquire the lock first and be added before the other. What is the chance that two elements resolve to the same segment? With a good hash function and 16 segments it is 1/16.
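
A simplified sketch of that segment selection, modelled on the pre-Java-8 ConcurrentHashMap: the key's hashCode() is re-spread and the top bits pick one of the 16 segments. The spread() function below is a stand-in, not the exact JDK code:

public class SegmentIndex {
    static final int SEGMENT_COUNT = 16;        // default concurrency level
    static final int SEGMENT_SHIFT = 32 - 4;    // top 4 bits select the segment
    static final int SEGMENT_MASK = SEGMENT_COUNT - 1;

    // Stand-in for the JDK's internal rehash; spreads poor hashCode()s.
    static int spread(int h) {
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }

    // With a well-spread hash, two random keys land in the same
    // segment with probability 1/16, as argued above.
    static int segmentFor(Object key) {
        return (spread(key.hashCode()) >>> SEGMENT_SHIFT) & SEGMENT_MASK;
    }
}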

I believe that ConcurrentHashMap should have a method concurrentKeys(), which would return an array of Enumerations, one per segment. I have a few ideas about how to add it to ConcurrentHashMap through inheritance, and I'll let you know if I succeed. For now the solution seems to be to create an array of ConcurrentHashMaps and to pre-hash every key to resolve to one member of that array. I'll share that code as well, once it's ready.

EDIT

This is the same problem in a different language:

Parallel Iterators

3 Answers

#1

I could try to inherit from ConcurrentHashMap, get my hands on the instances of its inner Segment, try to group them into 32 groups and work on each group separately. This sounds like a hardcore approach though.

Hardcore indeed, but about the only thing I can see that would work. toArray() builds the array by doing an enumeration, so no win there. I can't believe that a synchronised HashSet would be better unless the ratio of visit() runs to other map operations is pretty high.

The problem with working with the Segments is that you will have to be extremely careful that your code is resilient, because I assume other threads may be altering the table at the same time you are visiting the nodes, and you need to avoid the inevitable race conditions. Delicate for sure.

The big question in my mind is whether this is necessary. Have profiler or timing runs shown you that it takes too long to visit() each of the keys in one thread? Have you tried using a thread pool for the visit() calls, with one thread doing the enumeration and the pool threads doing the visit()? (A sketch of that follows.)
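
A sketch of that last suggestion, assuming the asker's Key and Value classes: one thread enumerates while a fixed pool runs the visit() calls. Batching is omitted, so per-task overhead may swamp a very cheap visit():

import java.util.concurrent.*;

public class EnumerateAndDispatch {
    static void visitAll(ConcurrentHashMap<Key, Value> map, int nThreads)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        // One thread walks the map; the pool threads do the work.
        for (final Key key : map.keySet()) {
            pool.execute(new Runnable() {
                public void run() { key.visit(); }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}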

#2

If I were you I'd just try iterating the key set of the ConcurrentHashMap first. You could try passing the processing of keys off to a thread pool (in bundles, if the tasks are too lightweight; see the sketch below), or even to a ForkJoin task, but you should do that only if it's really necessary.
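
A sketch of the bundled hand-off, again assuming the asker's Key and Value classes: the iterating thread batches keys before submitting, to amortise task overhead when visit() is cheap:

import java.util.*;
import java.util.concurrent.*;

public class BundledVisit {
    static void visitAll(ConcurrentHashMap<Key, Value> map, int nThreads, int bundleSize)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        List<Key> bundle = new ArrayList<Key>(bundleSize);
        for (Key key : map.keySet()) {
            bundle.add(key);
            if (bundle.size() == bundleSize) {
                submit(pool, bundle);
                bundle = new ArrayList<Key>(bundleSize);  // start a fresh bundle
            }
        }
        if (!bundle.isEmpty()) submit(pool, bundle);      // trailing partial bundle
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    private static void submit(ExecutorService pool, final List<Key> bundle) {
        pool.execute(new Runnable() {
            public void run() {
                for (Key k : bundle) k.visit();
            }
        });
    }
}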

Having said that, you could use a ConcurrentSkipListMap, from which you can get a NavigableSet of keys. You can then take partitions out of it using the subSet method (a sketch follows). However, ConcurrentHashMap has better performance for put and get operations (note also that ConcurrentSkipListMap uses compareTo rather than hashCode). Situations where the skip list is better seem pretty unlikely.
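
A sketch of that partitioning, with Integer keys for simplicity. The cut points are hypothetical; real code would sample the key set to balance the partitions, since a skip list offers no positional access:

import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListPartition {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<Integer, String>();
        for (int i = 0; i < 1000; i++) map.put(i, "v" + i);

        NavigableSet<Integer> keys = map.navigableKeySet();
        // Hypothetical cut points yielding four partitions, one per worker thread:
        // [min, 250), [250, 500), [500, 750), [750, max].
        NavigableSet<Integer> p0 = keys.headSet(250, false);
        NavigableSet<Integer> p1 = keys.subSet(250, true, 500, false);
        NavigableSet<Integer> p2 = keys.subSet(500, true, 750, false);
        NavigableSet<Integer> p3 = keys.tailSet(750, true);
        System.out.println(p0.size() + " " + p1.size() + " " + p2.size() + " " + p3.size());
    }
}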

#3

The solution I will eventually go for is an array of ConcurrentHashMaps instead of one ConcurrentHashMap. This is ad hoc, but seems to be relevant for my use case. I don't care about the second step being slow, as it doesn't affect my code's performance. The solution is:

Object Creation:

  1. Create an array of ConcurrentHashMaps of size t, where t is the number of threads.
  2. Create an array of Runnables, also of size t.

Array Population (single-threaded, not an issue):

  1. Create the keys and apply a pre-hash function, which returns an int in the range 0 ... t-1. In my case it is simply modulo t.
  2. Put the key into the appropriate hash map by accessing the corresponding entry in the array. E.g. if the pre-hash resulted in index 4, go for hashArray[4].put(key).

Array Iteration (nicely multithreaded, performance gain):

  1. Assign to every thread from the Runnables array the job of iterating over the hash map with the corresponding index. This should give a t-times shorter iteration compared to single-threaded (see the sketch after this list).
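
A compact sketch of the whole scheme, reusing the hypothetical Key and Value classes with visit(): t shards, a modulo-t pre-hash, and one worker thread per shard:

import java.util.concurrent.ConcurrentHashMap;

public class ShardedVisit {
    final int t;                                  // number of threads / shards
    final ConcurrentHashMap<Key, Value>[] shards; // one map per thread

    @SuppressWarnings("unchecked")
    ShardedVisit(int t) {
        this.t = t;
        this.shards = new ConcurrentHashMap[t];
        for (int i = 0; i < t; i++) shards[i] = new ConcurrentHashMap<Key, Value>();
    }

    // Pre-hash: resolve a key to a shard index. Here simply modulo t, as above.
    int preHash(Key key) {
        return (key.hashCode() & 0x7fffffff) % t;
    }

    void put(Key key, Value value) {
        shards[preHash(key)].put(key, value);     // e.g. index 4 -> shards[4].put(...)
    }

    // Step 2 of the logic flow: every thread iterates over its own shard.
    void visitAll() throws InterruptedException {
        Thread[] workers = new Thread[t];
        for (int i = 0; i < t; i++) {
            final ConcurrentHashMap<Key, Value> shard = shards[i];
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (Key k : shard.keySet()) k.visit();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) w.join();        // barrier before the next step
    }
}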

To see the proof-of-concept code (it has some dependencies from the project, so I can't post it here), head over to my project on GitHub.

EDIT

Actually, implementing the above proof of concept for my system has proven to be time-consuming, bug-prone and grossly disappointing. Additionally, I've discovered I would miss many features of the standard library's ConcurrentHashMap. The solution I have been exploring recently, which looks much less ad hoc and much more promising, is to use Scala, which produces bytecode that is fully interoperable with Java. The proof of concept relies on the stunning library described in this paper, and AFAIK it is currently IMPOSSIBLE to achieve a corresponding solution in vanilla Java without writing thousands of lines of code, given the current state of the standard library and corresponding third-party libraries.

import scala.collection.parallel.mutable.ParHashMap

// A mutable node whose value is bumped by every visit().
class Node(value: Int, id: Int){
    var v = value
    var i = id
    override def toString(): String = v.toString
}

object testParHashMap{
    // The per-entry work: increment the node's value.
    def visit(entry: Tuple2[Int, Node]){
        entry._2.v += 1
    }
    def main(args: Array[String]){
        val hm = new ParHashMap[Int, Node]()
        for (i <- 1 to 10){
            val node = new Node(0, i)
            hm.put(node.i, node)
        }

        println("========== BEFORE ==========")
        hm.foreach{println}

        // foreach on a parallel collection spreads visit across worker threads.
        hm.foreach{visit}

        println("========== AFTER ==========")
        hm.foreach{println}
    }
}
