

C++20 Stackless Coroutines: Hands-On Development of an Ultra-Lightweight, High-Performance Async Library

科技绿洲 · Source: Linux开发架构之路 · Author: Linux开发架构之路 · 2023-11-09 10:20

C++20 has been out for a while now, and one of its headline features is that the language finally supports coroutines (C++, the big brother of the industry, didn't get coroutines until C++20 — I have no strength left to complain; how many people waited how many years, how much youth). But after endless anticipation they have finally arrived, and the standards committee's cautious attitude shaped what C++20 coroutines are: "excellent performance", "flexible development", and a discouragingly "high barrier to entry".

Then again, C++ was destined from birth to carry the performance mandate. It is not a language built merely for the application layer (for that you can use Python, Java, Ruby, Lua, and friends); it is a language you can build other languages with. So it pursues performance and flexibility and discards a pampering low entry bar — C++ was never designed to be a language for everyone.

I had used Python's coroutines before and found them easy to use, so when C++20 arrived I wanted to try coroutines in C++ too. On contact, the problem became clear: C++20 coroutines are infrastructure only. All the machinery of stackless coroutines is implemented, but none of it is wrapped into concrete application-level facilities in the STL. At that point most people can only stare: the coroutine machinery is intricate, and without a standard library on top, "just use coroutines" is a joke. The unanimous verdict online is that C++20 is a half-finished product, and that truly using C++ coroutines must wait until the C++23 coroutine standard library fills things in.

In my usual "no risk, no reward" spirit — only using libraries without understanding the underlying machinery is not the C++ attitude — I dug into C++20 coroutines and, over half a month, wrote a simple coroutine library, gaining a deep understanding of the intricate coroutine machinery along the way. Yes, asio and cppcoro already support C++20 coroutines, but I find them large and complex; if you hope to learn C++ coroutines by reading their sources, forget it — without understanding how coroutines work you won't even be able to follow the code! Some will say that's big talk: "you have the source and still can't read it?" It really isn't — C++ coroutine syntax will upend your intuitions. Take an example:

A func(int a) {
    co_await std::suspend_always{};
    co_yield a;
    co_return 0;
}

int main() {
    auto co = func(1);
    co.hd.resume();
    co.hd.resume();
    int a = co.hd.promise().value;
    co.hd.resume();
    int b = co.hd.promise().value;
}

You might say: func is a coroutine function; when func runs in main it returns 0, so co is an int variable whose value is 0.

Reading it as ordinary code, that would be correct. But in the world of C++ coroutines, that is not at all what happens.

What actually happens: func here is a coroutine generator (this concept matters — it is not a function). Its return value co is (a wrapper object around) the control handle of a coroutine instance, produced by binding the management class A to the concrete coroutine body. To be explicit: co is not the coroutine instance (the coroutine frame); it is a wrapper around the instance's control handle. Executing func(1) merely creates a coroutine instance "dynamically" and returns the control handle to the user. At that moment the coroutine is suspended — the {} body has not executed at all, so no return value exists yet. This is extremely convoluted and hard to digest (and harder things are coming).

Only after the three co.hd.resume() calls has the coroutine fully run; at that point a == 1 and b == 0.

返回值保存在協(xié)程的實(shí)例(協(xié)程幀)中,通過協(xié)程管理類A的內(nèi)部流程控制函數(shù)管理著返回值(A的promise_type定義了所有的協(xié)程控制行為)。

總結(jié)幾點(diǎn) (重要,不要混淆):

1. "The management class A is the class definition that carries coroutine control behavior. A is not a coroutine; something of the form A func(int a, …){ … } is a complete coroutine definition." Thus A func1(){}, A func2(){}, A func3(){} can all bind to the same controller A, yet they are three different coroutine definitions that merely share A as their control behavior. The benefit: you can store these three different coroutines in one std::vector<A>, even though their main bodies (functionality) all differ. For A to be a coroutine management class it must contain a struct promise_type{} definition and a control-handle member std::coroutine_handle<promise_type> hd;. Notably, A need not implement the await_xxx interface — it does not have to be an awaitable.

2. A code block containing the keywords co_await, co_yield, or co_return is a coroutine body. When execution reaches one of these keywords it triggers a coroutine suspension; the original caller, which has been blocked at its resume() call, regains execution — resume() returns and the caller continues;

3. In particular:

co_await can combine with an awaitable object to form more elaborate suspension behavior. Typical async I/O works as co_await plus an I/O awaitable: the coroutine suspends after issuing the async operation, and once the I/O completes, the "scheduler" resumes it. That is where asynchrony pays off, converting I/O complexity into CPU complexity. So coroutines solve "asynchrony", not "parallelism" — for parallelism you still need multiple threads or processes. A coroutine lets a single thread exploit its CPU to the fullest rather than waste it blocked on I/O. And what if you combine coroutines with multithreading/multiprocessing? Congratulations — the world is yours;

co_yield實(shí)現(xiàn)簡單掛起,簡單的立即放棄運(yùn)行權(quán),返回調(diào)用者,可恢復(fù)(異步應(yīng)用場景相對(duì)較少,多用于循環(huán)生成器);

co_return實(shí)現(xiàn)最后一次簡單掛起,立即放棄運(yùn)行權(quán),返回調(diào)用者,協(xié)程后續(xù)不再可恢復(fù)(應(yīng)用于協(xié)程退出);

4. An awaitable (a class shaped like struct B{ await_ready(); await_suspend(); await_resume(); } — a class B implementing the three await_xxx interfaces is an awaitable definition) has instances that are awaitable objects. After await_suspend() executes (not before), the current coroutine is suspended. Remember: it is not the awaitable object that suspends but the current coroutine that co_awaits it — don't confuse the two; fuzzy concepts here cost me a lot of time. (A minimal sketch follows this list.)

5、協(xié)程管理類A,和可等待體B,他們沒有直接關(guān)系,是兩個(gè)不同的東西。可等待體B控制掛起時(shí)點(diǎn)的行為是局部的,協(xié)程控制A控制協(xié)程整體創(chuàng)建,運(yùn)行,掛起,銷毀,異常捕獲等過程的行為是整體的;協(xié)程只對(duì)應(yīng)有一個(gè)控制類A,但是內(nèi)部可以有多次掛起操作,每次操作對(duì)應(yīng)一個(gè)可等待對(duì)象;

Library development

本文重點(diǎn)是庫實(shí)戰(zhàn)開發(fā),關(guān)于協(xié)程框架中的 3大概念:協(xié)程定義類及promise_type{},可等待體awaitable,協(xié)程控制句柄std::coroutine_handle< > ,此處不做介紹,自行了解。

What does deserve an introduction is the run-time logic of coroutine scheduling, to deepen understanding of the library work ahead. With threads this process is managed by the kernel and we rarely see it; with coroutines — and a library you write yourself — you must implement the scheduling algorithm and the event-loop pattern on your own.

Here's a vivid analogy:

現(xiàn)在一個(gè)家中有5個(gè)兒子,他們能力各不相同(工作者協(xié)程),還有一個(gè)媽媽(調(diào)度者協(xié)程),現(xiàn)在只有一臺(tái)電腦(單線程時(shí)間片),同一時(shí)刻,這臺(tái)電腦只能被老媽分給其中一個(gè)兒子來使用(協(xié)程搶占),其中一個(gè)兒子首先得到電腦開始工作(協(xié)程恢復(fù)),其他兒子只能等著無法工作(協(xié)程等待狀態(tài)),有電腦的兒子工作一會(huì)后此時(shí)他發(fā)送一封對(duì)外郵件(可等待對(duì)象)但要等待郵件回復(fù)后才能繼續(xù)工作(io等待完成),因?yàn)槠渌舜藭r(shí)還在等著用電腦而自己此時(shí)不具備繼續(xù)工作的條件,所以他識(shí)趣的放棄電腦的使用權(quán),并把電腦交還給老媽(協(xié)程掛起等待,執(zhí)行權(quán)交還caller)并等著老媽下次再把電腦給他使用,老媽拿到電腦后(調(diào)度協(xié)程恢復(fù)執(zhí)行)檢查是否有回復(fù)郵件到來(調(diào)度協(xié)程檢查事件完成,對(duì)應(yīng)事件循環(huán)iocp/epoll),如果有了,老媽檢查這封回復(fù)郵件是回復(fù)給哪個(gè)兒子的,并叫來對(duì)應(yīng)的兒子(協(xié)程調(diào)度),把電腦交給他(協(xié)程恢復(fù)),得到電腦的兒子打開回復(fù)郵件拿到結(jié)果(await_resume() 返回異步io結(jié)果)繼續(xù)工作,…, 不斷循環(huán)。至此,完成一個(gè)協(xié)程完整調(diào)度流程。

要實(shí)現(xiàn)一個(gè)協(xié)程庫,他需要幾個(gè)東西:

1、實(shí)現(xiàn)具體的異步操作的可等待體(類似比喻中的發(fā)郵件操作,定義是否將電腦歸還,獲取回復(fù)后打開查詢結(jié)果等行為);

2、協(xié)程控制類A(他是一個(gè)協(xié)程任務(wù)task),A的promise_type中應(yīng)該記錄協(xié)程的相關(guān)狀態(tài),記錄掛起點(diǎn)的可等待對(duì)象的指針(很重要),可等待對(duì)象也可以充當(dāng)task和調(diào)度協(xié)程,信息交換的媒介,可等待對(duì)象指針通過 await_suspend() 過程傳遞給task的promise做記錄并保存。調(diào)度協(xié)程通過可等待對(duì)象指針在異步操作完成時(shí)將異步操作結(jié)果傳回給等待的task。

3、 如總結(jié)和比喻所說,最重要的,還需要一個(gè)“協(xié)程調(diào)度器”。第一、他有一個(gè)主調(diào)度協(xié)程,調(diào)度協(xié)程具有一系列的調(diào)度算法,他的工作就是監(jiān)測io異步完成事件的到來和分配執(zhí)行權(quán)給task,第二,他維護(hù)有一個(gè)task協(xié)程隊(duì)列(可以多種方法實(shí)現(xiàn)),隊(duì)列記錄著所有的協(xié)程實(shí)例的句柄,這個(gè)隊(duì)列是為了協(xié)程調(diào)度準(zhǔn)備的。

(Note: the reason C++20 cannot be used directly is precisely that no ready-made library supplies these three concrete tools. For the sake of maximum flexibility, C++ wants users to implement the components themselves, which feels miserable to those of us used to finished libraries — we balk and keep shouting for the C++23 standard library. But even C++23 cannot cover every need; special requirements will still mean writing your own.)

實(shí)現(xiàn)思路

調(diào)度器:

1 調(diào)度協(xié)程中的event loop本例是在Windows下采用的iocp模型(linux下可以使用epoll也很好改,原理一樣)

2、調(diào)度算法采用簡單的等權(quán)重調(diào)度,也就是掛入?yún)f(xié)程隊(duì)列的task,輪流調(diào)度,每個(gè)被調(diào)度的task被調(diào)度的機(jī)會(huì)相同;

3、完成事件標(biāo)記和task恢復(fù),業(yè)務(wù)分開,這樣目的是使得通過co_yield簡單掛起的任務(wù)有重新執(zhí)行的機(jī)會(huì)(因?yàn)閏o_yeild不會(huì)在后續(xù)觸發(fā)完成事件)

4、調(diào)度器中記錄著協(xié)程隊(duì)列的開始task和末尾task的handle,以便調(diào)度協(xié)程;

The awaitables:

1. Async read and write operations on files;

2、網(wǎng)絡(luò)套接字,tcp協(xié)議下異步listen,accept, send, recv 操作;

3、網(wǎng)絡(luò)套接字,udp協(xié)議下異步sendto, recvfrom 操作;

4、協(xié)程休眠,實(shí)現(xiàn)sleepFor,sleepForEx操作,分別實(shí)現(xiàn)協(xié)程任務(wù)的毫秒和微秒級(jí)休眠;

5. Under IOCP, all of the above APIs issue overlapped I/O. When an overlapped call succeeds, the corresponding awaitable's pointer is recorded in the current coroutine's state (a promise_type member). Once the completion event arrives, the scheduling coroutine sets that awaitable's completion flag to true. While polling, the scheduling coroutine simply walks each task's saved awaitable pointer and checks the flag: true means resume that coroutine, false means skip it and continue the event loop;
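In code, that per-task check boils down to the following — a conceptual restatement of CoTask::resume() from CLCoroutine.cpp below, using the types defined there (the real version additionally special-cases sleep awaitables):

bool try_resume(CoTask::handle h) {
    auto* pFile = (IoFileAwaitable<>*)h.promise().pAwaitableFile;
    if (!pFile) {                 // first run, or suspended via plain co_yield:
        h.resume();               // always eligible to run again
        return true;
    }
    if (pFile->isCompleted()) {   // its I/O finished: clear the pointer and resume
        h.promise().pAwaitableFile = nullptr;
        h.resume();
        return true;
    }
    return false;                 // I/O still pending: skip and keep polling
}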

任務(wù)定義(task協(xié)程):

1、task協(xié)程的promise_type中定義3個(gè)變量,

2、保存當(dāng)前掛起的可等待提指針,如果當(dāng)前協(xié)程不是io掛起或者是沒有掛起,該指針應(yīng)該為null

3、保存當(dāng)前協(xié)程自身所屬調(diào)度器Scheduler的指針;

4、保存此刻協(xié)程隊(duì)列中的前一個(gè)協(xié)程task的handle和后一個(gè)協(xié)程task的handle;

5、若當(dāng)前task的可等待對(duì)象完成標(biāo)記為true,則調(diào)度協(xié)程會(huì)將該task的before task和behind task鏈接,將該task的handle移動(dòng)到協(xié)程隊(duì)列尾部,并且resume task,完成調(diào)度和恢復(fù);

啟動(dòng)協(xié)程調(diào)度:

1、實(shí)例化調(diào)度器 CoScheduler;

2. Define task coroutines as lambda expressions and add them to the scheduler's task queue;

3. Start the scheduler with the run method to schedule and run the tasks;

4、task協(xié)程中又可以動(dòng)態(tài)嵌套生產(chǎn)新的task協(xié)程加入到調(diào)度隊(duì)列;

First, the test results (full source appears below):

Case 1: TCP server/client model

除調(diào)度協(xié)程外,協(xié)程隊(duì)列中會(huì)產(chǎn)生4個(gè)task,一個(gè)服務(wù)監(jiān)聽器task,一個(gè)客戶端生成器task,服務(wù)端task,客戶端task

Main coro scheduler started ...
Main coro scheduler: Iocp loop started ... // 0: scheduler coroutine runs
Iocp: New task arrived and first run, tid=26064
Tcp server coro started ... // 1: listener task runs
Server listener accept wait ... --> suspends on async accept
Iocp: New task arrived and first run, tid=26064 // 0: scheduler runs (event-loop phase)
Clients creater coro started. // 2: client generator task runs
Clients creater make client task 1. --> dynamically spawns a client task into the queue
Clients creater yield run privilege to coro scheduler. --> returns to the scheduler via co_yield
Iocp: New task arrived and first run, tid=26064 // 0: scheduler runs
Iocp: New task arrived and first run, tid=26064 --> dispatches the newly arrived task
Client[1] connect wait ... // 3: client task runs, suspends on async connect
Iocp: IoFileAwaitable[TConnect] completed 0 bytes, tid=26064 // 0: scheduler runs, detects connect completion
Clients creater fetch run privilege again. // 2: client generator task runs
Clients creater yield run privilege to coro scheduler.
Client[1] send wait ...
Iocp: IoFileAwaitable[TAccept] completed 47 bytes, tid=26064 // 0: scheduler runs, detects accept completion
Server listener accept wait ... // 1: listener task runs, suspends on async accept
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 // 0: scheduler runs
Clients creater fetch run privilege again. // 2: client generator task runs
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=26064 // 0: scheduler runs
Server[1] send wait ... // 4: server task runs
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 // 0: scheduler runs, detects send completion
Client[1] recv wait ... // 3: client task runs
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 // 0: scheduler runs, detects recv completion
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv wait ... // 4: server task runs, suspends on async recv
Client[1] recv server msg = // 3: client task runs
Hello client. this is server 1. 1st response. --> prints the message received from the server
Client[1] send wait ... --> suspends on async send
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 // 0: scheduler runs, detects recv completion
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 --> detects send completion
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv client msg = // 4: server task runs
Hello server, this is client 1: 2st request. --> prints the message received from the client
Server[1] send wait ...

多個(gè)協(xié)程任務(wù)的異步交替執(zhí)行,就是在一個(gè)協(xié)程遇到 可掛起的異步操作時(shí),比如connect accept send recv等,把運(yùn)行權(quán)限歸還給調(diào)度器,當(dāng)完成事件到來,調(diào)度器又把執(zhí)行權(quán)返回給task,形成執(zhí)行權(quán)在調(diào)度器和task之間反復(fù)橫跳的情況,實(shí)現(xiàn)cpu的多任務(wù)復(fù)用;

Case 2: UDP broadcast model

Main coro scheduler started ... // as in case 1: scheduler starts; 3 servers and 3 clients are spawned
Main coro scheduler: Iocp loop started ...
Iocp: New task arrived and first run, tid=31188
Servers creater coro started.
Servers creater make server task 1.
Servers creater make server task 2.
Servers creater make server task 3.
Servers creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Clients creater coro started.
Clients creater make broadcastor client task 1.
Clients creater make broadcastor client task 2.
Clients creater make broadcastor client task 3.
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Iocp: New task arrived and first run, tid=31188
Udp server[1] coro started bind port = 33000...
Udp server[1] recvfrom wait ... // server 1: async receive
Iocp: New task arrived and first run, tid=31188
Udp server[2] coro started bind port = 33001...
Udp server[2] recvfrom wait ... // server 2: async receive
Iocp: New task arrived and first run, tid=31188
Udp server[3] coro started bind port = 33002...
Udp server[3] recvfrom wait ... // server 3: async receive
Iocp: New task arrived and first run, tid=31188
Broadcastor[1] send wait ... // client 1: async send
Iocp: New task arrived and first run, tid=31188
Broadcastor[2] send wait ... // client 2: async send
Iocp: New task arrived and first run, tid=31188
Broadcastor[3] send wait ... // client 3: async send
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188 // scheduler: recvfrom completion
Servers creater fetch run privilege again.
Servers creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188 // scheduler: sendto completion
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Udp server[2] recvfrom 1st broadcast 75 bytes data, msg = // server 2 receives and prints the message
Hello server, this is broadcastor 1: 1st random broadcast to port=33001.
Udp server[2] recvfrom wait ... --> suspends on async recvfrom
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Udp server[3] recvfrom 1st broadcast 75 bytes data, msg =
Hello server, this is broadcastor 2: 1st random broadcast to port=33002.
Udp server[3] recvfrom wait ...
Broadcastor[1] sendto server msg =
Hello server, this is broadcastor 1: 1st random broadcast to port=33001.
Broadcastor[1] send wait ...
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Broadcastor[2] sendto server msg =
Hello server, this is broadcastor 2: 1st random broadcast to port=33002.
Broadcastor[2] send wait ...
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Broadcastor[3] sendto server msg =
Hello server, this is broadcastor 3: 1st random broadcast to port=33001.

Now the performance tests:

The models are the same as in cases 1 and 2, but this time TCP runs 100 server/client pairs (200 tasks) and UDP runs 20 broadcaster/receiver pairs (40 tasks) to exercise concurrency and gather statistics. Results:

Tcp server coro started ...
Clients creater coro started.
Clients creater make client task 1.
...
Clients creater make client task 100.
Summary coro count 203: total handle 92752 times (spend time 3.06413s), 30902.3 times/per-second.
Summary coro count 203: total handle 185852 times (spend time 6.06633s), 31010.6 times/per-second.
Summary coro count 203: total handle 278601 times (spend time 9.06766s), 30902.6 times/per-second.
Summary coro count 203: total handle 371901 times (spend time 12.0696s), 31080.1 times/per-second.
Summary coro count 203: total handle 466752 times (spend time 15.0719s), 31592 times/per-second.

Counting one complete send-and-recv on both server and client — i.e., four TCP transfers — as one effective communication (1 "time"),

則結(jié)果顯示,在coro=200時(shí)候,單個(gè)線程平均每秒將完成3萬次有效通訊(雖然是自導(dǎo)自演,但是協(xié)程的功能完整實(shí)現(xiàn)了,性能可觀)

Servers creater coro started.
Servers creater make server task 1.
...
Servers creater make server task 20.
Clients creater coro started.
Clients creater make broadcastor client task 1.
...
Clients creater make broadcastor client task 20.
Udp server[1] coro started bind port = 33000...
...
Udp server[20] coro started bind port = 33019...
Summary coro count 43: total handle 541730 times (spend time 3.02587s), 180571 times/per-second.
Summary coro count 43: total handle 1082377 times (spend time 6.02621s), 180196 times/per-second.
Summary coro count 43: total handle 1623102 times (spend time 9.02651s), 180223 times/per-second.
Summary coro count 43: total handle 2165716 times (spend time 12.0268s), 180853 times/per-second.
Summary coro count 43: total handle 2731919 times (spend time 15.0271s), 188714 times/per-second.

由于udp是單向非鏈接協(xié)議,速度會(huì)比tcp快得多,按一次sendto和recvfrom記為一次有效通訊,則在coro=40時(shí)候,單線程每秒有效通訊18萬次。

最后

c++協(xié)程理解之后并不是很難,并且只要api提供異步方案,都可以實(shí)現(xiàn)協(xié)程庫的封裝,比如mysql,redis等異步操作,后續(xù)都可以依葫蘆畫瓢,很快實(shí)現(xiàn)c++協(xié)程庫的開發(fā)。

本庫開發(fā)只是為記錄c++協(xié)程學(xué)習(xí)的經(jīng)歷,很多功能后續(xù)還需完善。目前支持在windows下的各位file socket sleep的異步操作,后續(xù)可擴(kuò)展支持linux的epoll模型。

Code

The header file is CLCoroutine.h; its void test_coroutine_tcp_server() and void test_coroutine_udp_random_broadcast() are the test code for cases 1 and 2.
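As a usage sketch, the two test entry points can be driven from a trivial main (assuming the project's other CL* helper headers are available alongside; both functions have default arguments):

#include "CLCoroutine.h"

int main() {
    // Case 1: TCP server/client model (defaults: port 33100, 100 clients)
    test_coroutine_tcp_server();

    // Case 2: UDP random broadcast model (defaults: port 33000, 20 broadcasters)
    // test_coroutine_udp_random_broadcast();
    return 0;
}

The header itself follows.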

#ifndef __CL_COROUTINE__
#define __CL_COROUTINE__

#if (defined(__cplusplus) && __cplusplus >= 202002L) || (defined(_HAS_CXX20) && _HAS_CXX20)
#ifndef CLUseCorotine
#define CLUseCorotine 1
#endif
#endif

#if (defined(CLUseCorotine) && CLUseCorotine)

#include <coroutine>
#include <thread>
#include <atomic>
#include <chrono>
#include <system_error>
#include <stdexcept>
#include "../_cl_common/CLCommon.h"

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN // trim the size of the included Windows headers
#define WIN32_LEAN_AND_MEAN
#endif // !WIN32_LEAN_AND_MEAN
#include "Windows.h"
#include "Winsock2.h"
#include "WS2tcpip.h"
#include "MSWSock.h"
#pragma comment(lib, "ws2_32.lib")
#else
#endif

struct CoScheduler;

//(協(xié)程)任務(wù)單元
struct CoTask {

using return_type = void;
struct promise_type;
using handle = std::coroutine_handle<promise_type>;

struct promise_type {
CoTask get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() { }
template<typename U>
std::suspend_always yield_value(const U& val) {
pAwaitableFile = nullptr;
return {};
}

CoScheduler* sc = 0;
handle before = 0, behind = 0;
void* pAwaitableFile = 0;
};

bool resume();
handle hd;
};

//(協(xié)程)任務(wù)調(diào)度器。包含主調(diào)度協(xié)程和事件循環(huán),維護(hù)掛起的(協(xié)程)任務(wù)隊(duì)列
struct CoScheduler {

struct MainCoro {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle<promise_type>;
struct promise_type {
MainCoro get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() { }
CoScheduler* sc = 0;
};
constexpr bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<>) { }
auto await_resume() const { }
handle hd;
};

CoScheduler()
: m_curTid(std::this_thread::get_id())
, m_hIocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0))
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
}
bool registe(HANDLE hFile) {
if (!hFile || hFile == INVALID_HANDLE_VALUE || !m_hIocp || ::CreateIoCompletionPort(hFile, m_hIocp, 0, 0) != m_hIocp)
return false;
else
return true;
}
bool registe(SOCKET sock) {
return registe((HANDLE)sock);
}
// 創(chuàng)建task并等待后續(xù)調(diào)度執(zhí)行
template
void gather(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward<Args>(args)...);
appendNewTaskToEnd(coro);
::PostQueuedCompletionStatus(m_hIocp, 0, (ULONG_PTR)coro.hd.address(), 0);
}
// 創(chuàng)建task并立即調(diào)度執(zhí)行
template
void createTask(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward<Args>(args)...);
appendNewTaskToEnd(coro);
coro.resume();
}
size_t taskCount() const { return m_taskCount; }
// 執(zhí)行協(xié)程調(diào)度
void run();

private:
void appendNewTaskToEnd(CoTask& cur) {
auto& cprm = cur.hd.promise();
cprm.sc = this;
if (m_end.hd) {
cprm.before = m_end.hd;
cprm.behind = 0;
m_end.hd.promise().behind = cur.hd;
}
m_end.hd = cur.hd;
++m_taskCount;
if (m_begin.hd == 0) {
m_begin.hd = cur.hd;
cprm.before = 0;
}
}
void moveTaskToEnd(CoTask::handle h) {
if (removeDoneTask())
return;
if (!h)
return;
auto& cprm = h.promise();
if (h == m_begin.hd) {
m_begin.hd = cprm.behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
else if (h == m_end.hd) {
}
else {
cprm.behind.promise().before = cprm.before;
cprm.before.promise().behind = cprm.behind;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
}
bool removeDoneTask() {
bool ret = false;
while (m_begin.hd && m_begin.hd.done()) {
auto h = m_begin.hd;
m_begin.hd = h.promise().behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
h.destroy();
--m_taskCount;
ret = true;
}
return ret;
}

HANDLE m_hIocp;
const std::thread::id m_curTid;
MainCoro m_main;
CoTask m_begin, m_end;
std::atomic<size_t> m_taskCount = 0;
};

// I/O file operation type
enum IoFileType :int {
TUnknown = 0,
TRead,
TWrite,
TListen,
TAccept,
TConnect,
TSend,
TRecv,
TSendto,
TRecvfrom,
TSleep,
};

// IO文件調(diào)度優(yōu)先級(jí)
enum IoFilePriority : int {
WaitingForPolling = 0, // 等待順序輪詢調(diào)度
DispatchImmediately, // 立即調(diào)度
};

// Awaitable file object supporting async suspension (base class)
template<typename Ret = int>
struct IoFileAwaitable : OVERLAPPED {
operator HANDLE() const { return m_hFile; }
operator SOCKET() const { return (SOCKET)m_hFile; }
bool isRegisted() const { return m_isRegisted; }
bool isCompleted() const { return m_isCompleted; }
void setCompleted() { m_isCompleted = true; }
void resetCompleted() {
memset(this, 0, sizeof(OVERLAPPED));
m_isCompleted = 0;
}
void setReturn(Ret ret) { m_ret = ret; }
Ret getReturn() const { return m_ret; }
IoFileType& type() { return m_fileType; }
const char* typeName() const {
#define _TypeNameItem( tp ) case tp: return #tp;
switch (m_fileType)
{
_TypeNameItem(TUnknown);
_TypeNameItem(TRead);
_TypeNameItem(TWrite);
_TypeNameItem(TListen);
_TypeNameItem(TAccept);
_TypeNameItem(TConnect);
_TypeNameItem(TSend);
_TypeNameItem(TRecv);
_TypeNameItem(TSendto);
_TypeNameItem(TRecvfrom);
_TypeNameItem(TSleep);
default:
return "TUnknown";
}
}
void* getTransferredBytesCountBuffer() const {
return m_transferredBytesCount;
}
void setTransferredBytesCountRecvBuffer(void* countBuf) {
m_transferredBytesCount = countBuf;
}
bool close() {
if (m_hFile) {
return CloseHandle(detach());
}
return true;
}
HANDLE detach() {
HANDLE ret = *this;
m_hFile = 0;
m_isRegisted = 0;
return ret;
}
HANDLE attach(CoScheduler& sc, HANDLE s) {
HANDLE ret = *this;
m_hFile = s;
m_isRegisted = sc.registe(m_hFile);
return ret;
}
int getLastError() const { return m_lastError; }
void setLastError(int err) { m_lastError = err; }
CoTask::handle& owner() { return m_owner; }
auto getPriority() const { return m_priority; }
void setPriority(IoFilePriority priority) { m_priority = priority; }
// awaitable methods
bool await_ready() const { return isCompleted(); }
void await_suspend(CoTask::handle h) {
h.promise().pAwaitableFile = this;
m_owner = h;
}
Ret await_resume() {
setTransferredBytesCountRecvBuffer(nullptr);
return getReturn();
}
protected:
IoFileAwaitable()
: m_hFile((HANDLE)0)
, m_isRegisted(false)
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, HANDLE hFile)
: m_hFile(hFile)
, m_isRegisted(sc.registe(m_hFile))
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, SOCKET sock)
: m_hFile((HANDLE)sock)
, m_isRegisted(sc.registe(sock))
{
resetCompleted();
}
HANDLE m_hFile;
bool m_isRegisted;
bool m_isCompleted;
IoFileType m_fileType = IoFileType::TUnknown;
void* m_transferredBytesCount = nullptr;
int m_lastError = ERROR_SUCCESS;
IoFilePriority m_priority = IoFilePriority::WaitingForPolling;
CoTask::handle m_owner;
Ret m_ret = 0;
};

// Socket supporting async suspension (base class)
template<typename Ret = int>
struct AsyncSocket : public IoFileAwaitable<Ret> {
using base = IoFileAwaitable<Ret>;
~AsyncSocket() { close(); }
sockaddr_in localAddress() const { return m_local; }
sockaddr_in remoteAddress() const { return m_remote; }
sockaddr_in* localAddress() { return &m_local; }
sockaddr_in* remoteAddress() { return &m_remote; }
int close() {
int ret = 0;
if (base::m_hFile) {
if (base::m_hFile != (HANDLE)INVALID_SOCKET) {
ret = closesocket(detach());
}
else {
base::m_hFile = 0;
base::m_isRegisted = 0;
}
}
return ret;
}
SOCKET detach() {
return (SOCKET)base::detach();
}
SOCKET attach(CoScheduler& sc, SOCKET s) {
return (SOCKET)base::attach(sc, (HANDLE)s);
}
protected:
AsyncSocket(CoScheduler& sc, SOCKET sock)
:base(sc, sock)
{ }
sockaddr_in m_local = { 0 };
sockaddr_in m_remote = { 0 };
};

struct AsyncAcceptor;
// Server-side listener supporting async suspension: a TCP listening socket waiting for incoming connections
struct AsyncListener : public AsyncSocket<bool> {
AsyncListener(CoScheduler& sc, unsigned long addr, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = addr;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
AsyncListener(CoScheduler& sc, const char* ip, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_port = htons(port);
InetPton(AF_INET, ip, &m_local.sin_addr);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
sockaddr_in listenAddress() const { return localAddress(); }
// Returns true on success, false on failure
AsyncAcceptor& accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
};

// TCP connection supporting async suspension (base class)
struct AsyncTcp : public AsyncSocket<int> {
// Returns 0 on success, SOCKET_ERROR on failure
AsyncTcp& send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend);
// Returns 0 on success, SOCKET_ERROR on failure
AsyncTcp& recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv);
protected:
AsyncTcp(CoScheduler& sc)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{ }
};

// 支持異步掛起的服務(wù)端接收器,是一個(gè)接受端TCP套接字
struct AsyncAcceptor : public AsyncTcp {
AsyncAcceptor(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TAccept;
}
// Parse the incoming connection's address info and store it in the internal address fields
void parseAddress(void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer) {
if (lpAcceptBuffer == 0 || nNumberOfBytesAcceptBuffer == 0)
throw std::logic_error("perseAddress parm is invalid.");

static LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptSockAddrs = 0;
if (!lpfnGetAcceptSockAddrs) {
GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
*this,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptexSockAddrs,
sizeof(GuidGetAcceptexSockAddrs),
&lpfnGetAcceptSockAddrs,
sizeof(lpfnGetAcceptSockAddrs),
&dwBytes, NULL, NULL))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener GetAcceptexSockAddrs error.");
}
}
int localLen = 0, remoteLen = 0;
lpfnGetAcceptSockAddrs(
lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(LPSOCKADDR*)localAddress(),
&localLen,
(LPSOCKADDR*)remoteAddress(),
&remoteLen
);
}
// Returns true on success, false on failure
AsyncAcceptor& accept(AsyncListener& sockListener, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
int await_resume() {
setPriority(IoFilePriority::WaitingForPolling);
return AsyncTcp::await_resume();
}
};

// Client-side connector supporting async suspension: an initiating-end TCP socket
struct AsyncConnector : public AsyncTcp {
AsyncConnector(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
}
AsyncConnector(CoScheduler& sc, const char* ip, unsigned short port)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
setConnectRemoteAddress(ip, port);
bindConnectLocalPort(0);
}
void setConnectRemoteAddress(const char* ip, unsigned short port) {
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_port = htons(port);
InetPton(AF_INET, ip, &m_remote.sin_addr);
}
int bindConnectLocalPort(unsigned short port = 0) {
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = INADDR_ANY;
m_local.sin_port = htons(port);
return ::bind(*this, (const sockaddr*)&m_local, sizeof(m_local));
}
// Returns true on success, false on failure
AsyncConnector& connect(const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// Returns true on success, false on failure
AsyncConnector& connect(const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// Returns true on success, false on failure
AsyncConnector& connect(void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
};

// 作為服務(wù)端Acceptor應(yīng)該具有事件完成并立即調(diào)度優(yōu)先級(jí),保證吞吐量
// 返回值true成功,false失敗
inline
AsyncAcceptor&
accept(AsyncListener& sockListener, AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted) {
static LPFN_ACCEPTEX lpfnAcceptEx = 0;

sockListener.type() = IoFileType::TListen;
sockAccept.type() = IoFileType::TAccept;
sockAccept.resetCompleted();
sockAccept.setTransferredBytesCountRecvBuffer(lpNumberOfBytesAccepted);
sockAccept.setPriority(IoFilePriority::DispatchImmediately); // set dispatch-immediately priority
if (lpNumberOfBytesAccepted)
*lpNumberOfBytesAccepted = 0;

if (!lpfnAcceptEx) {
GUID GuidAcceptEx = WSAID_ACCEPTEX; // GUID required to look up the AcceptEx extension function
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockListener,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&lpfnAcceptEx,
sizeof(lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
lpfnAcceptEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Accept get AcceptEx function address error.");
}
}

bool ret = lpfnAcceptEx(
sockListener,
sockAccept,
(char*)lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
lpNumberOfBytesAccepted,
&sockAccept
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockAccept.setReturn(ret);
return sockAccept;
}
sockAccept.setReturn(false);
sockAccept.setCompleted();
sockAccept.setPriority(IoFilePriority::WaitingForPolling);
return sockAccept;
}

// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
static LPFN_CONNECTEX lpfnConnectEx = 0;
sockCon.type() = IoFileType::TConnect;
sockCon.resetCompleted();
if (lpdwBytesSent)
*lpdwBytesSent = 0;

if (!lpfnConnectEx) {
GUID GuidConnectEx = WSAID_CONNECTEX; // GUID required to look up the ConnectEx extension function
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockCon,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidConnectEx,
sizeof(GuidConnectEx),
&lpfnConnectEx,
sizeof(lpfnConnectEx),
&dwBytes, NULL, NULL))
{
lpfnConnectEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Connect get ConnectEx function address error.");
}
}
sockCon.setTransferredBytesCountRecvBuffer(lpdwBytesSent);
bool ret = lpfnConnectEx(
sockCon,
name,
namelen,
lpSendBuffer,
dwSendDataLength,
lpdwBytesSent,
&sockCon
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockCon.setReturn(ret);
return sockCon;
}

sockCon.setReturn(false);
sockCon.setCompleted();
return sockCon;
}

// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
sockCon.setConnectRemoteAddress(ip, port);
sockCon.bindConnectLocalPort(0);
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

// Returns true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
send(AsyncTcp& sock, const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend) {
sock.type() = IoFileType::TSend;
sock.resetCompleted();
if (lpNumberOfBytesSend)
*lpNumberOfBytesSend = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSend);
WSABUF wsaBuf{ nNumberOfBytesSendBuffer , (char*)lpSendBuffer };
auto ret = WSASend(sock, &wsaBuf, 1, lpNumberOfBytesSend, 0, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
recv(AsyncTcp& sock, void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv) {
sock.type() = IoFileType::TRecv;
sock.resetCompleted();
if (lpNumberOfBytesRecv)
*lpNumberOfBytesRecv = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecv);
WSABUF wsaBuf{ nNumberOfBytesRecvBuffer , (char*)lpRecvBuffer };
unsigned long dwFlag = 0;
auto ret = WSARecv(sock, &wsaBuf, 1, NULL, &dwFlag, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

// UDP (connectionless) socket supporting async suspension
struct AsyncUdp : public AsyncSocket<int> {
// Returns -1 on failure; 1 when set to broadcast mode (client side), 0 for receiver mode (server side)
int status() const { return m_isBroadCast; }
int* remoteLen() { return &m_remoteLen; }

protected:
// isBroadCast = true: sending-side UDP (client); uses sendTo, and the broadcast target address can be chosen dynamically at sendTo time
// isBroadCast = false: receiving-side UDP (server); uses recvFrom, and the bound receive address must be given at construction
AsyncUdp(CoScheduler& sc, bool isBroadCast = true, const char* ip = 0, unsigned short port = 0)
: AsyncSocket(sc, WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
setBroadCast(isBroadCast, ip, port);
}
// 設(shè)置失敗返回-1;返回1設(shè)置為廣播模式(client端),返回0則為接收端(server端)
int setBroadCast(bool isBroadCast, const char* ip, unsigned short port) {
if (*this && *this != INVALID_SOCKET)
{
m_isBroadCast = isBroadCast;
if (::setsockopt(*this, SOL_SOCKET, SO_BROADCAST, (char*)&m_isBroadCast, sizeof(m_isBroadCast)) == 0) {
if (isBroadCast) {
setBindAddress(0, 0);
setBroadcastAddress(ip, port);
}
else {
setBindAddress(ip, port);
}
return m_isBroadCast;
}
}
return m_isBroadCast = -1;
}
// 設(shè)置接收器綁定的收聽本地地址
bool setBindAddress(const char* ip, unsigned short port)
{
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp set reuse address error.");
}
if (::bind(*this, (const sockaddr*)&m_local, sizeof(sockaddr_in)))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp bind address error.");
}
return true;
}
// 設(shè)置發(fā)送要廣播到的目標(biāo)地址(遠(yuǎn)端地址)
void setBroadcastAddress(const char* ip, unsigned short port)
{
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_remote.sin_port = htons(port);
}
int m_remoteLen = 0;
int m_isBroadCast = -1;
};

// 支持異步掛起的UDP協(xié)議廣播器套接字(發(fā)送端,client端)
struct AsyncBroadcastor :public AsyncUdp {
AsyncBroadcastor(CoScheduler& sc, const char* ip = 0, unsigned short port = 0)
:AsyncUdp(sc, true, ip, port)
{
type() = IoFileType::TSendto;
}
// 發(fā)送端udp(client端)向內(nèi)部已保存的指定的廣播地址發(fā)送數(shù)據(jù)(未設(shè)置廣播地址將失敗)
// 返回值0成功,SOCKET_ERROR失敗
AsyncBroadcastor& sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
// 發(fā)送端udp(client端)向動(dòng)態(tài)指定的廣播地址發(fā)送數(shù)據(jù)
// 返回值0成功,SOCKET_ERROR失敗
AsyncBroadcastor& sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
bool isValidBroadcastor() const { return status() == 1; }
using AsyncUdp::setBroadcastAddress;
};

// 支持異步掛起的UDP協(xié)議接收器套接字(接收端,server端)
struct AsyncReceiver :public AsyncUdp {
AsyncReceiver(CoScheduler& sc, const char* ip, unsigned short port)
:AsyncUdp(sc, false, ip, port)
{
type() = IoFileType::TRecvfrom;
}
// The receiving-side UDP (server) fetches broadcast data from its bound local address
// Returns 0 on success, SOCKET_ERROR on failure
AsyncReceiver& recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd);
bool isValidReceiver() const { return status() == 0; }
using AsyncUdp::setBindAddress;
};

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.type() = IoFileType::TSendto;
sock.resetCompleted();
if (lpNumberOfBytesSent)
*lpNumberOfBytesSent = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSent);
WSABUF wsaBuf{ nNumberOfBytesSentBuffer , (char*)lpSentBuffer };
auto ret = WSASendTo(sock, &wsaBuf, 1, lpNumberOfBytesSent, 0,
(const sockaddr*)sock.remoteAddress(), (int)sizeof(sockaddr_in), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const char* ip, unsigned short port,
const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.setBroadcastAddress(ip, port);
return ::sendTo(sock, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

// Returns 0 on success, SOCKET_ERROR on failure
inline
AsyncReceiver&
recvFrom(AsyncReceiver& sock, void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd) {
sock.type() = IoFileType::TRecvfrom;
sock.resetCompleted();
if (lpNumberOfBytesRecvd)
*lpNumberOfBytesRecvd = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecvd);
WSABUF wsaBuf{ nNumberOfBytesRecvdBuffer , (char*)lpRecvdBuffer };
DWORD dwFlag = 0;
*sock.remoteLen() = sizeof(sockaddr_in);
auto ret = WSARecvFrom(sock, &wsaBuf, 1, NULL, &dwFlag,
(sockaddr*)sock.remoteAddress(), sock.remoteLen(), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}

struct AsyncFile : public IoFileAwaitable<bool> {
AsyncFile(CoScheduler& sc, const char* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileA(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
AsyncFile(CoScheduler& sc, const wchar_t* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileW(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
~AsyncFile() { close(); }
AsyncFile& read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead);
AsyncFile& write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten);
};

// Returns true on success, false on failure
inline
AsyncFile&
read(AsyncFile& file, void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead) {
file.type() = IoFileType::TRead;
file.resetCompleted();
if (lpNumberOfBytesRead)
*lpNumberOfBytesRead = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRead);
auto ret = ReadFile(file, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}

// Returns true on success, false on failure
inline
AsyncFile&
write(AsyncFile& file, const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten) {
file.type() = IoFileType::TWrite;
file.resetCompleted();
if (lpNumberOfBytesWritten)
*lpNumberOfBytesWritten = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesWritten);
auto ret = WriteFile(file, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}

struct AsyncSleepor : public IoFileAwaitable<long long> {
AsyncSleepor(long long microOrMilliSeconds = 0, bool useMicroSeconds = false)
: microOrMilliSeconds(microOrMilliSeconds)
, useMicroSeconds(useMicroSeconds)
{
type() = IoFileType::TSleep;
start();
}
void start()
{
tp = std::chrono::steady_clock::now();
}
auto getSpendMicroSeconds() const {
constexpr auto div = std::nano::den / std::micro::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
auto getSpendMilliSeconds() const {
constexpr auto div = std::nano::den / std::milli::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
bool isCompleted() {
setReturn(useMicroSeconds ? getSpendMicroSeconds() : getSpendMilliSeconds());
return (m_isCompleted = getReturn() >= microOrMilliSeconds);
}
protected:
long long microOrMilliSeconds;
bool useMicroSeconds;
std::chrono::steady_clock::time_point tp;
};

//毫秒妙級(jí)別休眠,返回實(shí)際休眠的毫妙數(shù)
inline
AsyncSleepor
sleepFor(long long milliSeconds) {
return AsyncSleepor{ milliSeconds };
}

//微妙級(jí)別休眠,返回實(shí)際休眠的微妙數(shù)
inline
AsyncSleepor
sleepForEx(long long microSeconds) {
return AsyncSleepor{ microSeconds, true };
}

void test_coroutine_tcp_server(unsigned short serverPort = 33100, int totalClientCount = 100, bool dumpTestInfo = 0);

void test_coroutine_udp_random_broadcast(unsigned short broadCastPort = 33000, int totalClientBroadcastCount = 20, bool dumpTestInfo = 0);

#endif

#endif

實(shí)現(xiàn)文件

CLCoroutine.cpp

#include "CLCoroutine.h"

#if (defined(CLUseCorotine) && CLUseCorotine)

#include "../_cl_common/CLCommon.h"
#include "../_cl_string/CLString.h"
#include "../_cl_logger/CLLogger.h"

void CoScheduler::run() {
auto coro = [this]() ->MainCoro {
//logger.debug("nMain coro scheduler started ...");
#ifdef _WIN32
if (m_hIocp) {
CLString err;
DWORD dwMilliseconds = 0;
//logger.debug("nMain coro scheduler: Iocp loop started ...");
while (1) {
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* pOverlapped = 0;
while (GetQueuedCompletionStatus(m_hIocp, &numberOfBytesTransferred, &completionKey, &pOverlapped, dwMilliseconds))
{
if (pOverlapped) { // I/O completion event
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(ERROR_SUCCESS);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = numberOfBytesTransferred;

// 根據(jù)可等待對(duì)象的優(yōu)先級(jí),決定是否立即調(diào)度或是輪流調(diào)度讓每個(gè)任務(wù)的權(quán)重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //立即調(diào)度
break;
default:
moveTaskToEnd(m_begin.hd); //輪詢調(diào)度
break;
}
m_end.resume();
}
else { // a new task has arrived: dispatch it immediately
if (numberOfBytesTransferred == 0 && completionKey) {
auto h = CoTask::handle::from_address((void*)completionKey);
moveTaskToEnd(h);
h.resume();
}
else {
auto lr = GetLastError();
logger.warning("Iocp: get status in event loop: ",err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
}
}
auto lr = GetLastError();
if (lr == WSA_WAIT_TIMEOUT) {
moveTaskToEnd(m_begin.hd); // round-robin
m_end.resume(); // resume: tasks whose awaited I/O is still pending will not actually run, but coroutines that gave way via co_yield get to execute;

}
else if(pOverlapped) {
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(lr);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = 0;
IoFileType fileType = pFile->type();
switch (fileType)
{
case TUnknown:
break;
case TRead:
case TWrite:
case TListen:
case TAccept:
case TConnect:
pFile->setReturn(false);
break;
case TSend:
case TRecv:
case TSendto:
case TRecvfrom:
pFile->setReturn(SOCKET_ERROR);
break;
case TSleep:
break;
default:
break;
}
switch (lr)
{
case ERROR_NETNAME_DELETED: // 64: the specified network name is no longer available
break;
case ERROR_SEM_TIMEOUT: // 121: the semaphore timeout period has expired
break;
default:
logger.error("Iocp: get status out event loop: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
break;
}
// 根據(jù)可等待對(duì)象的優(yōu)先級(jí),決定是否立即調(diào)度或是輪流調(diào)度讓每個(gè)任務(wù)的權(quán)重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //立即調(diào)度
break;
default:
moveTaskToEnd(m_begin.hd); //輪詢調(diào)度
break;
}
m_end.resume();
}
else {
logger.error("Iocp: get status out event loop no completed: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
if (taskCount() == 0)
break;
}
CloseHandle(m_hIocp);
m_hIocp = 0;
//logger.debug("nMain coro scheduler: Iocp loop has done ...");
}
#endif
//logger.debug("nMain coro scheduler quit ...");
co_return;
};
m_main = coro();
m_main.hd.promise().sc = this;
m_main.hd.resume();
m_main.hd.destroy();
}

bool CoTask::resume() {
if (!hd)
return true;
else if (hd.done()) {
return false;
}
else {
auto pFile = (IoFileAwaitable<>*) hd.promise().pAwaitableFile;
if (!pFile) // first dispatch, or a coroutine that yielded
hd.resume();
else {
if (pFile->type() == IoFileType::TSleep) { // sleep scheduling
if (((AsyncSleepor*)pFile)->isCompleted()) {
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
else if (pFile->isCompleted()) { // I/O-completion scheduling
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
return true;
}
}

#ifdef _WIN32
#else // Linux (epoll) implementation placeholder
#endif

AsyncAcceptor& AsyncListener::accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(* this, sockAccept, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}

AsyncAcceptor& AsyncAcceptor::accept(AsyncListener& sListen, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(sListen, *this, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}

AsyncConnector& AsyncConnector::connect(const sockaddr* name, int namelen, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncConnector& AsyncConnector::connect(const char* ip, unsigned short port, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, ip, port, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncConnector& AsyncConnector::connect(void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncTcp& AsyncTcp::send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend)
{
return ::send(*this, lpSendBuffer, nNumberOfBytesSendBuffer, lpNumberOfBytesSend);
}

AsyncTcp& AsyncTcp::recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv)
{
return ::recv(*this, lpRecvBuffer, nNumberOfBytesRecvBuffer, lpNumberOfBytesRecv);
}

AsyncBroadcastor& AsyncBroadcastor::sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

AsyncBroadcastor& AsyncBroadcastor::sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, ip, port, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

AsyncReceiver& AsyncReceiver::recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd)
{
return ::recvFrom(*this, lpRecvdBuffer, nNumberOfBytesRecvdBuffer, lpNumberOfBytesRecvd);
}

AsyncFile& AsyncFile::read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead)
{
return ::read(*this, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead);
}

AsyncFile& AsyncFile::write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten)
{
return ::write(*this, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten);
}

#include <vector>
#include <cstdio>
#include <cstring>
#include <ctime>

void test_coroutine_tcp_server(unsigned short serverPort, int totalClientCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);

CoScheduler sc;
int servRun = 0;
int totals = 0;
CLTick tk;
// 服務(wù)端監(jiān)聽器task
sc.gather([&]()->CoTask {
logger.info("nTcp server coro started ...");
AsyncListener listener(sc, ADDR_ANY, serverPort);
// loop accept
std::vector<char> acceptbuf(260);
AsyncAcceptor* pAcceptor = 0;
int servId = 0;
while (true)
{
AsyncAcceptor& acceptor = pAcceptor ? *pAcceptor : *(pAcceptor = new AsyncAcceptor(sc));
DWORD nValidAccept;
logger.debug("nServer listener accept wait ...");
bool ret = co_await listener.accept(acceptor, acceptbuf.data(), acceptbuf.size(), &nValidAccept);
if (ret) {
// create server task
acceptor.parseAddress(acceptbuf.data(), acceptbuf.size());
servRun++;
// server task
sc.gather([&](AsyncAcceptor* pAcceptor, int idx) ->CoTask {
AsyncAcceptor& acp = *pAcceptor;
std::vector<char> bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::sprintf(bufSend.data(), "\nHello client. this is server %d. %dst response.", idx, total);
logger.debug("\nServer[%d] send wait ...", idx);
int ret = co_await acp.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
logger.debug("\nServer[%d] recv wait ...", idx);
ret = co_await acp.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (nbytesRecv == 0)
break;
logger.debug("nServer[%d] recv client msg = %s", idx, bufRecv.data());
total++;
totals++;
}
logger.debug("nServer[%d] recv client close msg", idx);
delete pAcceptor;
servRun--;
}, pAcceptor, ++servId);
pAcceptor = 0;
}
}
logger.info("nTcp server coro quit.%d", GetCurrentThreadId());
});
// client generator
sc.gather([&]()->CoTask {
logger.info("\nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientCount) {
i++;
logger.info("nClients creater make client task %d.", i);
nClient++;
// client task
sc.gather([&](int idx)->CoTask {
AsyncConnector con(sc);
logger.debug("\nClient[%d] connect wait ...", idx);
auto ret = co_await con.connect("127.0.0.1", serverPort);
if (!ret) {
logger.debug("nClinet[%d] connect server fail, %s", idx, CLString().getLastErrorString(GetLastError()));
co_return;
}
std::vector<char> bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::snprintf(bufSend.data(), bufSend.size(), "\nHello server, this is client %d: %dst request.", idx, total);
logger.debug("\nClient[%d] send wait ...", idx);
auto ret = co_await con.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
if (!(ret == SOCKET_ERROR || nbytesSend == 0)) {
logger.debug("nClient[%d] recv wait ...", idx);
ret = co_await con.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0)
break;
logger.debug("nClient[%d] recv server msg = %s", idx, bufRecv.data());
}
total++;
}
logger.debug("nClient[%d] get server close msg and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 統(tǒng)計(jì)協(xié)程
sc.gather([&]()->CoTask {
auto last = totals;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000);
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totals, time, (totals - last) / (time - lastTime));
last = totals;
lastTime = time;
}
});

sc.run();

}

void test_coroutine_udp_random_broadcast(unsigned short broadCastPort, int totalClientBroadcastCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
srand(time(0));
CoScheduler sc;
int servRun = 0;
int totalsRecvd = 0;
int totalsSendto = 0;
CLTick tk;
std::vector<unsigned short> portList(totalClientBroadcastCount);
for (int i = 0; i < totalClientBroadcastCount; i++)portList[i] = broadCastPort + i;
// 服務(wù)端生成器
sc.gather([&]()->CoTask {
logger.info("nServers creater coro started.");
int nServer = 0;
for (int i = 0; 1; )
{
if (nServer < totalClientBroadcastCount) {
i++;
logger.info("nServers creater make server task %d.", i);
nServer++;
// 服務(wù)端task (廣播接收端)
sc.gather([&](int i)->CoTask {
logger.info("nUdp server[%d] coro started bind port = %d...", i, portList[i - 1]);
AsyncReceiver serv(sc, "127.0.0.1", portList[i - 1]);
// recv
std::vector<char> recv(260);
int servId = 0;
int total = 1;
while (true)
{
DWORD nbytesRecv;
logger.debug("nUdp server[%d] recvfrom wait ...", i);
int ret = co_await serv.recvFrom(recv.data(), recv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0) {
CLString().getLastErrorMessageBoxExceptSucceed(WSAGetLastError());
break;
}
logger.debug("nUdp server[%d] recvfrom %dst broadcast %u bytes data, msg = %s", i, total, nbytesRecv, recv.data());
total++;
totalsRecvd++;
}
logger.info("nUdp server[%d] coro quit.%d", i, GetCurrentThreadId());
nServer--;
}, i);
}
else {
logger.debug("nServers creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nServers creater fetch run privilege again.");
}
}
logger.debug("nServers creater coro quit.");
});
// client generator
sc.gather([&]()->CoTask {
logger.info("\nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientBroadcastCount) {
i++;
logger.info("nClients creater make broadcastor client task %d.", i);
nClient++;
// client task (broadcast sender)
sc.gather([&](int idx)->CoTask {
AsyncBroadcastor broadcast(sc);
std::vector<char> bufSent(260);
DWORD nbytesSent;
int total = 1;
while (1) {
auto randPort = portList[rand() % totalClientBroadcastCount];
std::snprintf(bufSent.data(), bufSent.size(),
"\nHello server, this is broadcastor %d: %dst random broadcast to port=%d."
, idx, total, randPort);
logger.debug("\nBroadcastor[%d] send wait ...", idx);
auto ret = co_await broadcast.sendTo("127.0.0.1", randPort,
bufSent.data(), std::strlen(bufSent.data()) + 1, &nbytesSent);
if (ret == SOCKET_ERROR || nbytesSent == 0) {
break;
}
logger.debug("nBroadcastor[%d] sendto server msg = %s", idx, bufSent.data());
total++;
totalsSendto++;
}
logger.debug("nBroadcastor[%d] send 0 bytes and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 統(tǒng)計(jì)協(xié)程
sc.gather([&]()->CoTask {
auto last = totalsRecvd + totalsSendto;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000); // coroutine sleeps for 3000 ms
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totalsRecvd + totalsSendto, time, (totalsRecvd + totalsSendto - last) / (time - lastTime));
last = totalsRecvd + totalsSendto;
lastTime = time;
}
});

sc.run();
}

#endif

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4344

    瀏覽量

    62861
  • C++
    C++
    +關(guān)注

    關(guān)注

    22

    文章

    2114

    瀏覽量

    73775
  • 源代碼
    +關(guān)注

    關(guān)注

    96

    文章

    2946

    瀏覽量

    66843
  • 生成器
    +關(guān)注

    關(guān)注

    7

    文章

    319

    瀏覽量

    21074
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    C++20新特性解析

    C++之父都說過,C++20C++語言的一次重大變革,引入了大量的新特性。
    發(fā)表于 10-08 09:07 ?2038次閱讀

    談?wù)?b class='flag-5'>協(xié)的那些事兒

    隨著異步編程的發(fā)展以及各種并發(fā)框架的普及,協(xié)作為一種異步編程規(guī)范在各類語言中地位逐步提高。我們不單單會(huì)在自己的程序中使用協(xié)
    的頭像 發(fā)表于 01-26 11:36 ?1147次閱讀
    談?wù)?b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>的那些事兒

    關(guān)于C++ 20協(xié)最全面詳解

    花了一兩周的時(shí)間后,我想寫寫 C++20 協(xié)的基本用法,因?yàn)?C++ 的協(xié)讓我感到很奇怪,寫
    的頭像 發(fā)表于 04-12 11:10 ?1.3w次閱讀
    關(guān)于<b class='flag-5'>C</b>++ <b class='flag-5'>20</b><b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>最全面詳解

    Python后端項(xiàng)目的協(xié)是什么

    最近公司 Python 后端項(xiàng)目進(jìn)行重構(gòu),整個(gè)后端邏輯基本都變更為采用“異步協(xié)的方式實(shí)現(xiàn)。看著滿屏幕經(jīng)過 async await(協(xié)
    的頭像 發(fā)表于 09-23 14:38 ?1354次閱讀

    現(xiàn)代C++20實(shí)戰(zhàn)手冊(cè)

    追其根源,C++ 為何如此受歡迎,除了它本身出色的性能,作為一種高級(jí)面向?qū)ο笳Z言,適用領(lǐng)域極其廣泛,小到嵌入式,大到分布式服務(wù)器,到處可以見到 C++ 的身影;另一個(gè)很重要的原因就是它“最近”不斷發(fā)布具有有趣功能的新語言標(biāo)準(zhǔn),也
    的頭像 發(fā)表于 01-17 09:55 ?3087次閱讀

    詳解Linux線程、線程與異步編程、協(xié)異步

    協(xié)不是系統(tǒng)級(jí)線程,很多時(shí)候協(xié)被稱為“輕量級(jí)線程”、“微線程”、“纖(fiber)”等。簡單來說可以認(rèn)為
    的頭像 發(fā)表于 03-16 15:49 ?1020次閱讀

    協(xié)的概念及協(xié)的掛起函數(shù)介紹

    協(xié)是一種輕量級(jí)的線程,它可以在單個(gè)線程中實(shí)現(xiàn)并發(fā)執(zhí)行。與線程不同,協(xié)不需要操作系統(tǒng)的上下文切換,因此可以更高效地使用系統(tǒng)資源。Kotlin 協(xié)
    的頭像 發(fā)表于 04-19 10:20 ?922次閱讀

    C++20 modules入門

    以前一直有了解C++20的新特性,但是因?yàn)榫幾g器對(duì)此支持的比較少,所以很少實(shí)踐。
    的頭像 發(fā)表于 05-29 15:03 ?1021次閱讀
    <b class='flag-5'>C++20</b> modules入門

    Kotlin協(xié)實(shí)戰(zhàn)進(jìn)階之筑基篇1

    協(xié)的概念在1958年就開始出現(xiàn)(比線程還早), 目前很多語言開始原生支, Java 沒有原生協(xié)但是大型公司都自己或者使用第三方來支持
    的頭像 發(fā)表于 05-30 16:24 ?741次閱讀
    Kotlin<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b><b class='flag-5'>實(shí)戰(zhàn)</b>進(jìn)階之筑基篇1

    Kotlin協(xié)實(shí)戰(zhàn)進(jìn)階之筑基篇2

    協(xié)的概念在1958年就開始出現(xiàn)(比線程還早), 目前很多語言開始原生支, Java 沒有原生協(xié)但是大型公司都自己或者使用第三方來支持
    的頭像 發(fā)表于 05-30 16:25 ?785次閱讀
    Kotlin<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b><b class='flag-5'>實(shí)戰(zhàn)</b>進(jìn)階之筑基篇2

    Kotlin協(xié)實(shí)戰(zhàn)進(jìn)階之筑基篇3

    協(xié)的概念在1958年就開始出現(xiàn)(比線程還早), 目前很多語言開始原生支, Java 沒有原生協(xié)但是大型公司都自己或者使用第三方來支持
    的頭像 發(fā)表于 05-30 16:26 ?726次閱讀

    C++20 modules基礎(chǔ)知識(shí)入門

    以前一直有了解C++20的新特性,但是因?yàn)榫幾g器對(duì)此支持的比較少,所以很少實(shí)踐。
    的頭像 發(fā)表于 06-15 11:37 ?902次閱讀
    <b class='flag-5'>C++20</b> modules基礎(chǔ)知識(shí)入門

    C/C++協(xié)編程的相關(guān)概念和技巧

    自己的寄存器上下文和,可以在多個(gè)入口點(diǎn)間自由切換,而不是像傳統(tǒng)的函數(shù)調(diào)用那樣在一個(gè)入口點(diǎn)開始、另一個(gè)入口點(diǎn)結(jié)束。協(xié)的概念最早可以追溯到1963年,由Melvin Conway提出。經(jīng)過多年的發(fā)展,
    的頭像 發(fā)表于 11-09 11:34 ?830次閱讀

    Linux線程、線程與異步編程、協(xié)異步介紹

    協(xié)不是系統(tǒng)級(jí)線程,很多時(shí)候協(xié)被稱為“輕量級(jí)線程”、“微線程”、“纖(fiber)”等。簡單來說可以認(rèn)為
    的頭像 發(fā)表于 11-11 11:35 ?1216次閱讀
    Linux線程、線程與<b class='flag-5'>異步</b>編程、<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>與<b class='flag-5'>異步</b>介紹

    何選擇一個(gè)合適的協(xié)來獲得CPU執(zhí)行權(quán)

    如今雖不敢說協(xié)已經(jīng)是紅的發(fā)紫,但確實(shí)是越來越受到了大家的重視。Golang中的已經(jīng)是只有g(shù)oroutine,以至于很多go程序員是只知有協(xié),不知有線程了。就連
    的頭像 發(fā)表于 11-13 14:10 ?431次閱讀
    何選擇一個(gè)合適的<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>來獲得CPU執(zhí)行權(quán)
    主站蜘蛛池模板: 网友自拍偷拍| 国产午夜精品久久久久婷婷| 国产免费高清mv视频在线观看| 老奶奶50p| 色欲人妻AAAAAAA无码| 18禁止观看免费私人影院| 国产精品永久在线| 女性爽爽影院免费观看| 亚洲精品久久午夜麻豆| 北原多香子qvod| 久久影院中文字幕| 香蕉97超级碰碰碰碰碰久| eussse手机电影在线观看| 精品 在线 视频 亚洲| 日本无码欧美激情在线视频| 主播蜜汁丝袜精品自拍| 国产亚洲精品久久777777| 日韩AV无码一区二区三区不卡毛片| 中国人泡妞xxxxxxxx19| 国产亚洲精品久久久久久线投注 | 伊人久久青草| 国产蜜臀AV在线一区视频| 秋霞鲁丝片Av无码| 最近高清日本免费| 红尘影院在线观看| 天美传媒 免费观看| yellow日本动漫观看免费| 绿巨人www| 亚洲一区精品伊人久久伊人| 国产精品无码久久av| 日韩精品无码视频一区二区蜜桃| 69精品国产人妻蜜桃国产毛片| 狠日狠干日曰射| 性色欲情网站IWWW九文堂| 囯产精品久久久久久久久免费蜜桃| 女子叉开腿让男子桶免费软件| 在线观看免费视频a| 精品久久久无码21P发布| 性夜影院午夜看片| 国产69精品久久久久人妻刘玥| 青柠在线观看免费播放电影|