需求:第三方的接口,限制接口請求的QPS,每秒5次
需要控制job「訪問接口」的次數,每秒不能同時超過5次,包括 進行中的任務、剛啟動的任務
要確保單位時間內(例如每秒)運行的任務數量不超過特定的上限(如5個任務),并且在任務執行完成得很快時,考慮已完成的任務和正在執行的任務作為正在運行的任務總數,可以使用限流器來控制任務的啟動頻率,并結合使用信號量來管理同時運行的任務數量。
具體來說,使用一個信號量來限制同時進行的任務數量,并且在任務完成時,僅在下一秒鐘允許新的任務開始,以確保即使某些任務快速完成,也不會在同一秒鐘內啟動超過限制數量的任務
package main import ( "context" "fmt" "math/rand" "sync" "sync/atomic" "time" "golang.org/x/time/rate" ) func RateLimit() { const maxJobsPerSecond = 5 const numJobs = 22 var wg sync.WaitGroup // 計數器 var runningJobs int32 // 當前正在執行的任務數量 var startedJobs int32 // 啟動后的任務數量 var finishedJobs int32 // 剛完成的任務數量 limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(maxJobsPerSecond)), maxJobsPerSecond) semaphore := make(chan struct{}, maxJobsPerSecond) for i := 1; i <= numJobs; i++ { wg.Add(1) go func(jobID int) { defer wg.Done() limiter.Wait(context.Background()) // 等待限流器允許進行下一個任務 semaphore <- struct{}{} // 獲取信號量 atomic.AddInt32(&startedJobs, 1) atomic.AddInt32(&runningJobs, 1) executeJob(jobID) // 執行任務 atomic.AddInt32(&finishedJobs, 1) atomic.AddInt32(&runningJobs, -1) <-time.After(time.Second) // 等待一秒鐘后釋放信號量 <-semaphore // 打印當前狀態 printStatus(&runningJobs, &startedJobs, &finishedJobs) }(i) } wg.Wait() fmt.Println("所有工作完成") }
注意事項
限流器rate.NewLimiter用于控制任務啟動的頻率,以確保每秒不超過maxJobsPerSecond個任務開始執行。
使用信號量semaphore來控制同時進行的任務數量。
為了確保在任何一秒內同時進行的任務數量不超過限制,在任務完成后等待一秒鐘,然后再釋放信號量。這樣做可以保證即使任務很快完成,也不會立即啟動新的任務。
這種實現方式確保了即使任務執行得很快,每秒鐘啟動的新任務數量也不會超過限制,并且同時考慮了正在執行和剛剛完成的任務。
動態創建協程
協程的啟動是動態的。在代碼中,每個任務對應于一個動態創建的協程。這些協程是在循環中根據任務數量(numJobs)動態生成的。
具體來說,每當有一個新的任務需要執行時,都會創建一個新的協程來處理這個任務。這是通過在main函數的循環中調用go關鍵字實現的。這個過程在每次循環迭代中發生,從而為每個任務動態創建一個新的協程
由于使用了限流器(rate.Limiter),這些協程不是一次性全部創建,而是根據限流器允許的速率逐個創建。每個協程在開始執行任務之前會等待限流器的許可,以此確保每秒啟動的任務數量不超過設定的最大值
func executeJob(jobID int) { startTime := time.Now() // 記錄任務開始時間 // 模擬任務執行時間 fmt.Printf("%v Job %d started ",time.Now().Format("2006-01-02 1505.000"), jobID) // 初始化隨機數種子 rand.Seed(time.Now().UnixNano()) // 隨機生成一個時間間隔(例如,1到5000毫秒之間) min := 1 max := 5000 duration := time.Duration(rand.Intn(max-min+1)+min) * time.Millisecond time.Sleep(duration) durationCost := time.Since(startTime) // 計算任務耗時 fmt.Printf("%v Job %d finished Cost:%v ", time.Now().Format("2006-01-02 1505.000"),jobID, durationCost) } func printStatus(runningJobs, startedJobs, finishedJobs *int32) { fmt.Printf("Current status - Running: %d, Started: %d, Finished: %d ", atomic.LoadInt32(runningJobs), atomic.LoadInt32(startedJobs), atomic.LoadInt32(finishedJobs)) }
可以在代碼中添加額外的邏輯來跟蹤和打印正在執行、進行中、剛啟動和剛完成的任務數量。使用原子操作(來自sync/atomic包)來確保在并發環境下對這些計數器的操作是安全的。
在這個示例中:
使用sync/atomic包中的AddInt32和LoadInt32來安全地增加和讀取計數器的值。
在每個任務開始時,增加startedJobs和runningJobs計數器。
在每個任務完成時,增加finishedJobs計數器,并減少runningJobs計數器。
在任務完成后和釋放信號量前,打印當前的任務狀態。
注意事項
這種方法可以幫助我們跟蹤不同狀態下的任務數量。
使用原子操作確保在并發環境中對計數器的讀寫是安全的。
printStatus函數在每個任務的結束時被調用,以打印當前的任務狀態
鏈接:https://juejin.cn/post/7315314479204696079
審核編輯:劉清
-
限流器
+關注
關注
0文章
41瀏覽量
14509 -
QPS
+關注
關注
0文章
24瀏覽量
8820
原文標題:Golang根據job數量動態控制每秒協程的最大創建數量方法
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論