色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

效率加倍,高并發場景下的接口請求合并方案

jf_ro2CN3Fa ? 來源:芋道源碼 ? 2023-01-13 10:09 ? 次閱讀


前言

請求合并到底有什么意義呢?我們來看下圖。

281af012-92e3-11ed-bfe3-dac502259ad0.png

假設我們3個用戶(用戶id分別是1、2、3),現在他們都要查詢自己的基本信息,請求到服務器,服務器端請求數據庫,發出3次請求。我們都知道數據庫連接資源是相當寶貴的,那么我們怎么盡可能節省連接資源呢?

這里把數據庫換成被調用的遠程服務,也是同樣的道理。

我們改變下思路,如下圖所示。

28311838-92e3-11ed-bfe3-dac502259ad0.png

我們在服務器端把請求合并,只發出一條SQL查詢數據庫,數據庫返回后,服務器端處理返回數據,根據一個唯一請求ID,把數據分組,返回給對應用戶。

基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能

  • 項目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 視頻教程:https://doc.iocoder.cn/video/

技術手段

  • LinkedBlockQueue 阻塞隊列
  • ScheduledThreadPoolExecutor 定時任務線程池
  • CompleteableFuture future 阻塞機制(Java 8 的 CompletableFuture 并沒有 timeout 機制,后面優化,使用了隊列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能

  • 項目地址:https://github.com/YunaiV/yudao-cloud
  • 視頻教程:https://doc.iocoder.cn/video/

代碼實現

查詢用戶的代碼

publicinterfaceUserService{

MapqueryUserByIdBatch(ListuserReqs);
}
@Service
publicclassUserServiceImplimplementsUserService{

@Resource
privateUsersMapperusersMapper;

@Override
publicMapqueryUserByIdBatch(ListuserReqs){
//全部參數
ListuserIds=userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in語句合并成一條SQL,避免多次請求數據庫的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示沒數據
result.put(val.getRequestId(),null);
}
});
returnresult;
}
}

