多線程并發處理方式
1. 捕獲InterruptedException錯誤
請檢查下面的代碼片段:
public class Task implements Runnable {
private final BlockingQueue queue = 。。.;
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
String result = getOrDefault(() -》 queue.poll(1L, TimeUnit.MINUTES), “default”);
//do smth with the result
}
}
T getOrDefault(Callable supplier, T defaultValue) {
try {
return supplier.call();
} catch (Exception e) {
logger.error(“Got exception while retrieving value.”, e);
return defaultValue;
}
}
}
代碼的問題是,在等待隊列中的新元素時,是不可能終止線程的,因為中斷的標志永遠不會被恢復:
運行代碼的線程被中斷。
BlockingQueue # poll()方法拋出InterruptedException異常,并清除了中斷的標志。
while中的循環條件 (!Thread.currentThread().isInterrupted())的判斷是true,因為標記已被清除。
為了防止這種行為,當一個方法被顯式拋出(通過聲明拋出InterruptedException)或隱式拋出(通過聲明/拋出一個原始異常)時,總是捕獲InterruptedException異常,并恢復中斷的標志。
T getOrDefault(Callable supplier, T defaultValue) {
try {
return supplier.call();
} catch (InterruptedException e) {
logger.error(“Got interrupted while retrieving value.”, e);
Thread.currentThread().interrupt();
return defaultValue;
} catch (Exception e) {
logger.error(“Got exception while retrieving value.”, e);
return defaultValue;
}
}
2.使用特定的執行程序來阻止操作
因為一個緩慢的操作而使整個服務器變得無響應,這通常不是開發人員想要的。不幸的是,對于RPC,響應時間通常是不可預測的。
假設服務器有100個工作線程,有一個端點,稱為100 RPS。在內部,它發出一個RPC調用,通常需要10毫秒。在某個時間點,此RPC的響應時間變為2秒,在峰值期間服務器能夠做的惟一的一件事就是等待這些調用,而其他端點則無法訪問。
@GET
@Path(“/genre/{name}”)
@Produces(MediaType.APPLICATION_JSON)
public Response getGenre(@PathParam(“name”) String genreName) {
Genre genre = potentiallyVerySlowSynchronousCall(genreName);
return Response.ok(genre).build();
}
解決這個問題最簡單的方法是提交代碼,它將阻塞調用變成一個線程池:
@GET
@Path(“/genre/{name}”)
@Produces(MediaType.APPLICATION_JSON)
public void getGenre(@PathParam(“name”) String genreName, @Suspended AsyncResponse response) {
response.setTimeout(1L, TimeUnit.SECONDS);
executorService.submit(() -》 {
Genre genre = potentiallyVerySlowSynchronousCall(genreName);
return response.resume(Response.ok(genre).build());
});
}
3. 傳MDC的值
MDC(Mapped Diagnostic Context)通常用于存儲單個任務的特定值。例如,在web應用程序中,它可能為每個請求存儲一個請求id和一個用戶id,因此MDC查找與單個請求或整個用戶活動相關的日志記錄變得更加容易。
2017-08-27 14:38:30,893 INFO [server-thread-0] [requestId=060d8c7f, userId=2928ea66] c.g.s.web.Controller - Message.
可是如果代碼的某些部分是在專用線程池中執行的,則線程(提交任務的線程)中MDC就不會被繼續傳值。在下面的示例中,第7行的日志中包含“requestId”,而第9行的日志則沒有:
@GET
@Path(“/genre/{name}”)
@Produces(MediaType.APPLICATION_JSON)
public void getGenre(@PathParam(“name”) String genreName, @Suspended AsyncResponse response) {
try (MDC.MDCCloseable ignored = MDC.putCloseable(“requestId”, UUID.randomUUID().toString())) {
String genreId = getGenreIdbyName(genreName); //Sync call
logger.trace(“Submitting task to find genre with id ‘{}’。”, genreId); //‘requestId’ is logged
executorService.submit(() -》 {
logger.trace(“Starting task to find genre with id ‘{}’。”, genreId); //‘requestId’ is not logged
Response result = getGenre(genreId) //Async call
.map(artist -》 Response.ok(artist).build())
.orElseGet(() -》 Response.status(Response.Status.NOT_FOUND).build());
response.resume(result);
});
}
}
這可以通過MDC#getCopyOfContextMap()方法來解決:
。。.
public void getGenre(@PathParam(“name”) String genreName, @Suspended AsyncResponse response) {
try (MDC.MDCCloseable ignored = MDC.putCloseable(“requestId”, UUID.randomUUID().toString())) {
。。.
logger.trace(“Submitting task to find genre with id ‘{}’。”, genreId); //‘requestId’ is logged
withCopyingMdc(executorService, () -》 {
logger.trace(“Starting task to find genre with id ‘{}’。”, genreId); //‘requestId’ is logged
。。.
});
}
}
private void withCopyingMdc(ExecutorService executorService, Runnable function) {
Map
4.更改線程名稱
為了簡化日志讀取和線程轉儲,可以自定義線程的名稱。這可以通過創建ExecutorService時用一個ThreadFactory來完成。在流行的實用程序庫中有許多ThreadFactory接口的實現:
com.google.common.util.concurrent.ThreadFactoryBuilde+r in Guava.
org.springframework.scheduling.concurrent.CustomizableThreadFactory in Spring.
org.apache.commons.lang3.concurrent.BasicThreadFactory in Apache Commons Lang 3.
ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.namingPattern(“computation-thread-%d”)
.build();
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads, threadFactory);
盡管ForkJoinPool不使用ThreadFactory接口,但也支持對線程的重命名:
ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinThreadFactory = pool -》 {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName(“computation-thread-” + thread.getPoolIndex());
return thread;
};
ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfThreads, forkJoinThreadFactory, null, false);
將線程轉儲與默認命名進行比較:
“pool-1-thread-3” #14 prio=5 os_prio=31 tid=0x00007fc06b19f000 nid=0x5703 runnable [0x0000700001ff9000]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
。。.
“pool-2-thread-3” #15 prio=5 os_prio=31 tid=0x00007fc06aa10800 nid=0x5903 runnable [0x00007000020fc000]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21)
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9)
。。.
“pool-1-thread-2” #12 prio=5 os_prio=31 tid=0x00007fc06aa10000 nid=0x5303 runnable [0x0000700001df3000]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
。。.
與自定義命名進行比較:
“task-handler-thread-1” #14 prio=5 os_prio=31 tid=0x00007fb49c9df000 nid=0x5703 runnable [0x000070000334a000]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
。。.
“authentication-service-ping-thread-0” #15 prio=5 os_prio=31 tid=0x00007fb49c9de000 nid=0x5903 runnable [0x0000700003247000]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21)
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9)
。。.
“task-handler-thread-0” #12 prio=5 os_prio=31 tid=0x00007fb49b9b5000 nid=0x5303 runnable [0x0000700003144000]
java.lang.Thread.State: RUNNABLE
at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)
。。.
想象一下,可能會不止3個線程。
5. 使用LongAdder計數器
在高競爭的情況下,會采用java.util.concurrent.atomic.LongAdder進行計數,而不會采用AtomicLong/AtomicInteger。LongAdder可以跨越多個單元間仍保持值不變,但是如果需要的話,也可以增加它們的值,但與父類AtomicXX比較,這會導致更高的吞吐量,也會增加內存消耗。
LongAdder counter = new LongAdder();
counter.increment();
。。.
long currentValue = counter.sum();
非常好我支持^.^
(0) 0%
不好我反對
(0) 0%
下載地址
多線程并發處理方式下載
相關電子資料下載
- 一文詳解ZGC關鍵技術 26
- AMD推出銳龍 Threadripper 7000系列處理器 171
- 如何使用pthread_barrier_xxx系列函數來實現多線程之間的同步? 29
- SpringBoot物理線程、虛擬線程、Webflux性能比較 37
- SV線程的使用和控制 121
- Python 如何獲取旅游景點信息 82
- 新一輪制裁,摩爾線程、壁仞等IC公司上實體清單,英偉達AI芯片限制出售! 927
- 英偉達H800和A800將禁運!美國將摩爾線程、壁仞列入貿易管制“黑名單” 301
- i9-14900K/i7-14700K處理器首發評測 134
- 酷睿i7-14700K處理器性能測試分析 115