I have a thread, T1, that reads a flat file and parses it. I need to create a new thread, T2, to parse part of this file. Later, T2 needs to update the status of the original entity, which is also being parsed and updated by the original thread T1. How can I handle this situation?
I receive a flat file having the below sample records:
AAAA
BBBB
AACC
BBCC
AADD
BBDD
First, this file is saved in the database with status Received. Then all the records starting with BB or with AA need to be processed in a separate thread. Once parsing succeeds, both threads try to update the status of this file object in the database to Parsed. In some cases, I get a staleObjectException.
Edit: The work done by a thread before the exception is lost. We are using optimistic locking. What is the best way to avoid this problem?
Possible hibernate exceptions when two threads update the same Object?
The above post helps to understand some part of it, but it does not help to resolve my problem.
3 solutions
#1
10
Part 1 - Your problem - the way I see it.
The main reason you are getting this exception is that you are using Hibernate, possibly with optimistic locking. The exception tells you that either T1 or T2 has already updated the state to PARSED, and the other thread, still holding an older version of the row (a lower version number than the one in the database), is now trying to update the state to PARSED as well.
The big question here is: are the two threads trying to write the same data? If the answer is yes, then even if the last update succeeds there shouldn't be any problem, because both threads are updating the row to the same state. In that case you don't really need optimistic locking, because your data will be in sync either way.
The real problem arises if, after the state is set to RECEIVED, T1 and T2 actually depend on one another when moving to the next status. In that case, if T1 has executed first (or vice versa), T2 needs to refresh the data for the updated row and re-apply its changes on top of the changes already pushed by T1. So the solution is: if you encounter staleObjectException, refresh your data from the database and restart your operation.
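The refresh-and-restart idea can be sketched without Hibernate. The block below is only an in-memory stand-in: `FileRow`, `updateWithRetry`, and the `aaDone`/`bbDone` flags are hypothetical names, an `AtomicReference` plays the role of the database row, and a failed `compareAndSet` stands in for the stale-object exception. With Hibernate, the retry would presumably start from a freshly loaded entity in a new transaction instead.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

final class OptimisticRetryDemo {

    /** Immutable snapshot of the row, including its version counter (hypothetical model). */
    static final class FileRow {
        final long version;
        final boolean aaDone;
        final boolean bbDone;

        FileRow(long version, boolean aaDone, boolean bbDone) {
            this.version = version;
            this.aaDone = aaDone;
            this.bbDone = bbDone;
        }

        String status() {
            return (aaDone && bbDone) ? "PARSED" : "RECEIVED";
        }
    }

    /** Re-reads the row, re-applies the change, and retries until the write wins. */
    static int updateWithRetry(AtomicReference<FileRow> store, UnaryOperator<FileRow> change) {
        int attempts = 0;
        while (true) {
            attempts++;
            FileRow current = store.get();           // "refresh" from the store
            FileRow changed = change.apply(current); // re-apply this thread's change
            FileRow next = new FileRow(current.version + 1, changed.aaDone, changed.bbDone);
            // compareAndSet fails if another thread committed since our read;
            // that failure plays the role of the stale-object exception.
            if (store.compareAndSet(current, next)) {
                return attempts;
            }
        }
    }

    /** Runs T1 and T2 against the same row and returns the final state. */
    static FileRow runBothThreads() {
        AtomicReference<FileRow> store = new AtomicReference<>(new FileRow(0, false, false));
        Thread t1 = new Thread(() ->
                updateWithRetry(store, r -> new FileRow(r.version, true, r.bbDone)));
        Thread t2 = new Thread(() ->
                updateWithRetry(store, r -> new FileRow(r.version, r.aaDone, true)));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.get();
    }

    public static void main(String[] args) {
        FileRow result = runBothThreads();
        System.out.println(result.status() + " at version " + result.version);
        // prints "PARSED at version 2"
    }
}
```

Because each thread re-applies only its own change to whatever it just read, neither thread's work is lost, whichever one commits first.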
Part 2 - Analysis of the linked post, "Possible hibernate exceptions when two threads update the same Object?"
Approach 1: This is more or less the "last update wins" situation. It more or less bypasses optimistic locking (the version counting). If setting the status to PARSED does not depend on T1 running before T2 or the reverse, this should be fine.
Approach 2, optimistic locking: This is what you have now. The solution is to refresh the data and restart your operation.
Approach 3, row-level DB lock: The solution here is more or less the same as for approach 2, with the small difference that the lock is pessimistic. The main difference is that in this case it may be a READ lock, and if it is PESSIMISTIC READ you might not even be able to read the data from the database in order to refresh it.
Approach 4, application-level synchronization: There are many different ways to do synchronization. One example is to put all your updates in a BlockingQueue or a JMS queue (if you want it to be persistent) and push all updates from a single thread. To visualize it: T1 and T2 put elements on the queue, and a single thread T3 reads the operations and pushes them to the database server.
If you use application-level synchronization, be aware that not all of these structures can be distributed in a multi-server deployment.
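A minimal sketch of the queue idea from approach 4, assuming the in-process `BlockingQueue` variant rather than JMS. `SingleWriterDemo`, the `DONE` marker, and the "parsed ..." strings are hypothetical; where the sketch adds to a list, the real T3 would write to the database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class SingleWriterDemo {

    /** Marker telling the writer thread that one producer has finished. */
    private static final String DONE = "<done>";

    /** Returns the updates in the order the single writer thread applied them. */
    static List<String> run(List<String> records) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> applied = new ArrayList<>(); // touched only by the writer (T3)

        Thread t1 = new Thread(() -> produce(records, "AA", queue));
        Thread t2 = new Thread(() -> produce(records, "BB", queue));
        Thread t3 = new Thread(() -> {
            int finishedProducers = 0;
            try {
                while (finishedProducers < 2) {
                    String update = queue.take(); // blocks until an update arrives
                    if (update.equals(DONE)) {
                        finishedProducers++;
                    } else {
                        applied.add(update); // real code would update the database here
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        t1.start();
        t2.start();
        t3.start();
        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return applied; // safe to read now that T3 has been joined
    }

    /** Parses the records with the given prefix and queues one update per record. */
    private static void produce(List<String> records, String prefix, BlockingQueue<String> queue) {
        for (String record : records) {
            if (record.startsWith(prefix)) {
                queue.add("parsed " + record);
            }
        }
        queue.add(DONE);
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("AAAA", "BBBB", "AACC", "BBCC", "AADD", "BBDD");
        System.out.println(run(records).size()); // prints 6
    }
}
```

Since only T3 ever writes, there are no competing versions of the row and nothing to go stale. The order in which AA and BB updates arrive is not fixed, though.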
Well I can't think of anything else for now :)
#2
3
I'm not certain I understand the question, but it seems it would be a logic error for thread T1, which only processes (for example) records beginning with AA, to mark the entire file as "Parsed". What happens if, for example, your application crashes after T1 updates the status but while T2 is still processing BB records? Some BB records are likely to be lost, correct?
Anyhow, the crux of the issue is you have a race condition with two threads updating the same object. The stale object exception just means one of your threads lost the race. A better solution avoids a race entirely.
(I am assuming here that the individual record processing is idempotent. If that's not the case, I think you have bigger problems, as some failure modes will result in re-processing of records. If record processing has to happen once and only once, then you have a harder problem, for which a message queue would probably be a better solution.)
I would use java.util.concurrent to dispatch records out to threaded workers, and have the thread that interacts with Hibernate block until all records have been processed, at which point that thread can mark the file as "Parsed".
For example:
    // needs java.util.* and java.util.concurrent.*
    // do something like this during initialization, or use a Guava LoadingCache...
    // (ExecutorService rather than Executor, because submit() is needed below)
    Map<RecordType, ExecutorService> executors = new HashMap<>();
    // note I'm assuming RecordType looks like an enum
    executors.put(RecordType.AA_RECORD, Executors.newSingleThreadExecutor());
Then, as you process the file, dispatch each record as follows, building up a list of futures that track the status of the queued tasks. Let's assume that successfully processing a record returns a boolean "true":
    List<Future<Boolean>> tasks = new ArrayList<>();
    for (Record record : file.getRecords()) {
        ExecutorService executorForRecord = executors.get(record.getRecordType());
        // RecordProcessor is assumed to implement Callable<Boolean>
        tasks.add(executorForRecord.submit(new RecordProcessor(record)));
    }
Now wait for all tasks to complete successfully. There are more elegant ways to do this, especially with Guava. Note that task.get() throws ExecutionException if the task failed with an exception (and InterruptedException if the waiting thread is interrupted), so you need to deal with those here too; I'm glossing over that.
    boolean allSuccess = true;
    for (Future<Boolean> task : tasks) {
        // get() blocks until this task has finished
        allSuccess = allSuccess && task.get();
        if (!allSuccess) break;
    }
    // if all your tasks completed successfully, update the file record
    if (allSuccess) {
        file.setStatus("Parsed");
    }
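For completeness, here is one way the glossed-over exception handling might look, as a self-contained sketch. `AwaitAllDemo` and `allSucceeded` are hypothetical names, a fixed pool of two threads stands in for the per-record-type executors, and a failed or interrupted task is simply treated as "not all succeeded".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AwaitAllDemo {

    /** Returns true only if every task completed normally and returned true. */
    static boolean allSucceeded(List<Callable<Boolean>> work) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Boolean>> tasks = new ArrayList<>();
            for (Callable<Boolean> job : work) {
                tasks.add(pool.submit(job));
            }
            for (Future<Boolean> task : tasks) {
                try {
                    if (!task.get()) { // blocks until this task has finished
                        return false;
                    }
                } catch (ExecutionException e) {
                    return false; // the task itself threw an exception
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
            return true;
        } finally {
            pool.shutdown(); // let the worker threads exit once queued tasks are done
        }
    }

    public static void main(String[] args) {
        List<Callable<Boolean>> work = new ArrayList<>();
        work.add(() -> true);
        work.add(() -> true);
        System.out.println(allSucceeded(work)); // prints "true"
    }
}
```

The caller would mark the file as "Parsed" only when this returns true, so the status is still written by exactly one thread.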
#3
2
Assuming T1 and T2 each parse different parts of the file, neither overrides the other thread's parsing. The best thing is to decouple your parsing process from the DB commit.
T1 and T2 do the parsing; T3 or the main thread does the commit after both T1 and T2 have finished. I think in this approach it is more correct to change the file status to Parsed only when both threads have finished.
You can think of T3 as a CommitService class that waits until T1 and T2 finish and then commits to the DB.
CountDownLatch is a helpful tool for doing this.
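A minimal sketch of the latch approach, under stated assumptions: `LatchCommitDemo` and `parseThenCommit` are hypothetical names, counting records stands in for real parsing, and returning a status string stands in for the DB commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchCommitDemo {

    /** Parses AA and BB records on two threads, then decides the status once both are done. */
    static String parseThenCommit(List<String> records) {
        CountDownLatch bothDone = new CountDownLatch(2);
        AtomicInteger parsed = new AtomicInteger();

        for (String prefix : Arrays.asList("AA", "BB")) {
            new Thread(() -> {
                try {
                    for (String record : records) {
                        if (record.startsWith(prefix)) {
                            parsed.incrementAndGet(); // stand-in for real parsing
                        }
                    }
                } finally {
                    bothDone.countDown(); // always count down, even if parsing throws
                }
            }).start();
        }

        try {
            bothDone.await(); // the committing thread blocks until both parsers finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Received";
        }
        // Only this thread decides the status, so there is no competing update.
        return parsed.get() == records.size() ? "Parsed" : "Received";
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("AAAA", "BBBB", "AACC", "BBCC", "AADD", "BBDD");
        System.out.println(parseThenCommit(records)); // prints "Parsed"
    }
}
```

The parsing threads never touch the file status; the committing thread writes it once, after `await()` returns.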