合并請求的實現

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包裝成批量執行的地方
**/
@Service
publicclassUserWrapBatchService{
@Resource
privateUserServiceuserService;

/**
*最大任務數
**/
publicstaticintMAX_TASK_NUM=100;


/**
*請求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區分
*CompletableFuture將處理結果返回
*/
publicclassRequest{
//請求id唯一
StringrequestId;
//參數
LonguserId;
//TODOJava8的CompletableFuture并沒有timeout機制
CompletableFuturecompletableFuture;

publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicCompletableFuturegetCompletableFuture(){
returncompletableFuture;
}

publicvoidsetCompletableFuture(CompletableFuturecompletableFuture){
this.completableFuture=completableFuture;
}
}

/*
LinkedBlockingQueue是一個阻塞的隊列,內部采用鏈表的結果,通過兩個ReenTrantLock來保證線程安全
LinkedBlockingQueue與ArrayBlockingQueue的區別
ArrayBlockingQueue默認指定了長度,而LinkedBlockingQueue的默認長度是Integer.MAX_VALUE,也就是無界隊列,在移除的速度小于添加的速度時,容易造成OOM。
ArrayBlockingQueue的存儲容器是數組,而LinkedBlockingQueue是存儲容器是鏈表
兩者的實現隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,
也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定時任務線程池,創建一個支持定時、周期性或延時任務的限定線程數目(這里傳入的是1)的線程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果隊列沒數據,表示這段時間沒有請求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]個請求");
//將隊列的請求消費到一個集合保存
for(inti=0;i//后面的SQL語句是有長度限制的,所以還要做限制每次批量的數量,超過最大任務數,等下次執行
if(i//拿到我們需要去數據庫查詢的特征,保存為集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//將參數傳入service處理,這里是本地服務,也可以把userService看成RPC之類的遠程調用
Mapresponse=userService.queryUserByIdBatch(userReqs);
//將處理結果返回各自的請求
for(Requestrequest:list){
Usersresult=response.get(request.requestId);
request.completableFuture.complete(result);//completableFuture.complete方法完成賦值,這一步執行完畢,下面future.get()阻塞的請求可以繼續執行了
}
},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執行schedule是延遲執行initialDelay是初始延遲period是周期間隔后面是單位
//這里我寫的是初始化后100毫秒后執行,周期性執行10毫秒執行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//這里用UUID做請求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
CompletableFuturefuture=newCompletableFuture<>();
request.completableFuture=future;
//將對象傳入隊列
queue.offer(request);
//如果這時候沒完成賦值,那么就會阻塞,直到能夠拿到值
try{
returnfuture.get();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(ExecutionExceptione){
e.printStackTrace();
}
returnnull;
}
}

控制層調用

/***
*請求合并
**/
@RequestMapping("/merge")
publicCallablemerge(LonguserId){
returnnewCallable(){
@Override
publicUserscall()throwsException{
returnuserBatchService.queryUser(userId);
}
};
}

Callable是什么可以參考:

  • https://blog.csdn.net/baidu_19473529/article/details/123596792

模擬高并發查詢的代碼

packagecom.springboot.sample;

importorg.springframework.web.client.RestTemplate;

importjava.util.Random;
importjava.util.concurrent.CountDownLatch;

publicclassTestBatch{
privatestaticintthreadCount=30;

privatefinalstaticCountDownLatchCOUNT_DOWN_LATCH=newCountDownLatch(threadCount);//為保證30個線程同時并發運行

privatestaticfinalRestTemplaterestTemplate=newRestTemplate();

publicstaticvoidmain(String[]args){


for(inti=0;i//循環開30個線程
newThread(newRunnable(){
publicvoidrun(){
COUNT_DOWN_LATCH.countDown();//每次減一
try{
COUNT_DOWN_LATCH.await();//此處等待狀態,為了讓30個線程同時進行
}catch(InterruptedExceptione){
e.printStackTrace();
}

for(intj=1;j<=?3;j++){
intparam=newRandom().nextInt(4);
if(param<=0){
param++;
}
StringresponseBody=restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId="+param,String.class);
System.out.println(Thread.currentThread().getName()+"參數"+param+"返回值"+responseBody);
}
}
}).start();

}
}
}

測試效果

283d32f8-92e3-11ed-bfe3-dac502259ad0.png284d3d4c-92e3-11ed-bfe3-dac502259ad0.png

要注意的問題

  • Java 8 的 CompletableFuture 并沒有 timeout 機制
  • 后面的SQL語句是有長度限制的,所以還要做限制每次批量的數量,超過最大任務數,等下次執行(本例中加了MAX_TASK_NUM判斷)

使用隊列的超時解決Java 8 的 CompletableFuture 并沒有 timeout 機制

核心代碼

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包裝成批量執行的地方,使用queue解決超時問題
**/
@Service
publicclassUserWrapBatchQueueService{
@Resource
privateUserServiceuserService;

/**
*最大任務數
**/
publicstaticintMAX_TASK_NUM=100;


/**
*請求類,code為查詢的共同特征,例如查詢商品,通過不同id的來區分
*CompletableFuture將處理結果返回
*/
publicclassRequest{
//請求id
StringrequestId;

//參數
LonguserId;
//隊列,這個有超時機制
LinkedBlockingQueueusersQueue;


publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicLinkedBlockingQueuegetUsersQueue(){
returnusersQueue;
}

publicvoidsetUsersQueue(LinkedBlockingQueueusersQueue){
this.usersQueue=usersQueue;
}
}

/*
LinkedBlockingQueue是一個阻塞的隊列,內部采用鏈表的結果,通過兩個ReenTrantLock來保證線程安全
LinkedBlockingQueue與ArrayBlockingQueue的區別
ArrayBlockingQueue默認指定了長度,而LinkedBlockingQueue的默認長度是Integer.MAX_VALUE,也就是無界隊列,在移除的速度小于添加的速度時,容易造成OOM。
ArrayBlockingQueue的存儲容器是數組,而LinkedBlockingQueue是存儲容器是鏈表
兩者的實現隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,
也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定時任務線程池,創建一個支持定時、周期性或延時任務的限定線程數目(這里傳入的是1)的線程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果隊列沒數據,表示這段時間沒有請求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]個請求");
//將隊列的請求消費到一個集合保存
for(inti=0;i//后面的SQL語句是有長度限制的,所以還要做限制每次批量的數量,超過最大任務數,等下次執行
if(i//拿到我們需要去數據庫查詢的特征,保存為集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//將參數傳入service處理,這里是本地服務,也可以把userService看成RPC之類的遠程調用
Mapresponse=userService.queryUserByIdBatchQueue(userReqs);
for(RequestuserReq:userReqs){
//這里再把結果放到隊列里
Usersusers=response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}

},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執行schedule是延遲執行initialDelay是初始延遲period是周期間隔后面是單位
//這里我寫的是初始化后100毫秒后執行,周期性執行10毫秒執行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//這里用UUID做請求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
LinkedBlockingQueueusersQueue=newLinkedBlockingQueue<>();
request.usersQueue=usersQueue;
//將對象傳入隊列
queue.offer(request);
//取出元素時,如果隊列為空,給定阻塞多少毫秒再隊列取值,這里是3秒
try{
returnusersQueue.poll(3000,TimeUnit.MILLISECONDS);
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnnull;
}
}
...省略..

@Override
publicMapqueryUserByIdBatchQueue(ListuserReqs){
//全部參數
ListuserIds=userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in語句合并成一條SQL,避免多次請求數據庫的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
//數據分組
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示沒數據,這里要new,不然加入隊列會空指針
result.put(val.getRequestId(),newUsers());
}
});
returnresult;
}

...省略...

小結

請求合并,批量的辦法能大幅節省被調用系統的連接資源,本例是以數據庫為例,其他RPC調用也是類似的道理。缺點就是請求的時間在執行實際的邏輯之前增加了等待時間,不適合低并發的場景。

代碼地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

參考

  • https://www.cnblogs.com/oyjg/p/13099998.html


審核編輯 :李倩


聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 接口
    +關注

    關注

    33

    文章

    8685

    瀏覽量

    151651
  • 服務器
    +關注

    關注

    12

    文章

    9285

    瀏覽量

    85845
  • 數據庫
    +關注

    關注

    7

    文章

    3842

    瀏覽量

    64579

原文標題:效率加倍,高并發場景下的接口請求合并方案

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    華為支付-(可選)特定場景配置操作

    如涉及以下場景,需提前完成相關產品的開通或配置操作。如不涉及,則不需要配置。 場景一:產品開通操作 部分支付場景接入涉及產品開通,未開通產品直接接入,商戶請求華為支付開放的API
    發表于 01-21 10:30

    低成本解決方案,RK3506的應用場景分析!

    RK3506 是瑞芯微推出的MPU產品,芯片制程為22nm,定位于輕量級、低成本解決方案。該MPU具有低功耗、外設接口豐富、實時性的特點,適合用多種工商業場景。本文將基于RK3506
    的頭像 發表于 12-11 15:26 ?480次閱讀
    低成本解決<b class='flag-5'>方案</b>,RK3506的應用<b class='flag-5'>場景</b>分析!

    存儲、高效率、更靈活,拆解聯核科技“前揀后存”解決方案

    為了解決傳統倉庫低矮、空間小儲量小,庫位不足等行業痛點,聯核科技重磅推出四向穿梭車,向密集存儲場景的拓展結合無人叉車,打造存儲、高效率的創新四向車前揀后存解決方案
    的頭像 發表于 12-02 16:01 ?137次閱讀

    信令測試儀器的技術原理和應用場景

    信令測試儀器是一種專門用于測試通信系統中信令的設備,其技術原理和應用場景如下:一、技術原理信令測試儀器的技術原理主要涉及信令的捕獲、解碼和分析。 信令捕獲:信令測試儀器通過物理接口或無線接收器捕獲
    發表于 10-31 14:45

    NVIDIA助力麗蟾科技打造AI訓練與推理加速解決方案

    麗蟾科技通過 Leaper 資源管理平臺集成 NVIDIA AI Enterprise,為企業和科研機構提供了一套高效、靈活的 AI 訓練與推理加速解決方案。無論是在復雜的 AI 開發任務中,還是在并發推理
    的頭像 發表于 10-27 10:03 ?291次閱讀
    NVIDIA助力麗蟾科技打造AI訓練與推理加速解決<b class='flag-5'>方案</b>

    測試聊并發-入門篇

    服務端接口的性能測試中,我們面臨的挑戰不僅僅是處理單個請求效率,更在于如何在多用戶同時訪問時保持系統的穩定性和響應速度。并發編程和測試,作為性能測試的核心,對于評估系統在
    的頭像 發表于 10-15 15:23 ?217次閱讀
    測試聊<b class='flag-5'>并發</b>-入門篇

    使用DM365的DC/DC轉換器的Vin、高效率電源解決方案

    電子發燒友網站提供《使用DM365的DC/DC轉換器的Vin、高效率電源解決方案.pdf》資料免費下載
    發表于 10-11 10:47 ?0次下載
    使用DM365的DC/DC轉換器的<b class='flag-5'>高</b>Vin、高<b class='flag-5'>效率</b>電源解決<b class='flag-5'>方案</b>

    使用帶有DVFS的DC/DC轉換器的Vin、高效率電源解決方案

    電子發燒友網站提供《使用帶有DVFS的DC/DC轉換器的Vin、高效率電源解決方案.pdf》資料免費下載
    發表于 10-10 10:28 ?0次下載
    使用帶有DVFS的DC/DC轉換器的<b class='flag-5'>高</b>Vin、高<b class='flag-5'>效率</b>電源解決<b class='flag-5'>方案</b>

    并發物聯網云平臺是什么

    并發物聯網云平臺是一種能夠處理大量設備同時連接并進行數據交換的云計算平臺。這種平臺通常被設計用來應對來自數以萬計甚至數十億計的物聯網設備的并發請求,保證系統的穩定性和響應速度。 首先
    的頭像 發表于 08-13 13:50 ?287次閱讀

    并發系統的藝術:如何在流量洪峰中游刃有余

    尤為重要。用戶對在線服務的需求和期望不斷提高,系統的并發處理能力成為衡量其性能和用戶體驗的關鍵指標之一。并發系統不僅僅是大型互聯網企業的專利,對于任何希望在市場中占據一席之地的公司來說,能夠處理大量
    的頭像 發表于 08-05 13:43 ?314次閱讀
    <b class='flag-5'>高</b><b class='flag-5'>并發</b>系統的藝術:如何在流量洪峰中游刃有余

    FPGA與MCU的應用場景

    肯定是不同的。在需要處理多個高速數據流的場景,FPGA的多通道IO接口設計能力顯得尤為重要,例如PCIe、DDR還是其他高速通信協議。FPGA可以進行高速數字信號處理,能夠以極高的效率
    發表于 07-29 15:45

    鴻蒙原生應用元服務開發-設備管理USB服務開發場景接口

    場景介紹 Host模式,可以獲取到已經連接的USB設備列表,并根據需要打開和關閉設備、控制設備權限、進行數據傳輸等。 接口說明 USB服務主要提供的功能有:查詢USB設備列表、批量數據傳輸、控制
    發表于 06-07 14:40

    為何什么risc-v芯片比arm的效率高

    并不是絕對的,而是取決于具體的應用場景、設計優化等因素。 綜上所述,RISC-V芯片在某些情況可能相對于ARM架構芯片表現出更高的效率,這主要得益于RISC-V設計的模塊化、開源特性和在寄存器數目、指令數目等方面的優勢。然而,
    發表于 04-28 09:38

    鴻蒙OS開發實例:【工具類封裝-http請求

    ;@ohos.promptAction';** **封裝HTTP接口請求類,提供格式化的響應信息輸出功能。 使用 DevEco Studio 3.1.1 Release 及以上版本,API 版本為 api 9 及以上
    的頭像 發表于 03-27 22:32 ?1427次閱讀
    鴻蒙OS開發實例:【工具類封裝-http<b class='flag-5'>請求</b>】

    光伏逆變器轉換效率測試方案

    發揮著將太陽能面板發出的直流電轉換成交流電,并將這些交流電輸送至電力公司電網的作用,逆變器的轉換效率高,供自家使用及出售的電力就可增加。 光伏逆變器都有一定的損耗,為了給客戶產生更大的效益,國家標準
    發表于 02-22 15:15
    主站蜘蛛池模板: 日韩午夜欧美精品一二三四区| 天天综合网网欲色| 香蕉在线播放| 高h超辣bl文| 日本双渗透| seba51久久精品| 欧美午夜不卡在线观看| 印度12 13free| 久久草香蕉频线观| 真实处破女全过程完免费观看| 久久国产精品免费网站| 在线免费观看日本| 美女拉开腿让男生桶到爽| 98国产精品人妻无码免费| 蜜柚在线观看免费高清官网视频 | 出差无套内射小秘书| 日本久久不射| 国产精品A久久777777| 小黄文纯肉污到你湿| 精品无人区麻豆乱码1区2| 诱受H嗯啊巨肉舍友1V1| 麻豆精品2021最新| 99re5久久热在线| 日本另类xxxx| 国产成人综合视频| 亚洲高清中文字幕| 久久人妻少妇嫩草AV無碼| 1级午夜影院费免区| 牛牛在线精品视频| 成人五级毛片免费播放| 午夜性爽视频男人的天堂在线 | 久久黄色录像| 99久久无码热高清精品| 热の中文 AV天堂| 国产人妻XXXX精品HD电影| 亚洲色图激情小说| 美女伊人网| 国产电影一区二区三区| 亚洲一区二区影院| 欧美人妇无码精品久久| 国产精品色吧国产精品|