I have a class in which I am populating a map liveSocketsByDatacenter
from a single background thread every 30 seconds and then I have a method getNextSocket
which will be called by multiple reader threads to get a live socket available which uses the same map to get this info.
我有一个类,我每30秒从一个后台线程填充一个地图liveSocketsByDatacenter然后我有一个方法getNextSocket,它将由多个读者线程调用以获得一个可用的实时套接字,它使用相同的地图来获取此信息。
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>();
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
public Optional<SocketHolder> getNextSocket() {
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
private void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
boolean status = SendToSocket.getInstance().execute(3, holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets);
}
}
}
As you can see in my above class:
正如你在我上面的课程中看到的那样:
- From a single background thread which runs every 30 seconds, I populate
liveSocketsByDatacenter
map with all the live sockets. - 从每隔30秒运行一个后台线程,我用所有实时套接字填充liveSocketsByDatacenter映射。
- And then from multiple threads, I call
getNextSocket
method to give me live socket available which usesliveSocketsByDatacenter
map to get the required information. - 然后从多个线程,我调用getNextSocket方法给我实时套接字可用,它使用liveSocketsByDatacenter映射来获取所需的信息。
Is my above code thread safe and all the reader threads will see liveSocketsByDatacenter
accurately? Since I am modifying liveSocketsByDatacenter
map every 30 seconds from a single background thread and then from a lot of reader threads, I am calling getNextSocket
method so I am not sure if I did anything wrong here.
我上面的代码线程是否安全,所有读者线程都会准确地看到liveSocketsByDatacenter?由于我每隔30秒从一个后台线程修改liveSocketsByDatacenter映射,然后从很多读取器线程修改,我调用了getNextSocket方法,所以我不确定这里是否有任何错误。
It looks like there might be a thread safety issue in my "getLiveSocket" method as every read gets a shared ArrayList
out of the map and shuffles it? And there might be few more places as well which I might have missed. What is the best way to fix these thread safety issues in my code?
看起来我的“getLiveSocket”方法中可能存在线程安全问题,因为每次读取都会从地图中获取共享的ArrayList并将其洗牌?而且可能还有一些我可能错过的地方。在我的代码中解决这些线程安全问题的最佳方法是什么?
If there is any better way to rewrite this, then I am open for that as well.
如果有更好的方法来重写这个,那么我也是开放的。
4 个解决方案
#1
1
To be thread-safe, your code must synchronize any access to all shared mutable state.
为了线程安全,您的代码必须同步对所有共享可变状态的任何访问。
Here you share liveSocketsByDatacenter
, an instance of HashMap
a non thread-safe implementation of a Map
that can potentially be concurrently read (by updateLiveSockets
and getNextSocket
) and modified (by connectToZMQSockets
and updateLiveSockets
) without synchronizing any access which is already enough to make your code non thread safe. Moreover, the values of this Map
are instances of ArrayList
a non thread-safe implementation of a List
that can also potentially be concurrently read (by getNextSocket
and updateLiveSockets
) and modified (by getLiveSocket
more precisely by Collections.shuffle
).
在这里,您共享liveSocketsByDatacenter,HashMap的一个实例,一个非线程安全的Map实现,可以同时读取(通过updateLiveSockets和getNextSocket)并修改(通过connectToZMQSockets和updateLiveSockets),而无需同步任何已经足以生成代码的访问权限非线程安全。此外,此Map的值是ArrayList的实例,List的非线程安全实现,也可以同时读取(通过getNextSocket和updateLiveSockets)并修改(通过Collections.shuffle更精确地通过getLiveSocket)。
The simple way to fix your 2 thread safety issues could be to:
解决2线程安全问题的简单方法可能是:
- use a
ConcurrentHashMap
instead of aHashMap
for your variableliveSocketsByDatacenter
as it is a natively thread safe implementation of aMap
. - 为您的变量liveSocketsByDatacenter使用ConcurrentHashMap而不是HashMap,因为它是Map的本机线程安全实现。
- put the unmodifiable version of your
ArrayList
instances as value of your map usingCollections.unmodifiableList(List<? extends T> list)
, your lists would then be immutable so thread safe. - 使用Collections.unmodifiableList(List <?extends T> list)将您的ArrayList实例的不可修改版本作为地图的值,您的列表将是不可变的,因此线程安全。
For example:
例如:
liveSocketsByDatacenter.put(
entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)
);`
- rewrite your method
getLiveSocket
to avoid callingCollections.shuffle
directly on your list, you could for example shuffle only the list of live sockets instead of all sockets or use a copy of your list (with for examplenew ArrayList<>(listOfEndPoints)
) instead of the list itself. - 重写你的方法getLiveSocket以避免直接在你的列表上调用Collections.shuffle,你可以例如只调整实时套接字列表而不是所有套接字或使用列表的副本(例如使用新的ArrayList <>(listOfEndPoints))列表本身。
For example:
例如:
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
For #1 as you seem to frequently read and rarely (only once every 30 seconds) modify your map, you could consider to rebuild your map then share its immutable version (using Collections.unmodifiableMap(Map<? extends K,? extends V> m)
) every 30 seconds, this approach is very efficient in mostly read scenario as you no longer pay the price of any synchronization mechanism to access to the content of your map.
对于#1,你似乎经常阅读,很少(每30秒一次)修改你的地图,你可以考虑重建你的地图然后分享它的不可变版本(使用Collections.unmodifiableMap(Map <?extends K,?extends V> m))每30秒,这种方法在大多数读取方案中非常有效,因为您不再为访问地图内容的任何同步机制付出代价。
Your code would then be:
您的代码将是:
// Your variable is no more final, it is now volatile to ensure that all
// threads will see the same thing at all time by getting it from
// the main memory instead of the CPU cache
private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter
= Collections.unmodifiableMap(new HashMap<>());
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry :
socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(
entry.getKey(), entry.getValue(), ZMQ.PUSH
);
liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter;
...
}
...
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter);
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
...
liveSockets.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
}
Your field liveSocketsByDatacenter
could also be of type AtomicReference<Map<Datacenters, List<SocketHolder>>>
, it would then be final
, your map will still be stored in a volatile
variable but within the class AtomicReference
.
您的字段liveSocketsByDatacenter也可以是AtomicReference类型
The previous code would then be:
之前的代码将是:
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
= new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
...
private void connectToZMQSockets() {
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
...
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
#2
1
As you can read in detail e.g. here, if multiple threads access a hash map concurrently, and at least one of the threads modifies the map structurally, it must be synchronized externally to avoid an inconsistent view of the contents. So to be thread safe you should use either Java Collections synchronizedMap() method or a ConcurrentHashMap.
你可以详细阅读,例如这里,如果多个线程同时访问哈希映射,并且至少有一个线程在结构上修改了映射,则必须在外部进行同步以避免内容的不一致视图。因此,为了保证线程安全,您应该使用Java Collections synchronizedMap()方法或ConcurrentHashMap。
//synchronizedMap
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = Collections.synchronizedMap(new HashMap<Datacenters, List<SocketHolder>>());
or
要么
//ConcurrentHashMap
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<Datacenters, List<SocketHolder>>();
As you have very highly concurrent application modifying and reading key value in different threads, you should also have a look at the Producer-Consumer principle, e.g. here.
由于您在不同的线程中具有非常高并发的应用程序修改和读取键值,因此您还应该查看Producer-Consumer原则,例如:这里。
#3
1
It seems, that you can safely use ConcurrentHashMap
here instead of regular HashMap
and it should work.
看来,你可以安全地在这里使用ConcurrentHashMap而不是常规的HashMap,它应该可以工作。
In your current approach, using regular HashMap, you need to have synchronization of methods:
在您当前的方法中,使用常规HashMap,您需要同步方法:
getNextSocket
, connectToZMQSockets
and updateLiveSockets
(everywhere you update or read the HashMap) like a sychronized
word before those methods or other lock on a monitor common for all these methods - And this is not because of ConcurrentModificationException
, but because without synchornization reading threads can see not updated values.
getNextSocket,connectToZMQSockets和updateLiveSockets(你更新或读取HashMap的所有地方)就像这些方法之前的同步字或所有这些方法共同的监视器上的其他锁 - 这不是因为ConcurrentModificationException,而是因为没有同步化读取线程可以看不到更新的值。
There is also problem with concurrent modification in the getLiveSocket, one of the simplest way to avoid this problem is to copy the listOfEndpoints to a new list before shuffle, like this:
getLiveSocket中的并发修改也存在问题,避免此问题的最简单方法之一是在shuffle之前将listOfEndpoints复制到新列表,如下所示:
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> endPoints) {
List<SocketHolder> listOfEndPoints = new ArrayList<SocketHolder>(endPoints);
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
#4
-1
Using ConcurrentHashMap should make your code threadsafe. Alternatively use synchronized methods to access existing hashmap.
使用ConcurrentHashMap应该使您的代码线程安全。或者,使用synchronized方法访问现有的hashmap。
#1
1
To be thread-safe, your code must synchronize any access to all shared mutable state.
为了线程安全,您的代码必须同步对所有共享可变状态的任何访问。
Here you share liveSocketsByDatacenter
, an instance of HashMap
a non thread-safe implementation of a Map
that can potentially be concurrently read (by updateLiveSockets
and getNextSocket
) and modified (by connectToZMQSockets
and updateLiveSockets
) without synchronizing any access which is already enough to make your code non thread safe. Moreover, the values of this Map
are instances of ArrayList
a non thread-safe implementation of a List
that can also potentially be concurrently read (by getNextSocket
and updateLiveSockets
) and modified (by getLiveSocket
more precisely by Collections.shuffle
).
在这里,您共享liveSocketsByDatacenter,HashMap的一个实例,一个非线程安全的Map实现,可以同时读取(通过updateLiveSockets和getNextSocket)并修改(通过connectToZMQSockets和updateLiveSockets),而无需同步任何已经足以生成代码的访问权限非线程安全。此外,此Map的值是ArrayList的实例,List的非线程安全实现,也可以同时读取(通过getNextSocket和updateLiveSockets)并修改(通过Collections.shuffle更精确地通过getLiveSocket)。
The simple way to fix your 2 thread safety issues could be to:
解决2线程安全问题的简单方法可能是:
- use a
ConcurrentHashMap
instead of aHashMap
for your variableliveSocketsByDatacenter
as it is a natively thread safe implementation of aMap
. - 为您的变量liveSocketsByDatacenter使用ConcurrentHashMap而不是HashMap,因为它是Map的本机线程安全实现。
- put the unmodifiable version of your
ArrayList
instances as value of your map usingCollections.unmodifiableList(List<? extends T> list)
, your lists would then be immutable so thread safe. - 使用Collections.unmodifiableList(List <?extends T> list)将您的ArrayList实例的不可修改版本作为地图的值,您的列表将是不可变的,因此线程安全。
For example:
例如:
liveSocketsByDatacenter.put(
entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)
);`
- rewrite your method
getLiveSocket
to avoid callingCollections.shuffle
directly on your list, you could for example shuffle only the list of live sockets instead of all sockets or use a copy of your list (with for examplenew ArrayList<>(listOfEndPoints)
) instead of the list itself. - 重写你的方法getLiveSocket以避免直接在你的列表上调用Collections.shuffle,你可以例如只调整实时套接字列表而不是所有套接字或使用列表的副本(例如使用新的ArrayList <>(listOfEndPoints))列表本身。
For example:
例如:
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
For #1 as you seem to frequently read and rarely (only once every 30 seconds) modify your map, you could consider to rebuild your map then share its immutable version (using Collections.unmodifiableMap(Map<? extends K,? extends V> m)
) every 30 seconds, this approach is very efficient in mostly read scenario as you no longer pay the price of any synchronization mechanism to access to the content of your map.
对于#1,你似乎经常阅读,很少(每30秒一次)修改你的地图,你可以考虑重建你的地图然后分享它的不可变版本(使用Collections.unmodifiableMap(Map <?extends K,?extends V> m))每30秒,这种方法在大多数读取方案中非常有效,因为您不再为访问地图内容的任何同步机制付出代价。
Your code would then be:
您的代码将是:
// Your variable is no more final, it is now volatile to ensure that all
// threads will see the same thing at all time by getting it from
// the main memory instead of the CPU cache
private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter
= Collections.unmodifiableMap(new HashMap<>());
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry :
socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(
entry.getKey(), entry.getValue(), ZMQ.PUSH
);
liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter;
...
}
...
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter);
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
...
liveSockets.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
}
Your field liveSocketsByDatacenter
could also be of type AtomicReference<Map<Datacenters, List<SocketHolder>>>
, it would then be final
, your map will still be stored in a volatile
variable but within the class AtomicReference
.
您的字段liveSocketsByDatacenter也可以是AtomicReference类型
The previous code would then be:
之前的代码将是:
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
= new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
...
private void connectToZMQSockets() {
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
...
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
#2
1
As you can read in detail e.g. here, if multiple threads access a hash map concurrently, and at least one of the threads modifies the map structurally, it must be synchronized externally to avoid an inconsistent view of the contents. So to be thread safe you should use either Java Collections synchronizedMap() method or a ConcurrentHashMap.
你可以详细阅读,例如这里,如果多个线程同时访问哈希映射,并且至少有一个线程在结构上修改了映射,则必须在外部进行同步以避免内容的不一致视图。因此,为了保证线程安全,您应该使用Java Collections synchronizedMap()方法或ConcurrentHashMap。
//synchronizedMap
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = Collections.synchronizedMap(new HashMap<Datacenters, List<SocketHolder>>());
or
要么
//ConcurrentHashMap
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<Datacenters, List<SocketHolder>>();
As you have very highly concurrent application modifying and reading key value in different threads, you should also have a look at the Producer-Consumer principle, e.g. here.
由于您在不同的线程中具有非常高并发的应用程序修改和读取键值,因此您还应该查看Producer-Consumer原则,例如:这里。
#3
1
It seems, that you can safely use ConcurrentHashMap
here instead of regular HashMap
and it should work.
看来,你可以安全地在这里使用ConcurrentHashMap而不是常规的HashMap,它应该可以工作。
In your current approach, using regular HashMap, you need to have synchronization of methods:
在您当前的方法中,使用常规HashMap,您需要同步方法:
getNextSocket
, connectToZMQSockets
and updateLiveSockets
(everywhere you update or read the HashMap) like a sychronized
word before those methods or other lock on a monitor common for all these methods - And this is not because of ConcurrentModificationException
, but because without synchornization reading threads can see not updated values.
getNextSocket,connectToZMQSockets和updateLiveSockets(你更新或读取HashMap的所有地方)就像这些方法之前的同步字或所有这些方法共同的监视器上的其他锁 - 这不是因为ConcurrentModificationException,而是因为没有同步化读取线程可以看不到更新的值。
There is also problem with concurrent modification in the getLiveSocket, one of the simplest way to avoid this problem is to copy the listOfEndpoints to a new list before shuffle, like this:
getLiveSocket中的并发修改也存在问题,避免此问题的最简单方法之一是在shuffle之前将listOfEndpoints复制到新列表,如下所示:
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> endPoints) {
List<SocketHolder> listOfEndPoints = new ArrayList<SocketHolder>(endPoints);
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
#4
-1
Using ConcurrentHashMap should make your code threadsafe. Alternatively use synchronized methods to access existing hashmap.
使用ConcurrentHashMap应该使您的代码线程安全。或者,使用synchronized方法访问现有的hashmap。