今天,想聊聊workflow這個開源項目。
關于workflow,我之前特意寫過一篇文章【推薦學習這個C++開源項目】。
今天還是想再啰嗦啰嗦,因為自己這一年也在帶團隊從0到1做項目,需要負責整個項目的架構設計、接口設計、模塊劃分等工作。
做了一段時間后再回過頭復盤一下,深知架構設計、接口設計的重要性,也感受到了架構設計的困難程度,編碼和設計相比,真的容易的多了。
然后自己又回頭來研究了一下workflow,想著學習下其他項目的設計理念,隨著自己研究的越來越深入,越來越感覺它的高端,對外暴露特別簡單的接口卻能完成非常復雜的功能。
上篇文章是基礎篇,主要向大家普及一下workflow的特點和作用,感興趣的朋友可以移步到那里哈。
本篇文章是進階篇,主要就是想介紹下workflow的任務模型,其他的框架一般只能處理普通的網絡通信,而workflow卻特別適用于通信與計算關系很復雜的應用。其實我最感興趣的是它的內存管理機制,下面也會詳細介紹。
優秀的系統設計
在作者的設計理念中:程序 = 協議 + 算法 + 任務流。
**協議:**就是指通用的網絡協議,比如http、redis等,當然還可以自定義網絡協議,這里只需要提供序列化和反序列化函數就可以達到想要的效果。
算法: workflow提供了一些通用的算法,比如sort、merge、reduce等,當然還可以自定義算法,用過C++ STL的朋友應該都知道如何自定義算法吧,在workflow中,任何復雜的計算都應該包裝成算法。
**任務流:**我認為這應該就是整個系統設計的核心,通過任務流來抽象封裝實際的業務邏輯,就是把開發好的協議和算法組成一個任務流程圖,然后調度執行這個圖。
任務流
這里多聊聊任務流,在workflow中,一切業務邏輯皆是任務,多個任務會組成任務流,任務流可組成圖,這個圖可能是串聯圖,可能是并聯圖,也有可能是串并聯圖,類似于這種:
也可能是這種復雜的DAG圖:
圖的層次結構可以由用戶自定義,框架也是支持動態地創建任務流。
引用作者的一句話:
如果把業務邏輯想象成用設計好的電子元件搭建電路,那么每個電子元件內部可能又是一個復雜電路。
workflow對任務做了很好的抽象和封裝。整個系統包含6種基礎任務:通訊、文件IO、CPU、定時器、計數器。
workflow提供了任務工廠,所有的任務都由任務工廠產生,并且會自動回收。
大多數情況下,通過任務工廠創建的任務都是一個復合任務,但用戶并不感知。例如一個http請求,可能包含很多次異步過程(DNS,重定向),內部有很多復雜的任務,但對用戶來講,這就是一次簡單的通信任務。
哪有什么歲月靜好,只不過是有人替你負重前行。workflow的一大特點就是接口暴露的特別簡潔,非常方便用戶的接入,外部接入如此簡單,肯定是將很多組合的邏輯都放在了內部,但其實workflow項目內部代碼結構層次非常簡潔清晰,感興趣的朋友可以自己研究研究哈。
內存管理機制
還有就是項目的內存管理機制,workflow整個項目都遵循著誰申請誰釋放的原則,內部申請的內存不需要外部顯式釋放,內部會自動回收內存。
而且整個項目都沒有使用shared_ptr,那多個對象共同使用同一塊裸內存,workflow是怎么處理的呢?
內部為這種需要共享的對象各自維護一個引用計數,手動incref和decref,至于為什么要這樣做,可以看看workflow美女架構師的回答【https://www.zhihu.com/question/33084543/answer/2209929271】。
我總結了一下:
- shared_ptr是非侵入式指針,一層包一層,而且為了保持shared_ptr覆蓋對象整個生命周期,每次傳遞時都必須帶著智能指針模板,使用具有傳染性,而且也比較麻煩。
- shared_ptr引用計數的內存區域和數據區域不一致,不連續,緩存失效可能導致性能問題,盡管有make_shared,但還是容易用錯。
- 手動為對象做incref和decref,使用起來更靈活,可以明確在引用計數為固定數字時做一些自定義操作,而且方便調試。因為手動管理對象的引用計數,就會要求開發者明晰對象的生命周期,明確什么時候該使用對象,什么時候該釋放對象。
- 如果使用shared_ptr可能會激起開發者的惰性,反正也不需要管理內存啦,就無腦使用shared_ptr唄,最后出現問題時調試起來也比較困難。
那再深入源碼中研究研究,看看workflow是如何做到把對象指針給到外部后,內部還自動回收的。
拿WFClientTask舉例說明一下,workflow中所有的Task都是通過Factory創建:
static WFHttpTask *create_http_task(const std::string& url,
int redirect_max,
int retry_max,
http_callback_t callback);
using WFHttpTask = WFNetworkTask;
template <class REQ, class RESP>
class WFClientTask : public WFNetworkTask {};
注意,create參數里有一個callback,workflow一定會執行這個callback,然后內部回收掉該WFClientTask占用的內存,任何任務的生命周期都是從創建到callback函數結束。
它是怎么做到的?繼續看下WFClientTask的繼承層次結構:
template <class REQ, class RESP>
class WFClientTask : public WFNetworkTask {};
template<class REQ, class RESP>
class WFNetworkTask : public CommRequest {};
class CommRequest : public SubTask, public CommSession {};
class SubTask {
public:
virtual void dispatch() = 0;
private:
virtual SubTask *done() = 0;
protected:
void subtask_done();
};
WFClientTask繼承于WFNetworkTask,WFNetworkTask又繼承于SubTask。
SubTask內部有subtask_done()方法,看下它的實現:
void SubTask::subtask_done() {
SubTask *cur = this;
ParallelTask *parent;
SubTask **entry;
while (1) {
parent = cur->parent;
entry = cur->entry;
cur = cur->done();
if (cur) {
cur->parent = parent;
cur->entry = entry;
if (parent)
*entry = cur;
cur->dispatch();
}
else if (parent) {
if (__sync_sub_and_fetch(&parent->nleft, 1) == 0) {
cur = parent;
continue;
}
}
break;
}
}
subtask_done()方法中會調用它的done()方法,然而這幾個方法都是virtual方法,看看WFClientTask是怎么重寫它們的:
template <class REQ, class RESP>
class WFClientTask : public WFNetworkTask<REQ, RESP> {
protected:
virtual SubTask *done() {
SeriesWork *series = series_of(this);
if (this->state == WFT_STATE_SYS_ERROR && this->error < 0) {
this->state = WFT_STATE_SSL_ERROR;
this->error = -this->error;
}
if (this->callback)
this->callback(this);
delete this;
return series->pop();
}
};
子類重寫了done()方法,可以看到在它的實現里,觸發了callback,然后調用了delete this,釋放掉了當前占用的這塊內存。
那誰調用的done(),可以看下上面的代碼,subtask_done()會觸發done(),那誰觸發的subtask_done():
void CommRequest::dispatch() {
if (this->scheduler->request(this, this->object, this->wait_timeout,
&this->target) < 0) {
this->state = CS_STATE_ERROR;
this->error = errno;
if (errno != ETIMEDOUT)
this->timeout_reason = TOR_NOT_TIMEOUT;
else
this->timeout_reason = TOR_WAIT_TIMEOUT;
this->subtask_done();
}
}
可以看到,dispatch()里觸發了subtask_done(),那誰觸發的dispatch():
template<class REQ, class RESP>
class WFNetworkTask : public CommRequest {
public:
/* start(), dismiss() are for client tasks only. */
void start() {
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
};
inline void
Workflow::start_series_work(SubTask *first, SubTask *last,
series_callback_t callback) {
SeriesWork *series = new SeriesWork(first, std::move(callback));
series->set_last_task(last);
first->dispatch();
}
這里可以看到,Task的start()方法觸發start_series_work(),進而觸發dispatch()方法。
總結一下:
● 步驟一
通過工廠方法創建WFClientTask,同時設置callback;
● 步驟二
外部調用start()方法,start()中調用Workflow::start_series_work()方法;
● 步驟三
start_series_work()中調用SubTask的dispatch()方法,這個dispatch()方法由SubTask的子類CommRequest(WFClientTask的父類)實現;
● 步驟四
dispatch()方法在異步操作結束后會觸發subtask_done()方法;
● 步驟五
subtask_done()方法內會觸發done()方法;
● 步驟六
done()方法內會觸發callback,然后delete this;
● 步驟七
內存釋放完成。
其實這塊可以猜到,想要銷毀自己的內存,基本上也就delete this這個方法。
然而我認為這中間調用的思想和過程才是我們需要重點關注和學習的,遠不止我上面描述的這么簡單,感興趣的朋友可以自己去研究研究哈。
關于workflow還有很多優點,這里就不一一列舉啦,它的源碼也值得我們CPP開發者學習和進階,具體可以看我之前的文章。
發現workflow團隊對這個項目相當重視,還特意建了個QQ交流群(群號碼是618773193),對此項目有任何問題都可以在這個群里探討,也方便了我們學習,真的不錯。項目地址如下:https://github.com/sogou/workflow,也可以點擊閱讀原文直達。
在訪問GitHub遇到困難時,可使用他們的Gitee官方倉庫:https://gitee.com/sogou/workflow。
項目也特別實用,據說搜狗內外很多團隊都在使用workflow。感覺這個項目值得學習的話就給人家個star,不要白嫖哈,對項目團隊來說也是一種認可和鼓勵。
-
框架
+關注
關注
0文章
403瀏覽量
17527 -
網絡通信
+關注
關注
4文章
814瀏覽量
29894 -
workflows
+關注
關注
0文章
6瀏覽量
5935
發布評論請先 登錄
相關推薦
評論