1. 前言
工作隊(duì)列(workqueue)的Linux內(nèi)核中的定義的用來(lái)處理不是很緊急事件的回調(diào)方式處理方法.
以下代碼的linux內(nèi)核版本為2.6.19.2, 源代碼文件主要為kernel/workqueue.c.
2. 數(shù)據(jù)結(jié)構(gòu)
/* include/linux/workqueue.h */
// 工作節(jié)點(diǎn)結(jié)構(gòu)
struct work_struct {
// 等待時(shí)間
unsigned long pending;
// 鏈表節(jié)點(diǎn)
struct list_head entry;
// workqueue回調(diào)函數(shù)
void (*func)(void *);
// 回調(diào)函數(shù)func的數(shù)據(jù)
void *data;
// 指向CPU相關(guān)數(shù)據(jù), 一般指向struct cpu_workqueue_struct結(jié)構(gòu)
void *wq_data;
// 定時(shí)器
struct timer_list timer;
};
struct execute_work {
struct work_struct work;
};
/* kernel/workqueue.c */
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu).
*
* The sequence counters are for flush_scheduled_work(). It wants to wait
* until all currently-scheduled works are completed, but it doesn't
* want to be livelocked by new, incoming ones. So it waits until
* remove_sequence is >= the insert_sequence which pertained when
* flush_scheduled_work() was called.
*/
// 這個(gè)結(jié)構(gòu)是針對(duì)每個(gè)CPU的
struct cpu_workqueue_struct {
// 結(jié)構(gòu)鎖
spinlock_t lock;
// 下一個(gè)要執(zhí)行的節(jié)點(diǎn)序號(hào)
long remove_sequence; /* Least-recently added (next to run) */
// 下一個(gè)要插入節(jié)點(diǎn)的序號(hào)
long insert_sequence; /* Next to add */
// 工作機(jī)構(gòu)鏈表節(jié)點(diǎn)
struct list_head worklist;
// 要進(jìn)行處理的等待隊(duì)列
wait_queue_head_t more_work;
// 處理完的等待隊(duì)列
wait_queue_head_t work_done;
// 工作隊(duì)列節(jié)點(diǎn)
struct workqueue_struct *wq;
// 進(jìn)程指針
struct task_struct *thread;
int run_depth; /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
// 工作隊(duì)列結(jié)構(gòu)
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq;
const char *name;
struct list_head list; /* Empty if single thread */
};
kernel/workqueue.c中定義了一個(gè)工作隊(duì)列鏈表, 所有工作隊(duì)列可以?huà)旖拥竭@個(gè)鏈表中:
static LIST_HEAD(workqueues);
3. 一些宏定義
/* include/linux/workqueue.h */
// 初始化工作隊(duì)列
#define __WORK_INITIALIZER(n, f, d) {
// 初始化list
.entry = { &(n).entry, &(n).entry },
// 回調(diào)函數(shù)
.func = (f),
// 回調(diào)函數(shù)參數(shù)
.data = (d),
// 初始化定時(shí)器
.timer = TIMER_INITIALIZER(NULL, 0, 0),
}
// 聲明工作隊(duì)列并初始化
#define DECLARE_WORK(n, f, d)
struct work_struct n = __WORK_INITIALIZER(n, f, d)
/*
* initialize a work-struct's func and data pointers:
*/
// 重新定義工作結(jié)構(gòu)參數(shù)
#define PREPARE_WORK(_work, _func, _data)
do {
(_work)->func = _func;
(_work)->data = _data;
} while (0)
/*
* initialize all of a work-struct:
*/
// 初始化工作結(jié)構(gòu), 和__WORK_INITIALIZER功能相同,不過(guò)__WORK_INITIALIZER用在
// 參數(shù)初始化定義, 而該宏用在程序之中對(duì)工作結(jié)構(gòu)賦值
#define INIT_WORK(_work, _func, _data)
do {
INIT_LIST_HEAD(&(_work)->entry);
(_work)->pending = 0;
PREPARE_WORK((_work), (_func), (_data));
init_timer(&(_work)->timer);
} while (0)
4. 操作函數(shù)
4.1 創(chuàng)建工作隊(duì)列
一般的創(chuàng)建函數(shù)是create_workqueue, 但這其實(shí)只是一個(gè)宏:
/* include/linux/workqueue.h */
#define create_workqueue(name) __create_workqueue((name), 0)
在workqueue的初始化函數(shù)中, 定義了一個(gè)針對(duì)內(nèi)核中所有線(xiàn)程可用的事件工作隊(duì)列, 其他內(nèi)核線(xiàn)程建立的事件工作結(jié)構(gòu)就都掛接到該隊(duì)列:
void init_workqueues(void)
{
...
keventd_wq = create_workqueue("events");
...
}
核心創(chuàng)建函數(shù)是__create_workqueue:
struct workqueue_struct *__create_workqueue(const char *name,
int singlethread)
{
int cpu, destroy = 0;
struct workqueue_struct *wq;
struct task_struct *p;
// 分配工作隊(duì)列結(jié)構(gòu)空間
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
// 為每個(gè)CPU分配單獨(dú)的工作隊(duì)列空間
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name;
mutex_lock(&workqueue_mutex);
if (singlethread) {
// 使用create_workqueue宏時(shí)該參數(shù)始終為0
// 如果是單一線(xiàn)程模式, 在單線(xiàn)程中調(diào)用各個(gè)工作隊(duì)列
// 建立一個(gè)的工作隊(duì)列內(nèi)核線(xiàn)程
INIT_LIST_HEAD(&wq->list);
// 建立工作隊(duì)列的線(xiàn)程
p = create_workqueue_thread(wq, singlethread_cpu);
if (!p)
destroy = 1;
else
// 喚醒該線(xiàn)程
wake_up_process(p);
} else {
// 鏈表模式, 將工作隊(duì)列添加到工作隊(duì)列鏈表
list_add(&wq->list, &workqueues);
// 為每個(gè)CPU建立一個(gè)工作隊(duì)列線(xiàn)程
for_each_online_cpu(cpu) {
p = create_workqueue_thread(wq, cpu);
if (p) {
// 綁定CPU
kthread_bind(p, cpu);
// 喚醒線(xiàn)程
wake_up_process(p);
} else
destroy = 1;
}
}
mutex_unlock(&workqueue_mutex);
/*
* Was there any error during startup? If yes then clean up:
*/
if (destroy) {
// 建立線(xiàn)程失敗, 釋放工作隊(duì)列
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);
// 創(chuàng)建工作隊(duì)列線(xiàn)程
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
int cpu)
{
// 每個(gè)CPU的工作隊(duì)列
struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
struct task_struct *p;
spin_lock_init(&cwq->lock);
// 初始化
cwq->wq = wq;
cwq->thread = NULL;
cwq->insert_sequence = 0;
cwq->remove_sequence = 0;
INIT_LIST_HEAD(&cwq->worklist);
// 初始化等待隊(duì)列more_work, 該隊(duì)列處理要執(zhí)行的工作結(jié)構(gòu)
init_waitqueue_head(&cwq->more_work);
// 初始化等待隊(duì)列work_done, 該隊(duì)列處理執(zhí)行完的工作結(jié)構(gòu)
init_waitqueue_head(&cwq->work_done);
// 建立內(nèi)核線(xiàn)程work_thread
if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name);
else
p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
if (IS_ERR(p))
return NULL;
// 保存線(xiàn)程指針
cwq->thread = p;
return p;
}
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
// 聲明一個(gè)等待隊(duì)列
DECLARE_WAITQUEUE(wait, current);
// 信號(hào)
struct k_sigaction sa;
sigset_t blocked;
current->flags |= PF_NOFREEZE;
// 降低進(jìn)程優(yōu)先級(jí), 工作進(jìn)程不是個(gè)很緊急的進(jìn)程,不和其他進(jìn)程搶占CPU,通常在系統(tǒng)空閑時(shí)運(yùn)行
set_user_nice(current, -5);
/* Block and flush all signals */
// 阻塞所有信號(hào)
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
/*
* We inherited MPOL_INTERLEAVE from the booting kernel.
* Set MPOL_DEFAULT to insure node local allocations.
*/
numa_default_policy();
/* SIG_IGN makes children autoreap: see do_notify_parent(). */
// 信號(hào)處理都是忽略
sa.sa.sa_handler = SIG_IGN;
sa.sa.sa_flags = 0;
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
// 進(jìn)程可中斷
set_current_state(TASK_INTERRUPTIBLE);
// 進(jìn)入循環(huán), 沒(méi)明確停止該進(jìn)程就一直運(yùn)行
while (!kthread_should_stop()) {
// 設(shè)置more_work等待隊(duì)列, 當(dāng)有新work結(jié)構(gòu)鏈入隊(duì)列中時(shí)會(huì)激發(fā)此等待隊(duì)列
add_wait_queue(&cwq->more_work, &wait);
if (list_empty(&cwq->worklist))
// 工作隊(duì)列為空, 睡眠
schedule();
else
// 進(jìn)行運(yùn)行狀態(tài)
__set_current_state(TASK_RUNNING);
// 刪除等待隊(duì)列
remove_wait_queue(&cwq->more_work, &wait);
// 按鏈表遍歷執(zhí)行工作任務(wù)
if (!list_empty(&cwq->worklist))
run_workqueue(cwq);
// 執(zhí)行完工作, 設(shè)置進(jìn)程是可中斷的, 重新循環(huán)等待工作
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
}
// 運(yùn)行工作結(jié)構(gòu)
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
unsigned long flags;
/*
* Keep taking off work from the queue until
* done.
*/
// 加鎖
spin_lock_irqsave(&cwq->lock, flags);
// 統(tǒng)計(jì)已經(jīng)遞歸調(diào)用了多少次了
cwq->run_depth++;
if (cwq->run_depth > 3) {
// 遞歸調(diào)用此時(shí)太多
/* morton gets to eat his hat */
printk("%s: recursion depth exceeded: %dn",
__FUNCTION__, cwq->run_depth);
dump_stack();
}
// 遍歷工作鏈表
while (!list_empty(&cwq->worklist)) {
// 獲取的是next節(jié)點(diǎn)的
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
void (*f) (void *) = work->func;
void *data = work->data;
// 刪除節(jié)點(diǎn), 同時(shí)節(jié)點(diǎn)中的list參數(shù)清空
list_del_init(cwq->worklist.next);
// 解鎖
// 現(xiàn)在在執(zhí)行以下代碼時(shí)可以中斷,run_workqueue本身可能會(huì)重新被調(diào)用, 所以要判斷遞歸深度
spin_unlock_irqrestore(&cwq->lock, flags);
BUG_ON(work->wq_data != cwq);
// 工作結(jié)構(gòu)已經(jīng)不在鏈表中
clear_bit(0, &work->pending);
// 執(zhí)行工作函數(shù)
f(data);
// 重新加鎖
spin_lock_irqsave(&cwq->lock, flags);
// 執(zhí)行完的工作序列號(hào)遞增
cwq->remove_sequence++;
// 喚醒工作完成等待隊(duì)列, 供釋放工作隊(duì)列
wake_up(&cwq->work_done);
}
// 減少遞歸深度
cwq->run_depth--;
// 解鎖
spin_unlock_irqrestore(&cwq->lock, flags);
}
4.2 釋放工作隊(duì)列
/**
* destroy_workqueue - safely terminate a workqueue
* @wq: target workqueue
*
* Safely destroy a workqueue. All work currently pending will be done first.
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
int cpu;
// 清除當(dāng)前工作隊(duì)列中的所有工作
flush_workqueue(wq);
/* We don't need the distraction of CPUs appearing and vanishing. */
mutex_lock(&workqueue_mutex);
// 結(jié)束該工作隊(duì)列的線(xiàn)程
if (is_single_threaded(wq))
cleanup_workqueue_thread(wq, singlethread_cpu);
else {
for_each_online_cpu(cpu)
cleanup_workqueue_thread(wq, cpu);
list_del(&wq->list);
}
mutex_unlock(&workqueue_mutex);
// 釋放工作隊(duì)列中對(duì)應(yīng)每個(gè)CPU的工作隊(duì)列數(shù)據(jù)
free_percpu(wq->cpu_wq);
kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
/**
* flush_workqueue - ensure that any scheduled work has run to completion.
* @wq: workqueue to flush
*
* Forces execution of the workqueue and blocks until its completion.
* This is typically used in driver shutdown handlers.
*
* This function will sample each workqueue's current insert_sequence number and
* will sleep until the head sequence is greater than or equal to that. This
* means that we sleep until all works which were queued on entry have been
* handled, but we are not livelocked by new incoming ones.
*
* This function used to run the workqueues itself. Now we just wait for the
* helper threads to do it.
*/
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
// 該進(jìn)程可以睡眠
might_sleep();
// 清空每個(gè)CPU上的工作隊(duì)列
if (is_single_threaded(wq)) {
/* Always use first cpu's area. */
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
} else {
int cpu;
mutex_lock(&workqueue_mutex);
for_each_online_cpu(cpu)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
mutex_unlock(&workqueue_mutex);
}
}
EXPORT_SYMBOL_GPL(flush_workqueue);
flush_workqueue的核心處理函數(shù)為flush_cpu_workqueue:
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
if (cwq->thread == current) {
// 如果是工作隊(duì)列進(jìn)程正在被調(diào)度
/*
* Probably keventd trying to flush its own queue. So simply run
* it by hand rather than deadlocking.
*/
// 執(zhí)行完該工作隊(duì)列
run_workqueue(cwq);
} else {
// 定義等待
DEFINE_WAIT(wait);
long sequence_needed;
// 加鎖
spin_lock_irq(&cwq->lock);
// 最新工作結(jié)構(gòu)序號(hào)
sequence_needed = cwq->insert_sequence;
// 該條件是判斷隊(duì)列中是否還有沒(méi)有執(zhí)行的工作結(jié)構(gòu)
while (sequence_needed - cwq->remove_sequence > 0) {
// 有為執(zhí)行的工作結(jié)構(gòu)
// 通過(guò)work_done等待隊(duì)列等待
prepare_to_wait(&cwq->work_done, &wait,
TASK_UNINTERRUPTIBLE);
// 解鎖
spin_unlock_irq(&cwq->lock);
// 睡眠, 由wake_up(&cwq->work_done)來(lái)喚醒
schedule();
// 重新加鎖
spin_lock_irq(&cwq->lock);
}
// 等待清除
finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);
}
}
4.3 調(diào)度工作
在大多數(shù)情況下, 并不需要自己建立工作隊(duì)列,而是只定義工作, 將工作結(jié)構(gòu)掛接到內(nèi)核預(yù)定義的事件工作隊(duì)列中調(diào)度, 在kernel/workqueue.c中定義了一個(gè)靜態(tài)全局量的工作隊(duì)列keventd_wq:
static struct workqueue_struct *keventd_wq;
4.3.1 立即調(diào)度
// 在其他函數(shù)中使用以下函數(shù)來(lái)調(diào)度工作結(jié)構(gòu), 是把工作結(jié)構(gòu)掛接到工作隊(duì)列中進(jìn)行調(diào)度
/**
* schedule_work - put work task in global workqueue
* @work: job to be done
*
* This puts a job in the kernel-global workqueue.
*/
// 調(diào)度工作結(jié)構(gòu), 將工作結(jié)構(gòu)添加到事件工作隊(duì)列keventd_wq
int fastcall schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
/**
* queue_work - queue work on a workqueue
* @wq: workqueue to use
* @work: work to queue
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*
* We queue the work to the CPU it was submitted, but there is no
* guarantee that it will be processed by that CPU.
*/
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0, cpu = get_cpu();
if (!test_and_set_bit(0, &work->pending)) {
// 工作結(jié)構(gòu)還沒(méi)在隊(duì)列, 設(shè)置pending標(biāo)志表示把工作結(jié)構(gòu)掛接到隊(duì)列中
if (unlikely(is_single_threaded(wq)))
cpu = singlethread_cpu;
BUG_ON(!list_empty(&work->entry));
// 進(jìn)行具體的排隊(duì)
__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
ret = 1;
}
put_cpu();
return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
/* Preempt must be disabled. */
// 不能被搶占
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
{
unsigned long flags;
// 加鎖
spin_lock_irqsave(&cwq->lock, flags);
// 指向CPU工作隊(duì)列
work->wq_data = cwq;
// 掛接到工作鏈表
list_add_tail(&work->entry, &cwq->worklist);
// 遞增插入的序列號(hào)
cwq->insert_sequence++;
// 喚醒等待隊(duì)列準(zhǔn)備處理工作結(jié)構(gòu)
wake_up(&cwq->more_work);
spin_unlock_irqrestore(&cwq->lock, flags);
}
4.3.2 延遲調(diào)度
4.3.2.1 schedule_delayed_work
/**
* schedule_delayed_work - put work task in global workqueue after delay
* @work: job to be done
* @delay: number of jiffies to wait
*
* After waiting for a given time this puts a job in the kernel-global
* workqueue.
*/
// 延遲調(diào)度工作, 延遲一定時(shí)間后再將工作結(jié)構(gòu)掛接到工作隊(duì)列
int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
return queue_delayed_work(keventd_wq, work, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
/**
* queue_delayed_work - queue work on a workqueue after delay
* @wq: workqueue to use
* @work: work to queue
* @delay: number of jiffies to wait before queueing
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*/
int fastcall queue_delayed_work(struct workqueue_struct *wq,
struct work_struct *work, unsigned long delay)
{
int ret = 0;
// 定時(shí)器, 此時(shí)的定時(shí)器應(yīng)該是不起效的, 延遲將通過(guò)該定時(shí)器來(lái)實(shí)現(xiàn)
struct timer_list *timer = &work->timer;
if (!test_and_set_bit(0, &work->pending)) {
// 工作結(jié)構(gòu)還沒(méi)在隊(duì)列, 設(shè)置pending標(biāo)志表示把工作結(jié)構(gòu)掛接到隊(duì)列中
// 如果現(xiàn)在定時(shí)器已經(jīng)起效, 出錯(cuò)
BUG_ON(timer_pending(timer));
// 工作結(jié)構(gòu)已經(jīng)掛接到鏈表, 出錯(cuò)
BUG_ON(!list_empty(&work->entry));
/* This stores wq for the moment, for the timer_fn */
// 保存工作隊(duì)列的指針
work->wq_data = wq;
// 定時(shí)器初始化
timer->expires = jiffies + delay;
timer->data = (unsigned long)work;
// 定時(shí)函數(shù)
timer->function = delayed_work_timer_fn;
// 定時(shí)器生效, 定時(shí)到期后再添加到工作隊(duì)列
add_timer(timer);
ret = 1;
}
return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
// 定時(shí)中斷函數(shù)
static void delayed_work_timer_fn(unsigned long __data)
{
struct work_struct *work = (struct work_struct *)__data;
struct workqueue_struct *wq = work->wq_data;
// 獲取CPU
int cpu = smp_processor_id();
if (unlikely(is_single_threaded(wq)))
cpu = singlethread_cpu;
// 將工作結(jié)構(gòu)添加到工作隊(duì)列,注意這是在時(shí)間中斷調(diào)用
__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}
4.3.2.2 schedule_delayed_work_on
指定CPU的延遲調(diào)度工作結(jié)構(gòu), 和schedule_delayed_work相比增加了一個(gè)CPU參數(shù), 其他都相同
/**
* schedule_delayed_work_on - queue work in global workqueue on CPU after delay
* @cpu: cpu to use
* @work: job to be done
* @delay: number of jiffies to wait
*
* After waiting for a given time this puts a job in the kernel-global
* workqueue on the specified CPU.
*/
int schedule_delayed_work_on(int cpu,
struct work_struct *work, unsigned long delay)
{
return queue_delayed_work_on(cpu, keventd_wq, work, delay);
}
/**
* queue_delayed_work_on - queue work on specific CPU after delay
* @cpu: CPU number to execute work on
* @wq: workqueue to use
* @work: work to queue
* @delay: number of jiffies to wait before queueing
*
* Returns 0 if @work was already on a queue, non-zero otherwise.
*/
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work, unsigned long delay)
{
int ret = 0;
struct timer_list *timer = &work->timer;
if (!test_and_set_bit(0, &work->pending)) {
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
/* This stores wq for the moment, for the timer_fn */
work->wq_data = wq;
timer->expires = jiffies + delay;
timer->data = (unsigned long)work;
timer->function = delayed_work_timer_fn;
add_timer_on(timer, cpu);
ret = 1;
}
return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
5. 結(jié)論
工作隊(duì)列和定時(shí)器函數(shù)處理有點(diǎn)類(lèi)似, 都是執(zhí)行一定的回調(diào)函數(shù), 但和定時(shí)器處理函數(shù)不同的是定時(shí)器回調(diào)函數(shù)只執(zhí)行一次, 而且執(zhí)行定時(shí)器回調(diào)函數(shù)的時(shí)候是在時(shí)鐘中斷中, 限制比較多, 因此回調(diào)程序不能太復(fù)雜; 而工作隊(duì)列是通過(guò)內(nèi)核線(xiàn)程實(shí)現(xiàn), 一直有效, 可重復(fù)執(zhí)行, 由于執(zhí)行時(shí)降低了線(xiàn)程的優(yōu)先級(jí), 執(zhí)行時(shí)可能休眠, 因此工作隊(duì)列處理的應(yīng)該是那些不是很緊急的任務(wù), 如垃圾回收處理等, 通常在系統(tǒng)空閑時(shí)執(zhí)行,在xfrm庫(kù)中就廣泛使用了workqueue,使用時(shí),只需要定義work結(jié)構(gòu),然后調(diào)用schedule_(delayed_)work即可。
評(píng)論
查看更多