c++20出來有一段時(shí)間了。其中一大功能就是終于支持協(xié)程了(c++作為行業(yè)大哥大級(jí)別的語言,居然到C++20才開始支持協(xié)程,我也是無力吐槽了,讓多少人等了多少年,等了多少青春)但千呼萬喚他終于還是來了,c++標(biāo)準(zhǔn)委員會(huì)的謹(jǐn)慎態(tài)度也造就了c++20的給出來協(xié)程:“性能之優(yōu)秀”,“開發(fā)之靈活”和讓人勸退的“門檻之高”。
不過話說回來,c++從出身就注定了背負(fù)性能使命,他不是為簡單為應(yīng)用層維度開發(fā)的語言(如果應(yīng)用層你大可以用python java ruby lua等語言),他是一門可以開發(fā)其他語言的語言,所以追逐高性能和靈活性,舍棄矯情的低門檻,畢竟C++不是設(shè)計(jì)來給所有人用的語言。
之前用過python的協(xié)程,協(xié)程易用程度高,所以c++20到來也想嘗試c++狀態(tài)下的協(xié)程,但是接觸以后發(fā)現(xiàn)問題,c++20的協(xié)程狀態(tài)是:只有基礎(chǔ)設(shè)施,也就是實(shí)現(xiàn)了無棧協(xié)程的所有機(jī)制和功能,但沒有封裝到具體的應(yīng)用層標(biāo)準(zhǔn)庫STL。此時(shí)大部分人就只能干瞪眼了,由于復(fù)雜的協(xié)程運(yùn)作機(jī)制,沒有實(shí)現(xiàn)標(biāo)準(zhǔn)庫的情況下,說要用上協(xié)程你是在開玩笑,網(wǎng)上一致的意見c++20是半成品,要真的用上c++協(xié)程得等c++23協(xié)程標(biāo)準(zhǔn)庫完善后才行。
一貫本著不作死就不會(huì)死得態(tài)度,只會(huì)用庫不懂底層機(jī)制,那不是用c++的態(tài)度,所以深入學(xué)習(xí)c++20協(xié)程,半個(gè)月時(shí)間,寫了一個(gè)簡單的協(xié)程庫,在此過程中也對(duì)復(fù)雜的c++協(xié)程機(jī)制有了深入的了解。話說asio和cppcoro兩個(gè)庫已經(jīng)支持了c++20協(xié)程,但是我覺得還是龐大和復(fù)雜,對(duì)于想通過看庫源代碼學(xué)習(xí)c++協(xié)程的同學(xué),我覺得還是算了,在不懂協(xié)程機(jī)理的情況下,你連看源代碼都看不懂好吧!!有人會(huì)說有源代碼了你都看不懂,你是吹牛。那還真不是,c++協(xié)程在語法上會(huì)有些顛覆你的三觀,我們來舉個(gè)例子:
co_await std::suspend_always{};
co_yield a;
co_return 0;
}
int main(){
auto co = func(1);
co.hd.resume();
int a = co.hd.resume();
int b = co.hd.resume();
}
有人說func是一個(gè)協(xié)程函數(shù),main中的func運(yùn)行后會(huì)返回0,也就是 co是一個(gè)int變量值為0;
如果你按常規(guī)代碼理解,沒錯(cuò)。但是在c++協(xié)程的世界,他完全不是上面說的情況。
正確的情況是: func在這里是一個(gè)協(xié)程生成器(這個(gè)概念很重要,他不是函數(shù))返回值co是一個(gè)協(xié)程管理類A關(guān)聯(lián)了具體協(xié)程執(zhí)行體后的協(xié)程實(shí)例的控制句柄(的包裝對(duì)象)。明確co不是協(xié)程實(shí)例(協(xié)程幀),是協(xié)程實(shí)例的控制句柄的包裝對(duì)象,在func(1)執(zhí)行之后他只是“動(dòng)態(tài)”生成了一個(gè)協(xié)程實(shí)例,并把控制句柄返回給用戶,但此時(shí)這個(gè)協(xié)程是掛起的,協(xié)程體{}代碼塊還沒有被執(zhí)行過,所以不存在返回值。這非常的繞,讓人難以理解(后面還有更難理解的)。
在三次co.hd.resume();調(diào)用后協(xié)程才被完全執(zhí)行完畢,此時(shí)a=1,b=0;
返回值保存在協(xié)程的實(shí)例(協(xié)程幀)中,通過協(xié)程管理類A的內(nèi)部流程控制函數(shù)管理著返回值(A的promise_type定義了所有的協(xié)程控制行為)。
總結(jié)幾點(diǎn) (重要,不要混淆):
1、“協(xié)程管理類A是包含協(xié)程行為控制的類定義 ,A不是協(xié)程,形如 A func(int a, …){ … } 才是一個(gè)完整的協(xié)程定義”;所以A func1(){}; A func2(){}; A func3(){}; 都可以與同一個(gè)協(xié)程控制A綁定,但他們是3個(gè)不同的協(xié)程定義,只是協(xié)程控制行為都為A。好處是,你可以用一個(gè)std::vector< A > 保存下這3個(gè)不同的協(xié)程,他們的主協(xié)程體(功能實(shí)現(xiàn))各不相同。要讓A為一個(gè)協(xié)程管理類,必須包含struct promise_type{}定義,和一個(gè)控制句柄對(duì)象std::coroutine_handle< promise_type > hd; 特別的,A可以不實(shí)現(xiàn)await_xxx接口,他可以不是可等待體。
2、代碼塊體中有co_await ,co_yield,co_return關(guān)鍵字,則為協(xié)程體代碼塊,運(yùn)行到關(guān)鍵字位置會(huì)**“觸發(fā)協(xié)程掛起” ** ,此時(shí)原調(diào)用者代碼阻塞在resume函數(shù)位置,運(yùn)行權(quán)重新回到調(diào)用者,此時(shí)resume會(huì)返回,調(diào)用者繼續(xù)執(zhí)行;
3、特別的:
co_await可以與可等待對(duì)象配合,形成更為復(fù)雜的協(xié)程掛起行為:一般異步IO操作,都是通過co_await + io可等待對(duì)象,完成異步操作后掛起協(xié)程,等待異步io完成后,再由**“調(diào)度器”**恢復(fù)協(xié)程繼續(xù)運(yùn)行,從而發(fā)揮異步的意義,形成io復(fù)雜度向cpu復(fù)雜度的轉(zhuǎn)移。因此,協(xié)程解決的是問題是“異步”而不是“并行”,要實(shí)現(xiàn)并行只能考慮多線程或多進(jìn)程,協(xié)程可以將單個(gè)線程cpu效率發(fā)揮到最大,而不會(huì)被io阻塞浪費(fèi)掉當(dāng)前線程的cpu算能,那問題來了,如果我們用 協(xié)程 + 多線程/多進(jìn)程 結(jié)合模式呢,那恭喜你,世界都將是你的;
co_yield實(shí)現(xiàn)簡單掛起,簡單的立即放棄運(yùn)行權(quán),返回調(diào)用者,可恢復(fù)(異步應(yīng)用場景相對(duì)較少,多用于循環(huán)生成器);
co_return實(shí)現(xiàn)最后一次簡單掛起,立即放棄運(yùn)行權(quán),返回調(diào)用者,協(xié)程后續(xù)不再可恢復(fù)(應(yīng)用于協(xié)程退出);
4、可等待體(類形如 struct B{ await_ready();await_suspend();await_resume(); } 實(shí)現(xiàn) 三個(gè)await_xxx接口的類B是一個(gè)可等待體定義),他的實(shí)例是一個(gè)可等待對(duì)象;其中await_suspend()在執(zhí)行后(不是執(zhí)行前),會(huì)觸發(fā)當(dāng)前協(xié)程掛起(記住,此處不是可等待對(duì)象掛起,是co_await 此可等待對(duì)象的當(dāng)前協(xié)程掛起,不能混淆,由于概念不清,我在這個(gè)位置耽誤了很久的時(shí)間)
5、協(xié)程管理類A,和可等待體B,他們沒有直接關(guān)系,是兩個(gè)不同的東西。可等待體B控制掛起時(shí)點(diǎn)的行為是局部的,協(xié)程控制A控制協(xié)程整體創(chuàng)建,運(yùn)行,掛起,銷毀,異常捕獲等過程的行為是整體的;協(xié)程只對(duì)應(yīng)有一個(gè)控制類A,但是內(nèi)部可以有多次掛起操作,每次操作對(duì)應(yīng)一個(gè)可等待對(duì)象;
庫開發(fā)
本文重點(diǎn)是庫實(shí)戰(zhàn)開發(fā),關(guān)于協(xié)程框架中的 3大概念:協(xié)程定義類及promise_type{},可等待體awaitable,協(xié)程控制句柄std::coroutine_handle< > ,此處不做介紹,自行了解。
但是要介紹一下協(xié)程調(diào)度的運(yùn)行邏輯,以此加深庫開發(fā)過程的理解。這個(gè)過程在多線程下面是由內(nèi)核管理的我們很少會(huì)了解,但是到了協(xié)程,你還要自己寫庫,那必須自己實(shí)現(xiàn)協(xié)程的調(diào)度算法和event loop模式
在此,我打個(gè)形象比喻:
現(xiàn)在一個(gè)家中有5個(gè)兒子,他們能力各不相同(工作者協(xié)程),還有一個(gè)媽媽(調(diào)度者協(xié)程),現(xiàn)在只有一臺(tái)電腦(單線程時(shí)間片),同一時(shí)刻,這臺(tái)電腦只能被老媽分給其中一個(gè)兒子來使用(協(xié)程搶占),其中一個(gè)兒子首先得到電腦開始工作(協(xié)程恢復(fù)),其他兒子只能等著無法工作(協(xié)程等待狀態(tài)),有電腦的兒子工作一會(huì)后此時(shí)他發(fā)送一封對(duì)外郵件(可等待對(duì)象)但要等待郵件回復(fù)后才能繼續(xù)工作(io等待完成),因?yàn)槠渌舜藭r(shí)還在等著用電腦而自己此時(shí)不具備繼續(xù)工作的條件,所以他識(shí)趣的放棄電腦的使用權(quán),并把電腦交還給老媽(協(xié)程掛起等待,執(zhí)行權(quán)交還caller)并等著老媽下次再把電腦給他使用,老媽拿到電腦后(調(diào)度協(xié)程恢復(fù)執(zhí)行)檢查是否有回復(fù)郵件到來(調(diào)度協(xié)程檢查事件完成,對(duì)應(yīng)事件循環(huán)iocp/epoll),如果有了,老媽檢查這封回復(fù)郵件是回復(fù)給哪個(gè)兒子的,并叫來對(duì)應(yīng)的兒子(協(xié)程調(diào)度),把電腦交給他(協(xié)程恢復(fù)),得到電腦的兒子打開回復(fù)郵件拿到結(jié)果(await_resume() 返回異步io結(jié)果)繼續(xù)工作,…, 不斷循環(huán)。至此,完成一個(gè)協(xié)程完整調(diào)度流程。
要實(shí)現(xiàn)一個(gè)協(xié)程庫,他需要幾個(gè)東西:
1、實(shí)現(xiàn)具體的異步操作的可等待體(類似比喻中的發(fā)郵件操作,定義是否將電腦歸還,獲取回復(fù)后打開查詢結(jié)果等行為);
2、協(xié)程控制類A(他是一個(gè)協(xié)程任務(wù)task),A的promise_type中應(yīng)該記錄協(xié)程的相關(guān)狀態(tài),記錄掛起點(diǎn)的可等待對(duì)象的指針(很重要),可等待對(duì)象也可以充當(dāng)task和調(diào)度協(xié)程,信息交換的媒介,可等待對(duì)象指針通過 await_suspend() 過程傳遞給task的promise做記錄并保存。調(diào)度協(xié)程通過可等待對(duì)象指針在異步操作完成時(shí)將異步操作結(jié)果傳回給等待的task。
3、 如總結(jié)和比喻所說,最重要的,還需要一個(gè)“協(xié)程調(diào)度器”。第一、他有一個(gè)主調(diào)度協(xié)程,調(diào)度協(xié)程具有一系列的調(diào)度算法,他的工作就是監(jiān)測io異步完成事件的到來和分配執(zhí)行權(quán)給task,第二,他維護(hù)有一個(gè)task協(xié)程隊(duì)列(可以多種方法實(shí)現(xiàn)),隊(duì)列記錄著所有的協(xié)程實(shí)例的句柄,這個(gè)隊(duì)列是為了協(xié)程調(diào)度準(zhǔn)備的。
(注:之所以C++20無法直接使用的原因,其實(shí)就是,以上3個(gè)具體的工具沒有現(xiàn)成的庫,由于高度靈活,c++希望使用者自己實(shí)現(xiàn)以上組件,這讓用慣成品庫的我們非常難受,望而卻步,天天喊著等c++23的標(biāo)準(zhǔn)庫,但c++23也不能將所有的需求都囊括,遇到特殊需求還是要自己寫)
實(shí)現(xiàn)思路
調(diào)度器:
1 調(diào)度協(xié)程中的event loop本例是在Windows下采用的iocp模型(linux下可以使用epoll也很好改,原理一樣)
2、調(diào)度算法采用簡單的等權(quán)重調(diào)度,也就是掛入?yún)f(xié)程隊(duì)列的task,輪流調(diào)度,每個(gè)被調(diào)度的task被調(diào)度的機(jī)會(huì)相同;
3、完成事件標(biāo)記和task恢復(fù),業(yè)務(wù)分開,這樣目的是使得通過co_yield簡單掛起的任務(wù)有重新執(zhí)行的機(jī)會(huì)(因?yàn)閏o_yeild不會(huì)在后續(xù)觸發(fā)完成事件)
4、調(diào)度器中記錄著協(xié)程隊(duì)列的開始task和末尾task的handle,以便調(diào)度協(xié)程;
可等待體:
1、文件file異步read,write操作;
2、網(wǎng)絡(luò)套接字,tcp協(xié)議下異步listen,accept, send, recv 操作;
3、網(wǎng)絡(luò)套接字,udp協(xié)議下異步sendto, recvfrom 操作;
4、協(xié)程休眠,實(shí)現(xiàn)sleepFor,sleepForEx操作,分別實(shí)現(xiàn)協(xié)程任務(wù)的毫秒和微秒級(jí)休眠;
5、在iocp模型下以上api都提供了重疊io操作,此時(shí)將api執(zhí)行成功的重疊io操作,將對(duì)應(yīng)的可等待體指針記錄到當(dāng)前協(xié)程變量中(promise_type中的變量),一旦完成事件到來,調(diào)度協(xié)程就會(huì)設(shè)置可等待對(duì)象的完成標(biāo)記狀態(tài)為true,調(diào)度協(xié)程只要在輪詢中逐個(gè)檢查task保存的可等待對(duì)象指針,檢查完成標(biāo)記是否為true,為true恢復(fù)執(zhí)行該協(xié)程,為false則跳過該協(xié)程,繼續(xù)輪詢 event loop;
任務(wù)定義(task協(xié)程):
1、task協(xié)程的promise_type中定義3個(gè)變量,
2、保存當(dāng)前掛起的可等待提指針,如果當(dāng)前協(xié)程不是io掛起或者是沒有掛起,該指針應(yīng)該為null
3、保存當(dāng)前協(xié)程自身所屬調(diào)度器Scheduler的指針;
4、保存此刻協(xié)程隊(duì)列中的前一個(gè)協(xié)程task的handle和后一個(gè)協(xié)程task的handle;
5、若當(dāng)前task的可等待對(duì)象完成標(biāo)記為true,則調(diào)度協(xié)程會(huì)將該task的before task和behind task鏈接,將該task的handle移動(dòng)到協(xié)程隊(duì)列尾部,并且resume task,完成調(diào)度和恢復(fù);
啟動(dòng)協(xié)程調(diào)度:
1、實(shí)例化調(diào)度器 CoScheduler;
2、通過lambda表達(dá)方式定義task協(xié)程,并加入到調(diào)度器的協(xié)程隊(duì)列;
3、通過run方法啟動(dòng)調(diào)度器調(diào)度運(yùn)行各協(xié)程任務(wù);
4、task協(xié)程中又可以動(dòng)態(tài)嵌套生產(chǎn)新的task協(xié)程加入到調(diào)度隊(duì)列;
先看測試效果:(后面會(huì)有源碼)
案例1:tcp 服務(wù)器/客戶端模型測試
除調(diào)度協(xié)程外,協(xié)程隊(duì)列中會(huì)產(chǎn)生4個(gè)task,一個(gè)服務(wù)監(jiān)聽器task,一個(gè)客戶端生成器task,服務(wù)端task,客戶端task
Main coro scheduler: Iocp loop started ... //0 調(diào)度協(xié)程執(zhí)行
Iocp: New task arrived and first run, tid=26064
Tcp server coro started ... //1 監(jiān)聽器task執(zhí)行
Server listener accept wait ... --》 在accept異步掛起
Iocp: New task arrived and first run, tid=26064 //0 調(diào)度協(xié)程執(zhí)行 (event loop段)
Clients creater coro started. //2 客戶端生成器task執(zhí)行
Clients creater make client task 1. --》 動(dòng)態(tài)生成客戶端task加入隊(duì)列
Clients creater yield run privilege to coro scheduler. --> 通過co_yield返回調(diào)度協(xié)程
Iocp: New task arrived and first run, tid=26064 //0 調(diào)度協(xié)程執(zhí)行
Iocp: New task arrived and first run, tid=26064 --》 調(diào)度新到來的task
Client[1] connect wait ... //3 客戶端task執(zhí)行 在connect異步掛起
Iocp: IoFileAwaitable[TConnect] completed 0 bytes, tid=26064 //0 調(diào)度協(xié)程 執(zhí)行 檢測到connect完成事件
Clients creater fetch run privilege again. //2 客戶端生成器task 執(zhí)行
Clients creater yield run privilege to coro scheduler.
Client[1] send wait ...
Iocp: IoFileAwaitable[TAccept] completed 47 bytes, tid=26064 //0 調(diào)度協(xié)程執(zhí)行 檢測到accept完成事件
Server listener accept wait ... //1 服務(wù)端監(jiān)聽task執(zhí)行 在accept異步掛起
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 調(diào)度協(xié)程 執(zhí)行
Clients creater fetch run privilege again. //2 客戶端生成器task執(zhí)行
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=26064 //0 調(diào)度協(xié)程執(zhí)行
Server[1] send wait ... //4 服務(wù)端task執(zhí)行
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 調(diào)度協(xié)程執(zhí)行 檢測到send完成事件
Client[1] recv wait ... //3 客戶端task執(zhí)行
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 調(diào)度協(xié)程執(zhí)行 檢測到recv完成事件
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv wait ... //4 服務(wù)端task執(zhí)行 在recv異步掛起
Client[1] recv server msg = //3 客戶端task執(zhí)行
Hello client. this is server 1. 1st response. --》打印服務(wù)端發(fā)來的消息
Client[1] send wait ... --》在send異步掛起
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 調(diào)度協(xié)程執(zhí)行 檢測到recv完成事件
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 --》 檢測到send完成事件
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv client msg = //4 服務(wù)端task執(zhí)行
Helle server, this is client 1: 2st request. -->打印客戶端發(fā)來的消息
Server[1] send wait ...
多個(gè)協(xié)程任務(wù)的異步交替執(zhí)行,就是在一個(gè)協(xié)程遇到 可掛起的異步操作時(shí),比如connect accept send recv等,把運(yùn)行權(quán)限歸還給調(diào)度器,當(dāng)完成事件到來,調(diào)度器又把執(zhí)行權(quán)返回給task,形成執(zhí)行權(quán)在調(diào)度器和task之間反復(fù)橫跳的情況,實(shí)現(xiàn)cpu的多任務(wù)復(fù)用;
案例2:udp 廣播模式測試
Main coro scheduler: Iocp loop started ...
Iocp: New task arrived and first run, tid=31188
Servers creater coro started.
Servers creater make server task 1.
Servers creater make server task 2.
Servers creater make server task 3.
Servers creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Clients creater coro started.
Clients creater make broadcastor client task 1.
Clients creater make broadcastor client task 2.
Clients creater make broadcastor client task 3.
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Iocp: New task arrived and first run, tid=31188
Udp server[1] coro started bind port = 33000...
Udp server[1] recvfrom wait ... //服務(wù)端1 異步接收
Iocp: New task arrived and first run, tid=31188
Udp server[2] coro started bind port = 33001...
Udp server[2] recvfrom wait ... //服務(wù)端2 異步接收
Iocp: New task arrived and first run, tid=31188
Udp server[3] coro started bind port = 33002...
Udp server[3] recvfrom wait ... //服務(wù)端3 異步接收
Iocp: New task arrived and first run, tid=31188
Broadcastor[1] send wait ... //客戶端1 異步發(fā)送
Iocp: New task arrived and first run, tid=31188
Broadcastor[2] send wait ... //客戶端2 異步發(fā)送
Iocp: New task arrived and first run, tid=31188
Broadcastor[3] send wait ... //客戶端3 異步發(fā)送
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188 //調(diào)度器 recvfrom事件完成
Servers creater fetch run privilege again.
Servers creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188 //調(diào)度器 sendto事件完成
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Udp server[2] recvfrom 1st broadcast 75 bytes data, msg = //服務(wù)端2 收到并打印消息
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Udp server[2] recvfrom wait ... --》 在recvfrom異步掛起
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Udp server[3] recvfrom 1st broadcast 75 bytes data, msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Udp server[3] recvfrom wait ...
Broadcastor[1] sendto server msg =
Helle server, this is broadcastor 1: 1st randon broadcast to port=33001.
Broadcastor[1] send wait ...
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Broadcastor[2] sendto server msg =
Helle server, this is broadcastor 2: 1st randon broadcast to port=33002.
Broadcastor[2] send wait ...
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Broadcastor[3] sendto server msg =
Helle server, this is broadcastor 3: 1st randon broadcast to port=33001.
再看看性能測試:
同樣采用案例1和案例2的模型,但這次tcp采用100個(gè)server/client共200個(gè)task,udp采用20個(gè)braodcast/reciever共40個(gè)task來測試并發(fā)效果,做一下統(tǒng)計(jì);效果如下
Clients creater coro started.
Clients creater make client task 1.
...
Clients creater make client task 100.
Summary coro count 203: total handle 92752 times (spend time 3.06413s), 30902.3 times/per-second.
Summary coro count 203: total handle 185852 times (spend time 6.06633s), 31010.6 times/per-second.
Summary coro count 203: total handle 278601 times (spend time 9.06766s), 30902.6 times/per-second.
Summary coro count 203: total handle 371901 times (spend time 12.0696s), 31080.1 times/per-second.
Summary coro count 203: total handle 466752 times (spend time 15.0719s), 31592 times/per-second.
按server和client一次完整的send和recv,也就是4此tcp通信,記錄為一次有效通訊記錄,記為1times,
則結(jié)果顯示,在coro=200時(shí)候,單個(gè)線程平均每秒將完成3萬次有效通訊(雖然是自導(dǎo)自演,但是協(xié)程的功能完整實(shí)現(xiàn)了,性能可觀)
Servers creater make server task 1.
...
Servers creater make server task 20.
Clients creater coro started.
Clients creater make broadcastor client task 1.
...
Clients creater make broadcastor client task 20.
Udp server[1] coro started bind port = 33000...
...
Udp server[20] coro started bind port = 33019...
Summary coro count 43: total handle 541730 times (spend time 3.02587s), 180571 times/per-second.
Summary coro count 43: total handle 1082377 times (spend time 6.02621s), 180196 times/per-second.
Summary coro count 43: total handle 1623102 times (spend time 9.02651s), 180223 times/per-second.
Summary coro count 43: total handle 2165716 times (spend time 12.0268s), 180853 times/per-second.
Summary coro count 43: total handle 2731919 times (spend time 15.0271s), 188714 times/per-second.
由于udp是單向非鏈接協(xié)議,速度會(huì)比tcp快得多,按一次sendto和recvfrom記為一次有效通訊,則在coro=40時(shí)候,單線程每秒有效通訊18萬次。
最后
c++協(xié)程理解之后并不是很難,并且只要api提供異步方案,都可以實(shí)現(xiàn)協(xié)程庫的封裝,比如mysql,redis等異步操作,后續(xù)都可以依葫蘆畫瓢,很快實(shí)現(xiàn)c++協(xié)程庫的開發(fā)。
本庫開發(fā)只是為記錄c++協(xié)程學(xué)習(xí)的經(jīng)歷,很多功能后續(xù)還需完善。目前支持在windows下的各位file socket sleep的異步操作,后續(xù)可擴(kuò)展支持linux的epoll模型。
代碼
頭文件CLCoroutine.h 其中的void test_coroutine_tcp_server()和void test_coroutine_udp_random_broadcast()就是案例1和案例2的測試代碼。
#define __CL_COROUTINE__
#if (defined(__cplusplus) && __cplusplus >= 202002L) || (defined(_HAS_CXX20) && _HAS_CXX20)
#ifndef CLUseCorotine
#define CLUseCorotine 1
#endif
#endif
#if (defined(CLUseCorotine) && CLUseCorotine)
#include
#include
#include
#include "../_cl_common/CLCommon.h"
#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN //精簡windows包含庫的大小
#define WIN32_LEAN_AND_MEAN
#endif // !WIN32_LEAN_AND_MEAN
#include "Windows.h"
#include "Winsock2.h"
#include "WS2tcpip.h"
#include "MSWSock.h"
#pragma comment(lib, "ws2_32.lib")
#else
#endif
struct CoScheduler;
//(協(xié)程)任務(wù)單元
struct CoTask {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle;
struct promise_type {
CoTask get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() { }
template
std::suspend_always yield_value(const U& val) {
pAwaitableFile = nullptr;
return {};
}
CoScheduler* sc = 0;
handle before = 0, behind = 0;
void* pAwaitableFile = 0;
};
bool resume();
handle hd;
};
//(協(xié)程)任務(wù)調(diào)度器。包含主調(diào)度協(xié)程和事件循環(huán),維護(hù)掛起的(協(xié)程)任務(wù)隊(duì)列
struct CoScheduler {
struct MainCoro {
using return_type = void;
struct promise_type;
using handle = std::coroutine_handle;
struct promise_type {
MainCoro get_return_object() { return { handle::from_promise(*this) }; }
void unhandled_exception() { std::terminate(); }
std::suspend_always initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() { }
CoScheduler* sc = 0;
};
constexpr bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<>) { }
auto await_resume() const { }
handle hd;
};
CoScheduler()
: m_curTid(std::this_thread::get_id())
, m_hIocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0))
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
}
bool registe(HANDLE hFile) {
if (!hFile || hFile == INVALID_HANDLE_VALUE || !m_hIocp || ::CreateIoCompletionPort(hFile, m_hIocp, 0, 0) != m_hIocp)
return false;
else
return true;
}
bool registe(SOCKET sock) {
return registe((HANDLE)sock);
}
// 創(chuàng)建task并等待后續(xù)調(diào)度執(zhí)行
template
void gather(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward(args)...);
appendNewTaskToEnd(coro);
::PostQueuedCompletionStatus(m_hIocp, 0, (ULONG_PTR)coro.hd.address(), 0);
}
// 創(chuàng)建task并立即調(diào)度執(zhí)行
template
void createTask(F&& func, Args&& ...args) {
if (m_curTid != std::this_thread::get_id())
throw std::logic_error("Scheduler thread is not match.");
CoTask coro = func(std::forward(args)...);
appendNewTaskToEnd(coro);
coro.resume();
}
size_t taskCount() const { return m_taskCount; }
// 執(zhí)行協(xié)程調(diào)度
void run();
private:
void appendNewTaskToEnd(CoTask& cur) {
auto& cprm = cur.hd.promise();
cprm.sc = this;
if (m_end.hd) {
cprm.before = m_end.hd;
cprm.behind = 0;
m_end.hd.promise().behind = cur.hd;
}
m_end.hd = cur.hd;
++m_taskCount;
if (m_begin.hd == 0) {
m_begin.hd = cur.hd;
cprm.before = 0;
}
}
void moveTaskToEnd(CoTask::handle h) {
if (removeDoneTask())
return;
if (!h)
return;
auto& cprm = h.promise();
if (h == m_begin.hd) {
m_begin.hd = cprm.behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
else if (h == m_end.hd) {
}
else {
cprm.behind.promise().before = cprm.before;
cprm.before.promise().behind = cprm.behind;
if (m_end.hd)
m_end.hd.promise().behind = h;
cprm.behind = 0;
cprm.before = m_end.hd;
m_end.hd = h;
}
}
bool removeDoneTask() {
bool ret = false;
while (m_begin.hd && m_begin.hd.done()) {
auto h = m_begin.hd;
m_begin.hd = h.promise().behind;
if (m_begin.hd)
m_begin.hd.promise().before = 0;
h.destroy();
--m_taskCount;
ret = true;
}
return ret;
}
HANDLE m_hIocp;
const std::thread::id m_curTid;
MainCoro m_main;
CoTask m_begin, m_end;
std::atomic m_taskCount = 0;
};
// IO文件操作類型
enum IoFileType :int {
TUnknown = 0,
TRead,
TWrite,
TListen,
TAccept,
TConnect,
TSend,
TRecv,
TSendto,
TRecvfrom,
TSleep,
};
// IO文件調(diào)度優(yōu)先級(jí)
enum IoFilePriority : int {
WaitingForPolling = 0, // 等待順序輪詢調(diào)度
DispatchImmediately, // 立即調(diào)度
};
// 支持異步掛起的可等待文件對(duì)象(基類)
template
struct IoFileAwaitable : OVERLAPPED {
operator HANDLE() const { return m_hFile; }
operator SOCKET() const { return (SOCKET)m_hFile; }
bool isRegisted() const { return m_isRegisted; }
bool isCompleted() const { return m_isCompleted; }
void setCompleted() { m_isCompleted = true; }
void resetCompleted() {
memset(this, 0, sizeof(OVERLAPPED));
m_isCompleted = 0;
}
void setReturn(Ret ret) { m_ret = ret; }
Ret getReturn() const { return m_ret; }
IoFileType& type() { return m_fileType; }
const char* typeName() const {
#define _TypeNameItem( tp ) case tp: return #tp;
switch (m_fileType)
{
_TypeNameItem(TUnknown);
_TypeNameItem(TRead);
_TypeNameItem(TWrite);
_TypeNameItem(TListen);
_TypeNameItem(TAccept);
_TypeNameItem(TConnect);
_TypeNameItem(TSend);
_TypeNameItem(TRecv);
_TypeNameItem(TSendto);
_TypeNameItem(TRecvfrom);
_TypeNameItem(TSleep);
default:
return "TUnknown";
}
}
void* getTransferredBytesCountBuffer() const {
return m_transferredBytesCount;
}
void setTransferredBytesCountRecvBuffer(void* countBuf) {
m_transferredBytesCount = countBuf;
}
bool close() {
if (m_hFile) {
return CloseHandle(detach());
}
return true;
}
HANDLE detach() {
HANDLE ret = *this;
m_hFile = 0;
m_isRegisted = 0;
return ret;
}
HANDLE attach(CoScheduler& sc, HANDLE s) {
HANDLE ret = *this;
m_hFile = s;
m_isRegisted = sc.registe(m_hFile);
return ret;
}
int getLastError() const { return m_lastError; }
void setLastError(int err) { m_lastError = err; }
CoTask::handle& onwer() { return m_owner; }
auto getPriority() const { return m_priority; }
void setPriority(IoFilePriority priority) { m_priority = priority; }
// awaitable methed
bool await_ready() const { return isCompleted(); }
void await_suspend(CoTask::handle h) {
h.promise().pAwaitableFile = this;
m_owner = h;
}
Ret await_resume() {
setTransferredBytesCountRecvBuffer(nullptr);
return getReturn();
}
protected:
IoFileAwaitable()
: m_hFile((HANDLE)0)
, m_isRegisted(false)
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, HANDLE hFile)
: m_hFile(hFile)
, m_isRegisted(sc.registe(m_hFile))
{
resetCompleted();
}
IoFileAwaitable(CoScheduler& sc, SOCKET sock)
: m_hFile((HANDLE)sock)
, m_isRegisted(sc.registe(sock))
{
resetCompleted();
}
HANDLE m_hFile;
bool m_isRegisted;
bool m_isCompleted;
IoFileType m_fileType = IoFileType::TUnknown;
void* m_transferredBytesCount = nullptr;
int m_lastError = ERROR_SUCCESS;
IoFilePriority m_priority = IoFilePriority::WaitingForPolling;
CoTask::handle m_owner;
Ret m_ret = 0;
};
// 支持異步掛起的套接字(基類)
template
struct AsyncSocket :public IoFileAwaitable {
using base = IoFileAwaitable;
~AsyncSocket() { close(); }
sockaddr_in localAddress() const { return m_local; }
sockaddr_in remoteAddress() const { return m_remote; }
sockaddr_in* localAddress() { return &m_local; }
sockaddr_in* remoteAddress() { return &m_remote; }
int close() {
int ret = 0;
if (base::m_hFile) {
if (base::m_hFile != (HANDLE)INVALID_SOCKET) {
ret = closesocket(detach());
}
else {
base::m_hFile = 0;
base::m_isRegisted = 0;
}
}
return ret;
}
SOCKET detach() {
return (SOCKET)base::detach();
}
SOCKET attach(CoScheduler& sc, SOCKET s) {
return (SOCKET)base::attach(sc, (HANDLE)s);
}
protected:
AsyncSocket(CoScheduler& sc, SOCKET sock)
:base(sc, sock)
{ }
sockaddr_in m_local = { 0 };
sockaddr_in m_remote = { 0 };
};
struct AsyncAcceptor;
// 支持異步掛起的服務(wù)端監(jiān)聽器,是一個(gè)等待連接到來的TCP監(jiān)聽套接字
struct AsyncListener :public AsyncSocket {
AsyncListener(CoScheduler& sc, unsigned long addr, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = addr;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
AsyncListener(CoScheduler& sc, const char* ip, unsigned short port, int backlog = SOMAXCONN)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
type() = IoFileType::TListen;
m_local.sin_family = AF_INET;
m_local.sin_port = htons(port);
InetPton(AF_INET, ip, &m_local.sin_addr);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
}
if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
|| SOCKET_ERROR == ::listen(*this, backlog)
)
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
}
}
sockaddr_in listenAddress() const { return localAddress(); }
// 返回值true成功,false失敗
AsyncAcceptor& accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
};
// 支持異步掛起的TCP連接(基類)
struct AsyncTcp :public AsyncSocket {
// 返回值0成功,SOCKET_ERROR失敗
AsyncTcp& send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend);
// 返回值0成功,SOCKET_ERROR失敗
AsyncTcp& recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv);
protected:
AsyncTcp(CoScheduler& sc)
:AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
{ }
};
// 支持異步掛起的服務(wù)端接收器,是一個(gè)接受端TCP套接字
struct AsyncAcceptor : public AsyncTcp {
AsyncAcceptor(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TAccept;
}
// 解析到來連接的地址信息,保存在內(nèi)部地址變量
void perseAddress(void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer) {
if (lpAcceptBuffer == 0 || nNumberOfBytesAcceptBuffer == 0)
throw std::logic_error("perseAddress parm is invalid.");
static LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptSockAddrs = 0;
if (!lpfnGetAcceptSockAddrs) {
GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
*this,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptexSockAddrs,
sizeof(GuidGetAcceptexSockAddrs),
&lpfnGetAcceptSockAddrs,
sizeof(lpfnGetAcceptSockAddrs),
&dwBytes, NULL, NULL))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener GetAcceptexSockAddrs error.");
}
}
int localLen = 0, remoteLen = 0;
lpfnGetAcceptSockAddrs(
lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
(LPSOCKADDR*)localAddress(),
&localLen,
(LPSOCKADDR*)remoteAddress(),
&remoteLen
);
}
// 返回值true成功,false失敗
AsyncAcceptor& accept(AsyncListener& sockListener, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
int await_resume() {
setPriority(IoFilePriority::WaitingForPolling);
return AsyncTcp::await_resume();
}
};
// 支持異步掛起的用戶端連接器,是一個(gè)發(fā)起端TCP套接字
struct AsyncConnector : public AsyncTcp {
AsyncConnector(CoScheduler& sc)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
}
AsyncConnector(CoScheduler& sc, const char* ip, unsigned short port)
: AsyncTcp(sc)
{
type() = IoFileType::TConnect;
setConnectRemoteAddress(ip, port);
bindConnectLocalPort(0);
}
void setConnectRemoteAddress(const char* ip, unsigned short port) {
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_port = htons(port);
InetPton(AF_INET, ip, &m_remote.sin_addr);
}
int bindConnectLocalPort(unsigned short port = 0) {
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.s_addr = INADDR_ANY;
m_local.sin_port = htons(port);
return ::bind(*this, (const sockaddr*)&m_local, sizeof(m_local));
}
// 返回值true成功,false失敗
AsyncConnector& connect(const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// 返回值true成功,false失敗
AsyncConnector& connect(const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
// 返回值true成功,false失敗
AsyncConnector& connect(void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
};
// 作為服務(wù)端Acceptor應(yīng)該具有事件完成并立即調(diào)度優(yōu)先級(jí),保證吞吐量
// 返回值true成功,false失敗
inline
AsyncAcceptor&
accept(AsyncListener& sockListener, AsyncAcceptor& sockAccept, void* lpAcceptBuffer
, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted) {
static LPFN_ACCEPTEX lpfnAcceptEx = 0;
sockListener.type() = IoFileType::TListen;
sockAccept.type() = IoFileType::TAccept;
sockAccept.resetCompleted();
sockAccept.setTransferredBytesCountRecvBuffer(lpNumberOfBytesAccepted);
sockAccept.setPriority(IoFilePriority::DispatchImmediately);//設(shè)置為立即調(diào)度優(yōu)先級(jí)
if (lpNumberOfBytesAccepted)
*lpNumberOfBytesAccepted = 0;
if (!lpfnAcceptEx) {
GUID GuidAcceptEx = WSAID_ACCEPTEX; // GUID,這個(gè)是識(shí)別AcceptEx函數(shù)必須的
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockListener,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&lpfnAcceptEx,
sizeof(lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
lpfnAcceptEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Accept get AcceptEx function address error.");
}
}
bool ret = lpfnAcceptEx(
sockListener,
sockAccept,
(char*)lpAcceptBuffer,
nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
lpNumberOfBytesAccepted,
&sockAccept
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockAccept.setReturn(ret);
return sockAccept;
}
sockAccept.setReturn(false);
sockAccept.setCompleted();
sockAccept.setPriority(IoFilePriority::WaitingForPolling);
return sockAccept;
}
// 返回值true成功,false失敗
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const sockaddr* name, int namelen, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
static LPFN_CONNECTEX lpfnConnectEx = 0;
sockCon.type() = IoFileType::TConnect;
sockCon.resetCompleted();
if (lpdwBytesSent)
*lpdwBytesSent = 0;
if (!lpfnConnectEx) {
GUID GuidConnectEx = WSAID_CONNECTEX; // GUID,這個(gè)是識(shí)別AcceptEx函數(shù)必須的
unsigned long dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
sockCon,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidConnectEx,
sizeof(GuidConnectEx),
&lpfnConnectEx,
sizeof(lpfnConnectEx),
&dwBytes, NULL, NULL))
{
lpfnConnectEx = 0;
throw std::system_error(WSAGetLastError(), std::system_category(), "Connect get ConnectEx function address error.");
}
}
sockCon.setTransferredBytesCountRecvBuffer(lpdwBytesSent);
bool ret = lpfnConnectEx(
sockCon,
name,
namelen,
lpSendBuffer,
dwSendDataLength,
lpdwBytesSent,
&sockCon
);
if (ret == false) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = true;
}
if (ret) {
sockCon.setReturn(ret);
return sockCon;
}
sockCon.setReturn(false);
sockCon.setCompleted();
return sockCon;
}
// 返回值true成功,false失敗
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const char* ip, unsigned short port
, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
sockCon.setConnectRemoteAddress(ip, port);
sockCon.bindConnectLocalPort(0);
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
// 返回值true成功,false失敗
inline
AsyncConnector&
connect(AsyncConnector& sockCon, void* lpSendBuffer = nullptr
, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
// 返回值0成功,SOCKET_ERROR失敗
inline
AsyncTcp&
send(AsyncTcp& sock, const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend) {
sock.type() = IoFileType::TSend;
sock.resetCompleted();
if (lpNumberOfBytesSend)
*lpNumberOfBytesSend = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSend);
WSABUF wsaBuf{ nNumberOfBytesSendBuffer , (char*)lpSendBuffer };
auto ret = WSASend(sock, &wsaBuf, 1, lpNumberOfBytesSend, 0, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// 返回值0成功,SOCKET_ERROR失敗
inline
AsyncTcp&
recv(AsyncTcp& sock, void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv) {
sock.type() = IoFileType::TRecv;
sock.resetCompleted();
if (lpNumberOfBytesRecv)
*lpNumberOfBytesRecv = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecv);
WSABUF wsaBuf{ nNumberOfBytesRecvBuffer , (char*)lpRecvBuffer };
unsigned long dwFlag = 0;
auto ret = WSARecv(sock, &wsaBuf, 1, NULL, &dwFlag, &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// 支持異步掛起的UDP(非連接)套接字
struct AsyncUdp : public AsyncSocket {
// 設(shè)置失敗返回-1;返回1設(shè)置為廣播模式(client端),返回0則為接收端(server端)
int status() const { return m_isBroadCast; }
int* remoteLen() { return &m_remoteLen; }
protected:
//isBroadCast = true則為發(fā)送端udp(client端),使用sendTo,此時(shí)可以在sendTo階段動(dòng)態(tài)指定廣播目的地址
//isBroadCast = false則為接受端udp(server端),使用recvFrom,構(gòu)造時(shí)必須指定綁定的廣播接收地址
AsyncUdp(CoScheduler& sc, bool isBroadCast = true, const char* ip = 0, unsigned short port = 0)
: AsyncSocket(sc, WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED))
{
setBroadCast(isBroadCast, ip, port);
}
// 設(shè)置失敗返回-1;返回1設(shè)置為廣播模式(client端),返回0則為接收端(server端)
int setBroadCast(bool isBroadCast, const char* ip, unsigned short port) {
if (*this && *this != INVALID_SOCKET)
{
m_isBroadCast = isBroadCast;
if (::setsockopt(*this, SOL_SOCKET, SO_BROADCAST, (char*)&m_isBroadCast, sizeof(m_isBroadCast)) == 0) {
if (isBroadCast) {
setBindAddress(0, 0);
setBroadcastAddress(ip, port);
}
else {
setBindAddress(ip, port);
}
return m_isBroadCast;
}
}
return m_isBroadCast = -1;
}
// 設(shè)置接收器綁定的收聽本地地址
bool setBindAddress(const char* ip, unsigned short port)
{
memset(&m_local, 0, sizeof(m_local));
m_local.sin_family = AF_INET;
m_local.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_local.sin_port = htons(port);
char opt = 1;
if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp set reuse address error.");
}
if (::bind(*this, (const sockaddr*)&m_local, sizeof(sockaddr_in)))
{
throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp bind address error.");
}
return true;
}
// 設(shè)置發(fā)送要廣播到的目標(biāo)地址(遠(yuǎn)端地址)
void setBroadcastAddress(const char* ip, unsigned short port)
{
memset(&m_remote, 0, sizeof(m_remote));
m_remote.sin_family = AF_INET;
m_remote.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
m_remote.sin_port = htons(port);
}
int m_remoteLen = 0;
int m_isBroadCast = -1;
};
// 支持異步掛起的UDP協(xié)議廣播器套接字(發(fā)送端,client端)
struct AsyncBroadcastor :public AsyncUdp {
AsyncBroadcastor(CoScheduler& sc, const char* ip = 0, unsigned short port = 0)
:AsyncUdp(sc, true, ip, port)
{
type() = IoFileType::TSendto;
}
// 發(fā)送端udp(client端)向內(nèi)部已保存的指定的廣播地址發(fā)送數(shù)據(jù)(未設(shè)置廣播地址將失敗)
// 返回值0成功,SOCKET_ERROR失敗
AsyncBroadcastor& sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
// 發(fā)送端udp(client端)向動(dòng)態(tài)指定的廣播地址發(fā)送數(shù)據(jù)
// 返回值0成功,SOCKET_ERROR失敗
AsyncBroadcastor& sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
bool isValidBroadcastor() const { return status() == 1; }
using AsyncUdp::setBroadcastAddress;
};
// 支持異步掛起的UDP協(xié)議接收器套接字(接收端,server端)
struct AsyncReceiver :public AsyncUdp {
AsyncReceiver(CoScheduler& sc, const char* ip, unsigned short port)
:AsyncUdp(sc, false, ip, port)
{
type() = IoFileType::TRecvfrom;
}
// 接收端udp(server端)向綁定的本地地址獲取廣播數(shù)據(jù)
// 返回值0成功,SOCKET_ERROR失敗
AsyncReceiver& recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd);
bool isValidReceiver() const { return status() == 0; }
using AsyncUdp::setBindAddress;
};
// 返回值0成功,SOCKET_ERROR失敗
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.type() = IoFileType::TSendto;
sock.resetCompleted();
if (lpNumberOfBytesSent)
*lpNumberOfBytesSent = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSent);
WSABUF wsaBuf{ nNumberOfBytesSentBuffer , (char*)lpSentBuffer };
auto ret = WSASendTo(sock, &wsaBuf, 1, lpNumberOfBytesSent, 0,
(const sockaddr*)sock.remoteAddress(), (int)sizeof(sockaddr_in), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
// 返回值0成功,SOCKET_ERROR失敗
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const char* ip, unsigned short port,
const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
sock.setBroadcastAddress(ip, port);
return ::sendTo(sock, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);;
}
// 返回值0成功,SOCKET_ERROR失敗
inline
AsyncReceiver&
recvFrom(AsyncReceiver& sock, void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd) {
sock.type() = IoFileType::TRecvfrom;
sock.resetCompleted();
if (lpNumberOfBytesRecvd)
*lpNumberOfBytesRecvd = 0;
sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecvd);
WSABUF wsaBuf{ nNumberOfBytesRecvdBuffer , (char*)lpRecvdBuffer };
DWORD dwFlag = 0;
*sock.remoteLen() = sizeof(sockaddr_in);
auto ret = WSARecvFrom(sock, &wsaBuf, 1, NULL, &dwFlag,
(sockaddr*)sock.remoteAddress(), sock.remoteLen(), &sock, NULL);
if (ret == SOCKET_ERROR) {
auto lr = WSAGetLastError();
if (lr == WSA_IO_PENDING)
ret = 0;
else
sock.setCompleted();
}
sock.setReturn(ret);
return sock;
}
struct AsyncFile : public IoFileAwaitable {
AsyncFile(CoScheduler& sc, const char* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileA(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
AsyncFile(CoScheduler& sc, const wchar_t* filename,
unsigned long dwDesiredAccess,
unsigned long dwShareMode,
LPSECURITY_ATTRIBUTES lpSecurityAttributes,
unsigned long dwCreationDisposition,
unsigned long dwFlagsAndAttributes,
HANDLE hTemplateFile
)
:IoFileAwaitable(sc, CreateFileW(filename, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
{
}
~AsyncFile() { close(); }
AsyncFile& read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead);
AsyncFile& write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten);
};
// 返回值true成功,false失敗
inline
AsyncFile&
read(AsyncFile& file, void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead) {
file.type() = IoFileType::TWrite;
file.resetCompleted();
if (lpNumberOfBytesRead)
*lpNumberOfBytesRead = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRead);
auto ret = ReadFile(file, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}
// 返回值true成功,false失敗
inline
AsyncFile&
write(AsyncFile& file, const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten) {
file.type() = IoFileType::TWrite;
file.resetCompleted();
if (lpNumberOfBytesWritten)
*lpNumberOfBytesWritten = 0;
file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesWritten);
auto ret = WriteFile(file, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten, &file);
if (ret == false) {
auto lr = GetLastError();
if (lr == ERROR_IO_PENDING)
ret = true;
else
file.setCompleted();
}
file.setReturn(ret);
return file;
}
struct AsyncSleepor :public IoFileAwaitable {
AsyncSleepor(long long microOrMilliSeconds = 0, bool useMicroSeconds = false)
: microOrMilliSeconds(microOrMilliSeconds)
, useMicroSeconds(useMicroSeconds)
{
type() = IoFileType::TSleep;
start();
}
void start()
{
tp = std::chrono::steady_clock::now();
}
auto getSpendMicroSeconds() const {
constexpr auto div = std::nano::den / std::micro::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
auto getSpendMilliSeconds() const {
constexpr auto div = std::nano::den / std::milli::den;
std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
return delta.count() / div;
}
bool isCompleted() {
setReturn(useMicroSeconds ? getSpendMicroSeconds() : getSpendMilliSeconds());
return (m_isCompleted = getReturn() >= microOrMilliSeconds);
}
protected:
long long microOrMilliSeconds;
bool useMicroSeconds;
std::chrono::steady_clock::time_point tp;
};
//毫秒妙級(jí)別休眠,返回實(shí)際休眠的毫妙數(shù)
inline
AsyncSleepor
sleepFor(long long milliSeconds) {
return AsyncSleepor{ milliSeconds };
}
//微妙級(jí)別休眠,返回實(shí)際休眠的微妙數(shù)
inline
AsyncSleepor
sleepForEx(long long microSeconds) {
return AsyncSleepor{ microSeconds, true };
}
void test_coroutine_tcp_server(unsigned short serverPort = 33100, int totalClientCount = 100, bool dumpTestInfo = 0);
void test_coroutine_udp_random_broadcast(unsigned short broadCastPort = 33000, int totalClientBroadcastCount = 20, bool dumpTestInfo = 0);
#endif
#endif
實(shí)現(xiàn)文件
CLCoroutine.cpp
#if (defined(CLUseCorotine) && CLUseCorotine)
#include "../_cl_common/CLCommon.h"
#include "../_cl_string/CLString.h"
#include "../_cl_logger/CLLogger.h"
void CoScheduler::run() {
auto coro = [this]() ->MainCoro {
//logger.debug("nMain coro scheduler started ...");
#ifdef _WIN32
if (m_hIocp) {
CLString err;
DWORD dwMilliseconds = 0;
//logger.debug("nMain coro scheduler: Iocp loop started ...");
while (1) {
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED* pOverlapped = 0;
while (GetQueuedCompletionStatus(m_hIocp, &numberOfBytesTransferred, &completionKey, &pOverlapped, dwMilliseconds))
{
if (pOverlapped) { //io完成事件
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(ERROR_SUCCESS);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = numberOfBytesTransferred;
// 根據(jù)可等待對(duì)象的優(yōu)先級(jí),決定是否立即調(diào)度或是輪流調(diào)度讓每個(gè)任務(wù)的權(quán)重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //立即調(diào)度
break;
default:
moveTaskToEnd(m_begin.hd); //輪詢調(diào)度
break;
}
m_end.resume();
}
else { //新task來到,立即調(diào)度
if (numberOfBytesTransferred == 0 && completionKey) {
auto h = CoTask::handle::from_address((void*)completionKey);
moveTaskToEnd(h);
h.resume();
}
else {
auto lr = GetLastError();
logger.warning("Iocp: get status in event loop: ",err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
}
}
auto lr = GetLastError();
if (lr == WSA_WAIT_TIMEOUT) {
moveTaskToEnd(m_begin.hd); //輪詢調(diào)度
m_end.resume(); //執(zhí)行resume,此刻所有等待io均未完成不會(huì)執(zhí)行,但yeild讓渡的協(xié)程得到執(zhí)行;
}
else if(pOverlapped) {
auto pFile = (IoFileAwaitable<>*)pOverlapped;
pFile->setCompleted();
pFile->setLastError(lr);
auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
if (saveBuf) *saveBuf = 0;
IoFileType fileType = pFile->type();
switch (fileType)
{
case TUnknown:
break;
case TRead:
case TWrite:
case TListen:
case TAccept:
case TConnect:
pFile->setReturn(false);
break;
case TSend:
case TRecv:
case TSendto:
case TRecvfrom:
pFile->setReturn(SOCKET_ERROR);
break;
case TSleep:
break;
default:
break;
}
switch (lr)
{
case ERROR_NETNAME_DELETED: //64 指定的網(wǎng)絡(luò)名不再可用
break;
case ERROR_SEM_TIMEOUT://121 信號(hào)燈超時(shí)
break;
default:
logger.error("Iocp: get status out event loop: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
break;
}
// 根據(jù)可等待對(duì)象的優(yōu)先級(jí),決定是否立即調(diào)度或是輪流調(diào)度讓每個(gè)任務(wù)的權(quán)重相同
switch (pFile->getPriority())
{
case IoFilePriority::DispatchImmediately:
moveTaskToEnd(pFile->onwer()); //立即調(diào)度
break;
default:
moveTaskToEnd(m_begin.hd); //輪詢調(diào)度
break;
}
m_end.resume();
}
else {
logger.error("Iocp: get status out event loop no completed: ", err.getLastErrorString(lr));
CLString().getLastErrorMessageBoxExceptSucceed(lr);
}
if (taskCount() == 0)
break;
}
CloseHandle(m_hIocp);
m_hIocp = 0;
//logger.debug("nMain coro scheduler: Iocp loop has done ...");
}
#endif
//logger.debug("nMain coro scheduler quit ...");
co_return;
};
m_main = coro();
m_main.hd.promise().sc = this;
m_main.hd.resume();
m_main.hd.destroy();
}
bool CoTask::resume() {
if (!hd)
return true;
else if (hd.done()) {
return false;
}
else {
auto pFile = (IoFileAwaitable<>*) hd.promise().pAwaitableFile;
if (!pFile) //第一次調(diào)度或者yield的協(xié)程
hd.resume();
else {
if (pFile->type() == IoFileType::TSleep) { //休眠調(diào)度
if (((AsyncSleepor*)pFile)->isCompleted()) {
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
else if (pFile->isCompleted()) { //io完成調(diào)度
hd.promise().pAwaitableFile = nullptr;
hd.resume();
}
}
return true;
}
}
#ifdef _WIN32
#else // Windows
#endif // Linux
AsyncAcceptor& AsyncListener::accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(* this, sockAccept, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}
AsyncAcceptor& AsyncAcceptor::accept(AsyncListener& sListen, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
return ::accept(sListen, *this, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}
AsyncConnector& AsyncConnector::connect(const sockaddr* name, int namelen, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncConnector& AsyncConnector::connect(const char* ip, unsigned short port, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, ip, port, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncConnector& AsyncConnector::connect(void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
return ::connect(*this, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}
AsyncTcp& AsyncTcp::send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend)
{
return ::send(*this, lpSendBuffer, nNumberOfBytesSendBuffer, lpNumberOfBytesSend);
}
AsyncTcp& AsyncTcp::recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv)
{
return ::recv(*this, lpRecvBuffer, nNumberOfBytesRecvBuffer, lpNumberOfBytesRecv);
}
AsyncBroadcastor& AsyncBroadcastor::sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}
AsyncBroadcastor& AsyncBroadcastor::sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
return ::sendTo(*this, ip, port, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}
AsyncReceiver& AsyncReceiver::recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd)
{
return ::recvFrom(*this, lpRecvdBuffer, nNumberOfBytesRecvdBuffer, lpNumberOfBytesRecvd);
}
AsyncFile& AsyncFile::read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead)
{
return ::read(*this, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead);
}
AsyncFile& AsyncFile::write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten)
{
return ::write(*this, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten);
}
#include
void test_coroutine_tcp_server(unsigned short serverPort, int totalClientCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
CoScheduler sc;
int servRun = 0;
int totals = 0;
CLTick tk;
// 服務(wù)端監(jiān)聽器task
sc.gather([&]()->CoTask {
logger.info("nTcp server coro started ...");
AsyncListener listener(sc, ADDR_ANY, serverPort);
// loop accept
std::vector acceptbuf(260);
AsyncAcceptor* pAcceptor = 0;
int servId = 0;
while (true)
{
AsyncAcceptor& acceptor = pAcceptor ? *pAcceptor : *(pAcceptor = new AsyncAcceptor(sc));
DWORD nValidAccept;
logger.debug("nServer listener accept wait ...");
bool ret = co_await listener.accept(acceptor, acceptbuf.data(), acceptbuf.size(), &nValidAccept);
if (ret) {
//create server task
acceptor.perseAddress(acceptbuf.data(), acceptbuf.size());
servRun++;
// 服務(wù)端task
sc.gather([&](AsyncAcceptor* pAcceptor, int idx) ->CoTask {
AsyncAcceptor& acp = *pAcceptor;
std::vector bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::sprintf(bufSend.data(), "nHello client. this is server %d. %dst response.", idx, total);
logger.debug("nServer[%d] send wait ...", idx);
int ret = co_await acp.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
logger.debug("nServer[%d] recv wait ...", idx);
ret = co_await acp.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (nbytesRecv == 0)
break;
logger.debug("nServer[%d] recv client msg = %s", idx, bufRecv.data());
total++;
totals++;
}
logger.debug("nServer[%d] recv client close msg", idx);
delete pAcceptor;
servRun--;
}, pAcceptor, ++servId);
pAcceptor = 0;
}
}
logger.info("nTcp server coro quit.%d", GetCurrentThreadId());
});
// 客戶端生成器
sc.gather([&]()->CoTask {
logger.info("nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientCount) {
i++;
logger.info("nClients creater make client task %d.", i);
nClient++;
// 客戶端task
sc.gather([&](int idx)->CoTask {
AsyncConnector con(sc);
logger.debug("nClient[%d] connect wait ...", idx);
auto ret = co_await con.connect("127.0.0.1", serverPort);
if (!ret) {
logger.debug("nClinet[%d] connect server fail, %s", idx, CLString().getLastErrorString(GetLastError()));
co_return;
}
std::vector bufSend(260), bufRecv(260);
DWORD nbytesSend, nbytesRecv;
int total = 1;
while (1) {
std::snprintf(bufSend.data(), bufSend.size(), "nHelle server, this is client %d: %dst request.", idx, total);
logger.debug("nClient[%d] send wait ...", idx);
auto ret = co_await con.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
if (!(ret == SOCKET_ERROR || nbytesSend == 0)) {
logger.debug("nClient[%d] recv wait ...", idx);
ret = co_await con.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0)
break;
logger.debug("nClient[%d] recv server msg = %s", idx, bufRecv.data());
}
total++;
}
logger.debug("nClient[%d] get server close msg and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 統(tǒng)計(jì)協(xié)程
sc.gather([&]()->CoTask {
auto last = totals;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000);
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totals, time, (totals - last) / (time - lastTime));
last = totals;
lastTime = time;
}
});
sc.run();
}
void test_coroutine_udp_random_broadcast(unsigned short broadCastPort, int totalClientBroadcastCount, bool dumpTestInfo)
{
logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
logger.openHeadInfoFlag(false);
srand(time(0));
CoScheduler sc;
int servRun = 0;
int totalsRecvd = 0;
int totalsSendto = 0;
CLTick tk;
std::vector portList(totalClientBroadcastCount);
for (int i = 0; i < totalClientBroadcastCount; i++)portList[i] = broadCastPort + i;
// 服務(wù)端生成器
sc.gather([&]()->CoTask {
logger.info("nServers creater coro started.");
int nServer = 0;
for (int i = 0; 1; )
{
if (nServer < totalClientBroadcastCount) {
i++;
logger.info("nServers creater make server task %d.", i);
nServer++;
// 服務(wù)端task (廣播接收端)
sc.gather([&](int i)->CoTask {
logger.info("nUdp server[%d] coro started bind port = %d...", i, portList[i - 1]);
AsyncReceiver serv(sc, "127.0.0.1", portList[i - 1]);
// recv
std::vector recv(260);
int servId = 0;
int total = 1;
while (true)
{
DWORD nbytesRecv;
logger.debug("nUdp server[%d] recvfrom wait ...", i);
int ret = co_await serv.recvFrom(recv.data(), recv.size(), &nbytesRecv);
if (ret == SOCKET_ERROR || nbytesRecv == 0) {
CLString().getLastErrorMessageBoxExceptSucceed(WSAGetLastError());
break;
}
logger.debug("nUdp server[%d] recvfrom %dst broadcast %u bytes data, msg = %s", i, total, nbytesRecv, recv.data());
total++;
totalsRecvd++;
}
logger.info("nUdp server[%d] coro quit.%d", i, GetCurrentThreadId());
nServer--;
}, i);
}
else {
logger.debug("nServers creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nServers creater fetch run privilege again.");
}
}
logger.debug("nServers creater coro quit.");
});
// 客戶端生成器
sc.gather([&]()->CoTask {
logger.info("nClients creater coro started.");
int nClient = 0;
for (int i = 0; 1; )
{
if (nClient < totalClientBroadcastCount) {
i++;
logger.info("nClients creater make broadcastor client task %d.", i);
nClient++;
// 客戶端task (廣播發(fā)送端)
sc.gather([&](int idx)->CoTask {
AsyncBroadcastor broadcast(sc);
std::vector bufSent(260);
DWORD nbytesSent;
int total = 1;
while (1) {
auto randPort = portList[rand() % totalClientBroadcastCount];
std::snprintf(bufSent.data(), bufSent.size(),
"nHelle server, this is broadcastor %d: %dst randon broadcast to port=%d."
, idx, total, randPort);
logger.debug("nBroadcastor[%d] send wait ...", idx);
auto ret = co_await broadcast.sendTo("127.0.0.1", randPort,
bufSent.data(), std::strlen(bufSent.data()) + 1, &nbytesSent);
if (ret == SOCKET_ERROR || nbytesSent == 0) {
break;
}
logger.debug("nBroadcastor[%d] sendto server msg = %s", idx, bufSent.data());
total++;
totalsSendto++;
}
logger.debug("nBroadcastor[%d] send 0 bytes and shutdown.", idx);
nClient--;
}, i);
}
else {
logger.debug("nClients creater yield run privilege to coro scheduler.");
co_yield 1;
logger.debug("nClients creater fetch run privilege again.");
}
}
logger.debug("nClients creater coro quit.");
});
// 統(tǒng)計(jì)協(xié)程
sc.gather([&]()->CoTask {
auto last = totalsRecvd + totalsSendto;
auto lastTime = tk.getSpendTime();
while (1)
{
co_await sleepFor(3000); // 協(xié)程休眠3000毫秒
auto time = tk.getSpendTime();
logger.info("nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
sc.taskCount(), totalsRecvd + totalsSendto, time, (totalsRecvd + totalsSendto - last) / (time - lastTime));
last = totalsRecvd + totalsSendto;
lastTime = time;
}
});
sc.run();
}
#endif
-
函數(shù)
+關(guān)注
關(guān)注
3文章
4344瀏覽量
62861 -
C++
+關(guān)注
關(guān)注
22文章
2114瀏覽量
73775 -
源代碼
+關(guān)注
關(guān)注
96文章
2946瀏覽量
66843 -
生成器
+關(guān)注
關(guān)注
7文章
319瀏覽量
21074
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論