【译者注】在本文中,作者总结出了5个关于处理并发性程序的技巧,并给出代码示例,让读者更好地理解和使用这5种方法。 以下为译文:
1.捕获InterruptedException错误
请检查下面的代码片段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public class Task implements Runnable {
private final BlockingQueue queue = ...;
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
String result = getOrDefault(() -> queue.poll(1L, TimeUnit.MINUTES), "default" );
//do smth with the result
}
}
T getOrDefault(Callable supplier, T defaultValue) {
try {
return supplier.call();
}
catch (Exception e) {
logger.error( "Got exception while retrieving value." , e);
return defaultValue;
}
}
}
|
代码的问题是,在等待队列中的新元素时,是不可能终止线程的,因为中断的标志永远不会被恢复:
1.运行代码的线程被中断。
2.BlockingQueue # poll()方法抛出InterruptedException异常,并清除了中断的标志。
3.while中的循环条件 (!Thread.currentThread().isInterrupted())的判断是true,因为标记已被清除。
为了防止这种行为,当一个方法被显式抛出(通过声明抛出InterruptedException)或隐式抛出(通过声明/抛出一个原始异常)时,总是捕获InterruptedException异常,并恢复中断的标志。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
T getOrDefault(Callable supplier, T defaultValue) {
try {
return supplier.call();
}
catch (InterruptedException e) {
logger.error( "Got interrupted while retrieving value." , e);
Thread.currentThread().interrupt();
return defaultValue;
}
catch (Exception e) {
logger.error( "Got exception while retrieving value." , e);
return defaultValue;
}
}
|
2.使用特定的执行程序来阻止操作
因为一个缓慢的操作而使整个服务器变得无响应,这通常不是开发人员想要的。不幸的是,对于RPC,响应时间通常是不可预测的。
假设服务器有100个工作线程,有一个端点,称为100 RPS。在内部,它发出一个RPC调用,通常需要10毫秒。在某个时间点,此RPC的响应时间变为2秒,在峰值期间服务器能够做的惟一的一件事就是等待这些调用,而其他端点则无法访问。
1
2
3
4
5
6
7
|
@GET
@Path ( "/genre/{name}" )
@Produces (MediaType.APPLICATION_JSON)
public Response getGenre( @PathParam ( "name" ) String genreName) {
Genre genre = potentiallyVerySlowSynchronousCall(genreName);
return Response.ok(genre).build();
}
|
解决这个问题最简单的方法是提交代码,它将阻塞调用变成一个线程池:
1
2
3
4
5
6
7
8
9
10
11
|
@GET
@Path ( "/genre/{name}" )
@Produces (MediaType.APPLICATION_JSON)
public void getGenre( @PathParam ( "name" ) String genreName, @Suspended AsyncResponse response) {
response.setTimeout(1L, TimeUnit.SECONDS);
executorService.submit(() -> {
Genre genre = potentiallyVerySlowSynchronousCall(genreName);
return response.resume(Response.ok(genre).build());
}
);
}
|
3.传MDC的值
MDC(Mapped Diagnostic Context)通常用于存储单个任务的特定值。例如,在web应用程序中,它可能为每个请求存储一个请求id和一个用户id,因此MDC查找与单个请求或整个用户活动相关的日志记录变得更加容易。
1
|
2017 - 08 - 27 14 : 38 : 30 , 893 INFO [server-thread- 0 ] [requestId=060d8c7f, userId=2928ea66] c.g.s.web.Controller - Message.
|
可是如果代码的某些部分是在专用线程池中执行的,则线程(提交任务的线程)中MDC就不会被继续传值。在下面的示例中,第7行的日志中包含“requestId”,而第9行的日志则没有:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@GET
@Path ( "/genre/{name}" )
@Produces (MediaType.APPLICATION_JSON)
public void getGenre( @PathParam ( "name" ) String genreName, @Suspended AsyncResponse response) {
try (MDC.MDCCloseable ignored = MDC.putCloseable( "requestId" , UUID.randomUUID().toString())) {
String genreId = getGenreIdbyName(genreName);
//Sync call
logger.trace( "Submitting task to find genre with id '{}'." , genreId);
//'requestId' is logged
executorService.submit(() -> {
logger.trace( "Starting task to find genre with id '{}'." , genreId);
//'requestId' is not logged
Response result = getGenre(genreId) //Async call
.map(artist -> Response.ok(artist).build())
.orElseGet(() -> Response.status(Response.Status.NOT_FOUND).build());
response.resume(result);
}
);
}
}
|
这可以通过MDC#getCopyOfContextMap()方法来解决:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
...
public void getGenre( @PathParam ( "name" ) String genreName, @Suspended AsyncResponse response) {
try (MDC.MDCCloseable ignored = MDC.putCloseable( "requestId" , UUID.randomUUID().toString())) {
...
logger.trace( "Submitting task to find genre with id '{}'." , genreId); //'requestId' is logged
withCopyingMdc(executorService, () -> {
logger.trace( "Starting task to find genre with id '{}'." , genreId); //'requestId' is logged
...
});
}
}
private void withCopyingMdc(ExecutorService executorService, Runnable function) {
Map
|
4.更改线程名称
为了简化日志读取和线程转储,可以自定义线程的名称。这可以通过创建ExecutorService时用一个ThreadFactory来完成。在流行的实用程序库中有许多ThreadFactory接口的实现:
1
2
3
|
com.google.common.util.concurrent.ThreadFactoryBuilde+r in Guava.
org.springframework.scheduling.concurrent.CustomizableThreadFactory in Spring.
org.apache.commons.lang3.concurrent.BasicThreadFactory in Apache Commons Lang 3.
|
1
2
3
4
|
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern( "computation-thread-%d" )
.build();
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads, threadFactory);
|
尽管ForkJoinPool不使用ThreadFactory接口,但也支持对线程的重命名:
1
2
3
4
5
6
|
ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinThreadFactory = pool -> {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName( "computation-thread-" + thread.getPoolIndex());
return thread;
};
ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfThreads, forkJoinThreadFactory, null , false );
|
将线程转储与默认命名进行比较:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
"pool-1-thread-3" # 14 prio= 5 os_prio= 31 tid= 0x00007fc06b19f000 nid= 0x5703 runnable [ 0x0000700001ff9000 ]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java: 16 )
...
"pool-2-thread-3" # 15 prio= 5 os_prio= 31 tid= 0x00007fc06aa10800 nid= 0x5903 runnable [ 0x00007000020fc000 ]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java: 21 )
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java: 9 )
...
"pool-1-thread-2" # 12 prio= 5 os_prio= 31 tid= 0x00007fc06aa10000 nid= 0x5303 runnable [ 0x0000700001df3000 ]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java: 16 )
...
|
与自定义命名进行比较:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
"task-handler-thread-1" # 14 prio= 5 os_prio= 31 tid= 0x00007fb49c9df000 nid= 0x5703 runnable [ 0x000070000334a000 ]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java: 16 )
...
"authentication-service-ping-thread-0" # 15 prio= 5 os_prio= 31 tid= 0x00007fb49c9de000 nid= 0x5903 runnable [ 0x0000700003247000 ]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java: 21 )
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java: 9 )
...
"task-handler-thread-0" # 12 prio= 5 os_prio= 31 tid= 0x00007fb49b9b5000 nid= 0x5303 runnable [ 0x0000700003144000 ]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java: 16 )
...
|
想象一下,可能会不止3个线程。
5.使用LongAdder计数器
在高竞争的情况下,会采用java.util.concurrent.atomic.LongAdder进行计数,而不会采用AtomicLong/AtomicInteger。
LongAdder可以跨越多个单元间仍保持值不变,但是如果需要的话,也可以增加它们的值,但与父类AtomicXX比较,这会导致更高的吞吐量,也会增加内存消耗。
1
2
3
4
|
LongAdder counter = new LongAdder();
counter.increment();
...
long currentValue = counter.sum();
|
总结
以上就是本文关于5个并发处理技巧代码示例的全部内容,希望对大家有所帮助。有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!
原文链接:http://geek.csdn.net/news/detail/235232