1 什么是協程
協程是與其他函數或方法一起并發運行的函數或方法。Go協程可以看作是輕量級線程,與線程相比,創建一個 Go 協程的成本很小。
1.1 協程與線程的對比
協程的成本極低。堆棧大小只有若干KB(2或4KB),并且可以根據應用的需求進行增減。而線程必須指定堆棧的大小,其堆棧是固定不變的(一般默認2MB)。固定了棧的大小導致兩個問題:
一是對于很多只需要很小的??臻g的線程來說是一個巨大的浪費
二是對于少數需要巨大??臻g的線程來說又面臨棧溢出的風險
協程會復用(Multiplex)數量更少的 OS 線程。即使程序有數以千計的協程,也可能只有一個線程。
如果該線程中的某一Go協程發生了阻塞(比如說等待用戶輸入),那么系統會再創建一個OS線程,并把其余協程都移動到這個新的OS線程。所有這一切都在運行時進行,作為程序員,我們沒有直接面臨這些復雜的細節,而是有一個簡潔的 API 來處理并發。
Go內置半搶占式的協作調度器,在用戶態進行協程的調度。
Go協程使用信道(Channel)來進行通信。信道用于防止多個協程訪問共享內存時發生競態條件(Race Condition)。信道可以看作是協程之間通信的管道。
1.2 啟動協程
調用函數或者方法時,在前面加上關鍵字go,可以讓一個新的Go協程并發地運行。需要注意:
啟動一個新的協程時,協程的調用會立即返回。程序控制不會去等待Go協程執行完畢。在調用Go協程之后,程序控制會立即返回到代碼的下一行,忽略該協程的任何返回值。
如果希望運行其他Go協程,Go 主協程必須繼續運行著。如果Go主協程終止,則程序終止,于是其他協程也不會繼續運行。
使用示例如下:
package?main import?(?? ????"fmt" ????"time" ) func?numbers()?{?? ????for?i?:=?1;?i?<=?5;?i++?{ ????????time.Sleep(250?*?time.Millisecond) ????????fmt.Printf("%d?",?i) ????} } func?alphabets()?{?? ????for?i?:=?'a';?i?<=?'e';?i++?{ ????????time.Sleep(400?*?time.Millisecond) ????????fmt.Printf("%c?",?i) ????} } func?main()?{?? ????go?numbers()?//啟動協程 ????go?alphabets()?//啟動協程 ????//等待子協程允許完畢,后面介紹更高級的信道方式,這里就簡單的等待 ????time.Sleep(3000?*?time.Millisecond) ????fmt.Println("main?terminated") } //輸出:1?a?2?3?b?4?c?5?d?e?main?terminated
下圖可以清晰的看到三個協程的運行關系:
2 信道
2.1 信道的創建
信道可以想像成協程之間通信的管道。如同管道中的水會從一端流到另一端,通過使用信道,數據也可以從一端發送,在另一端接收。所有信道都關聯了一個類型。信道只能運輸這種類型的數據,而運輸其他類型的數據都是非法的。chan T表示T類型的信道,使用make函數進行初始化。例如:
a?:=?make(chan?int)
2.2 信道的收發
信道旁的箭頭方向指定了是發送數據還是接收數據
data?:=?<-?a?//?讀取信道a,保存值到data a?<-?data?//?寫入信道a
發送與接收默認是阻塞的。當把數據發送到信道時,程序控制會在發送數據的語句處發生阻塞,直到有其它協程從信道讀取到數據,才會解除阻塞。與此類似,當讀取信道的數據時,如果沒有其它的協程把數據寫入到這個信道,那么讀取過程就會一直阻塞著。**信道的這種特性能夠幫助Go協程之間進行高效的通信,不需要用到其他編程語言常見的顯式鎖或條件變量。 借助阻塞這個特性,我們可以用一個讀操作等待子協程結束,而不是使用sleep:
func?hello(done?chan?bool)?{?? ????fmt.Println("Hello?world?goroutine") ????done?<-?true//子協程結束,寫入數據 } func?main()?{?? ????done?:=?make(chan?bool)//創建bool信道 ????go?hello(done) ????<-done?//讀操作,一直阻塞直到子協程結束 ????fmt.Println("main?function") }
2.3 小心死鎖
使用信道需要考慮的一個重點是死鎖。
當Go協程給一個信道發送數據時,照理說會有其他Go協程來接收數據。如果沒有的話,程序就會在運行時觸發 panic,形成死鎖。
當有Go協程等著從一個信道接收數據時,我們期望其他的Go協程會向該信道寫入數據,要不然程序就會觸發 panic。
2.4 關閉信道和range遍歷
數據發送方可以關閉信道,通知接收方這個信道不再有數據發送過來。當從信道接收數據時,接收方可以多用一個變量來檢查信道是否已經關閉。
func?producer(chnl?chan?int)?{??
????for?i?:=?0;?i?10;?i++?{ ????????chnl?<-?i ????} ????close(chnl)//關閉信道 } func?main()?{?? ????ch?:=?make(chan?int) ????go?producer(ch) ????for?{ ????????v,?ok?:=?<-ch?//判斷信道是否關閉 ????????if?ok?==?false?{ ????????????break ????????} ????????fmt.Println("Received?",?v,?ok) ????} }
上面的語句里,如果成功接收信道所發送的數據,那么 ok 等于 true。而如果 ok 等于 false,說明我們試圖讀取一個關閉的通道。從關閉的信道讀取到的值會是該信道類型的零值。 或者我們可以用range遍歷信道,代替上面示例中的for循環:
func?main()?{?? ????ch?:=?make(chan?int) ????go?producer(ch) ????for?v?:=?range?ch?{//range可以在信道關閉后自動結束,不用顯示的判斷 ????????fmt.Println("Received?",v) ????} }
2.5 緩沖信道
上面無緩沖信道的發送和接收過程是阻塞的,讀寫操作會一直阻塞。我們還可以創建一個有緩沖的信道(Buffered Channel)。只在緩沖已滿的情況,才會阻塞向緩沖信道發送數據。同樣,只有在緩沖為空的時候,才會阻塞從緩沖信道接收數據。 通過向 make 函數時再傳遞一個表示容量的參數(指定緩沖的大小,sizeof(type) * capacity),就可以創建緩沖信道。
ch?:=?make(chan?type,?capacity)//capacity?應該大于?0。無緩沖信道的容量默認為?0
緩沖區容量和長度的區別:
容量是指信道可以存儲的值的數量(總的大?。?。我們在使用make函數創建緩沖信道的時候會指定容量大小。
長度是指信道中當前排隊的元素個數(當前保存的大小)。
使用示例如下:
func?write(ch?chan?int)?{??
????for?i?:=?0;?i?5;?i++?{ ????????ch?<-?i?//寫入兩個值之后緩沖區滿,阻塞等待緩沖區空閑 ????????fmt.Println("successfully?wrote",?i,?"to?ch") ????} ????close(ch) } func?main()?{?? ????ch?:=?make(chan?int,?2)//緩沖大小為2 ????go?write(ch) ????time.Sleep(2?*?time.Second) ????for?v?:=?range?ch?{ ????????fmt.Println("read?value",?v,"from?ch") ????????time.Sleep(2?*?time.Second) ????} }
2.6 select
select 語句用于在多個發送/接收信道操作中進行選擇。該語法與 switch 類似,所不同的是,這里的每個 case 語句都是信道操作。
select 語句會一直阻塞,直到發送/接收操作準備就緒。如果有多個信道操作準備完畢,select 會隨機地選取其中之一執行。
在沒有case準備就緒時,可以執行select語句中的默認情況(Default Case),這通常用于防止select語句一直阻塞,沒有信道可用時會立刻返回。
使用示例:
func?server1(ch?chan?string)?{?? ????time.Sleep(6?*?time.Second) ????ch?<-?"from?server1" } func?server2(ch?chan?string)?{?? ????time.Sleep(3?*?time.Second) ????ch?<-?"from?server2" } func?main()?{?? ????output1?:=?make(chan?string) ????output2?:=?make(chan?string) ????go?server1(output1) ????go?server2(output2) ????select?{//一直阻塞,直到某個信道可用 ????case?s1?:=?<-output1: ????????fmt.Println(s1) ????case?s2?:=?<-output2: ????????fmt.Println(s2) ????} }
3 WaitGroup
3.1 WaitGroup的使用
WaitGroup可以用來等待一批go協程執行結束,類似于C++的std::join。使用示例如下:
import?( ????"fmt" ????"sync" ????"time" ) func?process(i?int,?wg?*sync.WaitGroup)?{//waitgroup參數指針,因為要修改內部的值,不能是值傳遞 ????fmt.Println("started?Goroutine?",?i) ????time.Sleep(2?*?time.Second) ????fmt.Printf("Goroutine?%d?ended ",?i) ????wg.Done()//子協程結束,調用done減少計數器 } func?main()?{ ????no?:=?3 ????var?wg?sync.WaitGroup?//定義waitgroup ????for?i?:=?0;?i?3.2 實現一個協程池
基本思路:
創建一個Go協程池,監聽一個等待作業分配的輸入型緩沖信道
將作業添加到該輸入型緩沖信道中
作業完成后,再將結果寫入一個輸出型緩沖信道
從輸出型緩沖信道讀取并打印結果
代碼和解析如下:
package?main import?(?? ????"fmt" ????"math/rand" ????"sync" ????"time" ) //定義任務和結果兩個結構體 type?Job?struct?{?? ????id???????int ????randomno?int } type?Result?struct?{?? ????job?????????Job?//包含job結構體 ????sumofdigits?int } //創建任務和結果的兩個緩沖信道 var?jobs?=?make(chan?Job,?10)?? var?results?=?make(chan?Result,?10) //計算一個整數每一位相加的和 func?digits(number?int)?int?{?? ????sum?:=?0 ????no?:=?number ????for?no?!=?0?{ ????????digit?:=?no?%?10 ????????sum?+=?digit ????????no?/=?10 ????} ????time.Sleep(2?*?time.Second) ????return?sum } //遍歷job信道,計算后每個job的數字并將結果寫入reslut信道 func?worker(wg?*sync.WaitGroup)?{?? ????for?job?:=?range?jobs?{ ????????output?:=?Result{job,?digits(job.randomno)} ????????results?<-?output ????} ????wg.Done() } //初始化waitgroup,并開啟多個協程開始計算 func?createWorkerPool(noOfWorkers?int)?{?? ????var?wg?sync.WaitGroup ????for?i?:=?0;?i?4 協程的同步手段4.1 互斥與Mutex
Mutex用于提供一種加鎖機制(Locking Mechanism),可確保在某時刻只有一個協程在臨界區運行,以防止出現競態條件。Mutex可以在sync包內找到。Mutex 定義了兩個方法:Lock和Unlock。所有在 Lock 和 Unlock 之間的代碼,都只能由一個Go協程執行,于是就可以避免競態條件。
mutex.Lock() x?=?x?+?1?? mutex.Unlock()使用示例:
//互斥鎖保證線程同步 package?main import?( ?"fmt" ?"sync" ) var?total?struct?{?//全局的結構體變量 ?sync.Mutex?//互斥鎖 ?value??????int } func?worker(wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?for?i?:=?0;?i?<=?100;?i++?{ ??total.Lock()?//加鎖 ??total.value++ ??total.Unlock()?//解鎖 ?} } func?main()?{ ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg) ?go?worker(&wg) ?wg.Wait() ?fmt.Println(total.value) }4.2 原子操作
用互斥鎖來保護一個數值型的共享資源,麻煩且效率低下。標準庫的sync/atomic包對原子操作提供了豐富的支持:sync/atomic包對基本的數值類型及復雜對象的讀寫都提供了原子操作的支持。atomic.Value原子對象提供了Load和Store兩個原子方法,分別用于加載和保存數據,返回值和參數都是interface{}類型。
//原子操作實現線程同步 package?main import?( ?"fmt" ?"sync" ?"sync/atomic" ) var?total?uint64 func?worker(wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?var?i?uint64 ?for?i?=?0;?i?<=?100;?i++?{ ??atomic.AddUint64(&total,?1)?//原子操作,線程安全的 ?} } func?main()?{ ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg) ?go?worker(&wg) ?wg.Wait() ?fmt.Println(atomic.LoadUint64(&total))?//讀取值 }4.3 阻塞信道
上面的示例我們也可以用信道來實現互斥(還是推薦實際中使用Mutex),使用大小為1的緩沖信道可以導致可寫阻塞,這樣其他協程就不能繼續執行,只能等待阻塞結束。在并發編程中,對共享資源的正確訪問需要精確的控制,在目前的絕大多數語言中,都是通過加鎖等線程同步方案來解決這一困難問題,而Go語言卻另辟蹊徑,它將共享的值通過Channel傳遞(實際上多個獨立執行的線程很少主動共享資源)。在任意給定的時刻,最好只有一個Goroutine能夠擁有該資源。
//使用channel實現線程同步 package?main import?( ?"fmt" ?"sync" ) var?total?uint64 func?worker(wg?*sync.WaitGroup,?ch?chan?bool)?{ ?defer?wg.Done() ?var?i?uint64 ?for?i?=?0;?i?<=?100;?i++?{ ??ch?<-?true?//信道被寫入值,其他協程到這一句也想寫入值,就會阻塞等待信道可寫 ??total++ ??<-ch?//本協程讀取信道,信道空了,其他協程可以寫入了 ?} } func?main()?{ ?ch?:=?make(chan?bool,?1)?//?創建大小為1的chan ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg,?ch) ?go?worker(&wg,?ch) ?wg.Wait() ?fmt.Println(total)?//讀取值 }不僅如此,我們還可以通過設置chan的緩存大小來控制最大并發數。5 常見并發模型
5.1 生產者消費者模型
通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。簡單地說,就是生產者生產一些數據,然后放到成果隊列中,同時消費者從成果隊列中來取這些數據。這樣就讓生產消費變成了異步的兩個過程。當成果隊列中沒有數據時,消費者就進入饑餓的等待中;而當成果隊列中數據已滿時,生產者則面臨因產品擠壓導致CPU被剝奪的下崗問題。 Go可以使用帶緩沖區的chan作為成功隊列,由不同的協程負責接入和讀取,很簡單的實現生產者消費者模型:
package?main import?( ?"fmt" ?"os" ?"os/signal" ?"syscall" ) //?生產者:?生成?factor?整數倍的序列 func?Producer(factor?int,?out?chan<-?int)?{ ?for?i?:=?0;?;?i++?{ ??out?<-?i?*?factor?//往信道緩沖區內寫入數據 ?} } //?消費者 func?Consumer(in?<-chan?int)?{ ?for?v?:=?range?in?{ ??fmt.Println(v)?//從信道讀取數據打印 ?} } func?main()?{ ?ch?:=?make(chan?int,?64)?//?成果隊列,大小為64 ?//開啟了2個Producer生產流水線,分別用于生成3和5的倍數的序列 ?//然后開啟1個Consumer消費者線程,打印獲取的結果 ?go?Producer(3,?ch)?//?生成?3?的倍數的序列 ?go?Producer(5,?ch)?//?生成?5?的倍數的序列 ?go?Consumer(ch)????//?消費?生成的隊列 ?//?Ctrl+C?退出 ?sig?:=?make(chan?os.Signal,?1) ?signal.Notify(sig,?syscall.SIGINT,?syscall.SIGTERM) ?fmt.Printf("quit?(%v) ",?<-sig) }5.2 發布訂閱模型
發布訂閱(publish/subscribe)模型通常被簡寫為pub/sub模型。在這個模型中,消息生產者成為發布者(publisher),而消息消費者則成為訂閱者(subscriber),生產者和消費者是M:N的關系。在傳統生產者和消費者模型中,是將消息發送到一個隊列中,而發布訂閱模型則是將消息發布給一個主題。在發布訂閱模型中,每條消息都會傳送給多個訂閱者。發布者通常不會知道、也不關心哪一個訂閱者正在接收主題消息。訂閱者和發布者可以在運行時動態添加,是一種松散的耦合關系,這使得系統的復雜性可以隨時間的推移而增長。在現實生活中,像天氣預報之類的應用就可以應用這個并發模式。 示例代碼如下:
//?發布訂閱模型實現 package?pubsub import?( ?"sync" ?"time" ) type?( ?subscriber?chan?interface{}?????????//?訂閱者為一個管道 ?topicFunc??func(v?interface{})?bool?//?主題為一個過濾器 ) //?發布者對象 type?Publisher?struct?{ ?m???????????sync.RWMutex?????????????//?讀寫鎖,保護訂閱者map ?buffer??????int??????????????????????//?訂閱隊列的緩存大小 ?timeout?????time.Duration????????????//?發布超時時間 ?subscribers?map[subscriber]topicFunc?//?訂閱者信息 } //?構建一個發布者對象,?可以設置發布超時時間和緩存隊列的長度 func?NewPublisher(publishTimeout?time.Duration,?buffer?int)?*Publisher?{ ?return?&Publisher{?//返回對象指針 ??buffer:??????buffer, ??timeout:?????publishTimeout, ??subscribers:?make(map[subscriber]topicFunc),?//創建訂閱者map ?} } //?添加一個新的訂閱者,訂閱全部主題 func?(p?*Publisher)?Subscribe()?chan?interface{}?{ ?return?p.SubscribeTopic(nil) } //?添加一個新的訂閱者,訂閱過濾器篩選后的主題 func?(p?*Publisher)?SubscribeTopic(topic?topicFunc)?chan?interface{}?{ ?ch?:=?make(chan?interface{},?p.buffer) ?p.m.Lock() ?p.subscribers[ch]?=?topic ?p.m.Unlock() ?return?ch } //?退出訂閱 func?(p?*Publisher)?Evict(sub?chan?interface{})?{ ?p.m.Lock() ?defer?p.m.Unlock()?//函數退出時解鎖 ?delete(p.subscribers,?sub)?//根據key刪除map中一項 ?close(sub)?????????????????//關閉chan } //?發布一個主題 func?(p?*Publisher)?Publish(v?interface{})?{ ?p.m.RLock() ?defer?p.m.RUnlock() ?var?wg?sync.WaitGroup ?for?sub,?topic?:=?range?p.subscribers?{ ??wg.Add(1) ??go?p.sendTopic(sub,?topic,?v,?&wg) ?} ?wg.Wait() } //?關閉發布者對象,同時關閉所有的訂閱者管道。 func?(p?*Publisher)?Close()?{ ?p.m.Lock() ?defer?p.m.Unlock() ?for?sub?:=?range?p.subscribers?{ ??delete(p.subscribers,?sub) ??close(sub) ?} } //?發送主題,可以容忍一定的超時 func?(p?*Publisher)?sendTopic(sub?subscriber,?topic?topicFunc,?v?interface{},?wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?if?topic?!=?nil?&&?!topic(v)?{ ??return ?} ?//監聽sub?chan寫入成功或超時 ?select?{ ?case?sub?<-?v: ?case?<-time.After(p.timeout): ?} }我們可以選擇訂閱全部,或指定自定義函數只訂閱符合要求的消息,返回chan對象:
all?:=?p.Subscribe()?//添加一個訂閱者,訂閱全部消息 //添加一個訂閱者,只關系有golang字符串的內容 golang?:=?p.SubscribeTopic(func(v?interface{})?bool?{ ?if?s,?ok?:=?v.(string);?ok?{ ??return?strings.Contains(s,?"golang") ?} ?return?false }) 編輯:黃飛?
?
?
評論
查看更多