Code analysis of the detailed creation process of the normal worker_pool

Linux阅码场 · Source: unknown · Author: 李倩 · 2018-04-08 14:35

The kernel's workqueue implementation keeps changing, and most kernel books are old enough that they no longer match the current code.

Workqueue is an important kernel mechanism, especially for drivers: a small task (work) generally does not get a thread of its own but is handed to a workqueue instead. The main job of a workqueue is to process the kernel's many small tasks in process context.

The code analysis in this article is based on Linux kernel 3.18.22. The best way to learn is still to "read the fucking source code".

1. Basic CMWQ concepts

Several workqueue concepts are work-related data structures that are easy to confuse. Roughly, they can be understood like this:

work: a unit of work.

workqueue: a collection of work. A workqueue has a one-to-many relationship with work.

worker: a worker. In the code, a worker corresponds to one worker_thread() kernel thread.

worker_pool: a collection of workers. A worker_pool has a one-to-many relationship with workers.

pwq (pool_workqueue): the middleman, responsible for linking a workqueue to a worker_pool. A workqueue has a one-to-many relationship with pwqs, and a pwq has a one-to-one relationship with a worker_pool.

The end goal is to hand work to a worker for execution; the intermediate data structures and relationships exist to organize that clearly and efficiently.
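The relationships above can be sketched as a few plain C structs. This is only a userspace model for illustration: the type names model_wq, model_pwq, model_pool and the helper model_pool_of() are hypothetical, and the real kernel structures carry many more fields.

```c
#include <stddef.h>

/* Hypothetical, simplified stand-in for a worker_pool (one-to-many with workers). */
struct model_pool {
    int cpu;        /* owning CPU, or -1 for an unbound pool */
    int nr_workers; /* number of workers currently in the pool */
};

struct model_wq;

/* Hypothetical stand-in for a pool_workqueue: exactly one wq and one pool. */
struct model_pwq {
    struct model_wq *wq;
    struct model_pool *pool;
};

/* Hypothetical stand-in for a workqueue: it owns several pwqs (one-to-many). */
struct model_wq {
    const char *name;
    struct model_pwq *pwqs;
    size_t nr_pwqs;
};

/* Follow wq -> pwq -> pool; returns NULL when idx is out of range. */
static struct model_pool *model_pool_of(const struct model_wq *wq, size_t idx)
{
    return idx < wq->nr_pwqs ? wq->pwqs[idx].pool : NULL;
}
```

A work item never points at a pool directly in this model; it always reaches one through a pwq, which mirrors the middleman role described above.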

1.1 worker_pool

Each thread that executes work is called a worker, and a set of workers is called a worker_pool. The essence of CMWQ lies in how the workers in a worker_pool are dynamically added and removed, which is handled by manage_workers().

CMWQ divides worker_pools into two classes:

normal worker_pool, used by ordinary workqueues;

unbound worker_pool, used by workqueues of type WQ_UNBOUND;

1.1.1 normal worker_pool

By default, work is processed in a normal worker_pool. The system creates two normal worker_pools per CPU: one at normal priority (nice=0) and one at high priority (nice=HIGHPRI_NICE_LEVEL). The workers created in each pool run with the corresponding nice value.
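As a rough userspace sketch of this layout, the model below keeps two pools per CPU and picks one by a high-priority flag, in the same way the kernel code later indexes cpu_pools[highpri]. All names here are hypothetical, and the value -20 for the high-priority nice level is an assumption (it is what MIN_NICE normally is); check HIGHPRI_NICE_LEVEL in your kernel tree.

```c
#include <stdbool.h>

#define MODEL_NR_STD_POOLS 2
#define MODEL_HIGHPRI_NICE (-20) /* assumption: stands in for HIGHPRI_NICE_LEVEL */

/* Hypothetical, simplified per-CPU pool: only the fields discussed above. */
struct model_std_pool {
    int cpu;
    int nice;
};

/* Give every CPU one nice=0 pool (index 0) and one high-priority pool (index 1). */
static void model_init_cpu_pools(struct model_std_pool pools[][MODEL_NR_STD_POOLS],
                                 int nr_cpus)
{
    static const int std_nice[MODEL_NR_STD_POOLS] = { 0, MODEL_HIGHPRI_NICE };

    for (int cpu = 0; cpu < nr_cpus; cpu++) {
        for (int i = 0; i < MODEL_NR_STD_POOLS; i++) {
            pools[cpu][i].cpu = cpu;
            pools[cpu][i].nice = std_nice[i];
        }
    }
}

/* A bool converts to 0 or 1, so it can index the per-CPU pair directly. */
static struct model_std_pool *model_pick_pool(struct model_std_pool pools[][MODEL_NR_STD_POOLS],
                                              int cpu, bool highpri)
{
    return &pools[cpu][highpri];
}
```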

Each worker corresponds to one worker_thread() kernel thread. A worker_pool contains one or more workers, and the number of workers in a pool grows and shrinks dynamically with the pool's work load.

The ps | grep kworker command lists the kernel threads behind all workers. The kernel threads (worker_thread()) of a normal worker_pool are named by the following rule:

snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
	 pool->attrs->nice < 0 ? "H" : "");
worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
				      "kworker/%s", id_buf);

So names like the following belong to normal worker_pools:

shell@PRO5:/ $ ps | grep "kworker"
root 14 2 0 0 worker_thr 0000000000 S kworker/1:0H // worker 0 of the high-priority worker_pool on cpu1
root 17 2 0 0 worker_thr 0000000000 S kworker/2:0 // worker 0 of the normal-priority (nice=0) worker_pool on cpu2
root 18 2 0 0 worker_thr 0000000000 S kworker/2:0H // worker 0 of the high-priority worker_pool on cpu2
root 23699 2 0 0 worker_thr 0000000000 S kworker/0:1 // worker 1 of the normal-priority (nice=0) worker_pool on cpu0
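The naming rule can be tried out in userspace. The helper below is a minimal sketch that reuses the same format string as the kernel snippet; the function name model_bound_kworker_name() is hypothetical, and the cpu, id and nice values are passed in directly instead of coming from a worker_pool.

```c
#include <stdio.h>

/*
 * Hypothetical helper: builds the thread name of a worker in a per-CPU
 * (normal) worker_pool, following the "%d:%d%s" rule quoted above.
 * A negative nice value marks the high-priority pool and adds the "H" suffix.
 */
static int model_bound_kworker_name(char *buf, size_t len,
                                    int cpu, int id, int nice)
{
    return snprintf(buf, len, "kworker/%d:%d%s", cpu, id,
                    nice < 0 ? "H" : "");
}
```

For example, cpu=1, id=0 and a negative nice value give kworker/1:0H, which matches the first line of the ps output.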

[Figure: normal worker_pool topology]

The detailed creation process of the normal worker_pool is analyzed below:

kernel/workqueue.c:

init_workqueues()->init_worker_pool()/create_worker()

static int __init init_workqueues(void)
{
	int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
	int i, cpu;

	// (1) create the worker_pools for each cpu
	/* initialize CPU pools */
	for_each_possible_cpu(cpu) {
		struct worker_pool *pool;

		i = 0;
		for_each_cpu_worker_pool(pool, cpu) {
			BUG_ON(init_worker_pool(pool));
			// set the cpu
			pool->cpu = cpu;
			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
			// set the thread priority (nice)
			pool->attrs->nice = std_nice[i++];
			pool->node = cpu_to_node(cpu);

			/* alloc pool ID */
			mutex_lock(&wq_pool_mutex);
			BUG_ON(worker_pool_assign_id(pool));
			mutex_unlock(&wq_pool_mutex);
		}
	}

	// (2) create the first worker for each worker_pool
	/* create the initial worker */
	for_each_online_cpu(cpu) {
		struct worker_pool *pool;

		for_each_cpu_worker_pool(pool, cpu) {
			pool->flags &= ~POOL_DISASSOCIATED;
			BUG_ON(!create_worker(pool));
		}
	}
}

| →

static int init_worker_pool(struct worker_pool *pool)
{
	spin_lock_init(&pool->lock);
	pool->id = -1;
	pool->cpu = -1;
	pool->node = NUMA_NO_NODE;
	pool->flags |= POOL_DISASSOCIATED;
	// (1.1) the worker_pool's work list: workqueues hang their work on this list
	// so that the pool's workers can execute it
	INIT_LIST_HEAD(&pool->worklist);
	// (1.2) the worker_pool's idle worker list:
	// a worker with nothing to do is not destroyed at once but parked in the idle state
	INIT_LIST_HEAD(&pool->idle_list);
	// (1.3) the worker_pool's busy worker hash:
	// these workers are busy executing work
	hash_init(pool->busy_hash);

	// (1.4) timer that checks whether idle workers should be destroyed
	init_timer_deferrable(&pool->idle_timer);
	pool->idle_timer.function = idle_worker_timeout;
	pool->idle_timer.data = (unsigned long)pool;

	// (1.5) timer that checks for a timeout while the worker_pool is creating a new worker
	setup_timer(&pool->mayday_timer, pool_mayday_timeout,
		    (unsigned long)pool);

	mutex_init(&pool->manager_arb);
	mutex_init(&pool->attach_mutex);
	INIT_LIST_HEAD(&pool->workers);

	ida_init(&pool->worker_ida);
	INIT_HLIST_NODE(&pool->hash_node);
	pool->refcnt = 1;

	/* shouldn't fail above this point */
	pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
	if (!pool->attrs)
		return -ENOMEM;
	return 0;
}

| →

static struct worker *create_worker(struct worker_pool *pool)
{
	struct worker *worker = NULL;
	int id = -1;
	char id_buf[16];

	/* ID is needed to determine kthread name */
	id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
	if (id < 0)
		goto fail;

	worker = alloc_worker(pool->node);
	if (!worker)
		goto fail;

	worker->pool = pool;
	worker->id = id;

	if (pool->cpu >= 0)
		// (2.1) build the thread name for a worker of a normal worker_pool
		snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
			 pool->attrs->nice < 0 ? "H" : "");
	else
		// (2.2) build the thread name for a worker of an unbound worker_pool
		snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

	// (2.3) create the kernel thread for the worker
	worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
					      "kworker/%s", id_buf);
	if (IS_ERR(worker->task))
		goto fail;

	// (2.4) set the nice value of the kernel thread
	set_user_nice(worker->task, pool->attrs->nice);

	/* prevent userland from meddling with cpumask of workqueue workers */
	worker->task->flags |= PF_NO_SETAFFINITY;

	// (2.5) attach the worker to the worker_pool
	/* successful, attach the worker to the pool */
	worker_attach_to_pool(worker, pool);

	// (2.6) put the worker in the idle state initially;
	// after wake_up_process() the worker leaves the idle state by itself
	/* start the newly created worker */
	spin_lock_irq(&pool->lock);
	worker->pool->nr_workers++;
	worker_enter_idle(worker);
	wake_up_process(worker->task);
	spin_unlock_irq(&pool->lock);

	return worker;

fail:
	if (id >= 0)
		ida_simple_remove(&pool->worker_ida, id);
	kfree(worker);
	return NULL;
}

|| →

static void worker_attach_to_pool(struct worker *worker,
				  struct worker_pool *pool)
{
	mutex_lock(&pool->attach_mutex);

	// (2.5.1) bind the worker thread to the pool's cpu(s)
	/*
	 * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
	 * online CPUs. It'll be re-applied when any of the CPUs come up.
	 */
	set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

	/*
	 * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
	 * stable across this function. See the comments above the
	 * flag definition for details.
	 */
	if (pool->flags & POOL_DISASSOCIATED)
		worker->flags |= WORKER_UNBOUND;

	// (2.5.2) add the worker to the worker_pool's list of workers
	list_add_tail(&worker->node, &pool->workers);

	mutex_unlock(&pool->attach_mutex);
}

1.1.2 unbound worker_pool

Most work is executed by normal worker_pools, for example work pushed onto the system workqueue (system_wq) through schedule_work() or schedule_work_on(). The workers of a normal worker_pool are bound to one CPU: once a worker starts executing a work item, it keeps running on that CPU and does not switch CPUs.

An unbound worker_pool is the counterpart: its workers may be scheduled on multiple CPUs. It is still bound, though; the unit it binds to is a node rather than a CPU. A node here refers to NUMA (Non-Uniform Memory Access) systems: a NUMA system may have several nodes, and each node may contain one or more CPUs.

The kernel threads (worker_thread()) of an unbound worker_pool are named by the "u%d:%d" branch of create_worker() shown above, with pool->id and the worker id as arguments, which gives kworker/u<pool id>:<worker id>.

So names like the following belong to unbound worker_pools:

shell@PRO5:/ $ ps | grep "kworker"

root 23906 2 0 0 worker_thr 0000000000 S kworker/u20:2 // worker 2 of unbound pool 20

root 24564 2 0 0 worker_thr 0000000000 S kworker/u20:0 // worker 0 of unbound pool 20

root 24622 2 0 0 worker_thr 0000000000 S kworker/u21:1 // worker 1 of unbound pool 21
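To make the two naming schemes concrete, here is a small userspace parser for the names seen in the ps output. It is only a sketch: the struct model_kworker_name and the function model_parse_kworker_name() are hypothetical, and it assumes the 3.18-era formats "kworker/<cpu>:<id>[H]" and "kworker/u<pool>:<id>" with nothing appended.

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical decoded form of a kworker thread name. */
struct model_kworker_name {
    bool unbound;    /* true for "kworker/u<pool>:<id>" */
    bool highpri;    /* true when a per-CPU name ends in "H" */
    int pool_or_cpu; /* pool id if unbound, cpu number otherwise */
    int id;          /* worker id within the pool */
};

/* Returns true and fills *out when name matches one of the two formats. */
static bool model_parse_kworker_name(const char *name,
                                     struct model_kworker_name *out)
{
    int a, b, n = 0;

    /* Unbound pools: "kworker/u<pool id>:<worker id>" */
    if (sscanf(name, "kworker/u%d:%d%n", &a, &b, &n) == 2 && name[n] == '\0') {
        *out = (struct model_kworker_name){ true, false, a, b };
        return true;
    }

    /* Per-CPU pools: "kworker/<cpu>:<worker id>" plus optional "H" */
    n = 0;
    if (sscanf(name, "kworker/%d:%d%n", &a, &b, &n) == 2) {
        if (name[n] == '\0') {
            *out = (struct model_kworker_name){ false, false, a, b };
            return true;
        }
        if (name[n] == 'H' && name[n + 1] == '\0') {
            *out = (struct model_kworker_name){ false, true, a, b };
            return true;
        }
    }
    return false;
}
```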

Unbound worker_pools also come in two kinds:

unbound_std_wq: each node has its own worker_pool, so multiple nodes mean multiple worker_pools;

[Figure: unbound_std_wq topology]

ordered_wq: all nodes share a single default worker_pool;

[Figure: ordered_wq topology]

The detailed creation process of the unbound worker_pool is analyzed below:

kernel/workqueue.c:

init_workqueues()-> unbound_std_wq_attrs/ordered_wq_attrs

kernel/workqueue.c:

__alloc_workqueue_key()->alloc_and_link_pwqs()->apply_workqueue_attrs()->alloc_unbound_pwq()/numa_pwq_tbl_install()

struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
					       unsigned int flags,
					       int max_active,
					       struct lock_class_key *key,
					       const char *lock_name, ...)
{
	size_t tbl_size = 0;
	va_list args;
	struct workqueue_struct *wq;
	struct pool_workqueue *pwq;

	/* see the comment above the definition of WQ_POWER_EFFICIENT */
	if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
		flags |= WQ_UNBOUND;

	/* allocate wq and format name */
	if (flags & WQ_UNBOUND)
		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);

	// (1) allocate the workqueue_struct
	wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
	if (!wq)
		return NULL;

	if (flags & WQ_UNBOUND) {
		wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
		if (!wq->unbound_attrs)
			goto err_free_wq;
	}

	va_start(args, lock_name);
	vsnprintf(wq->name, sizeof(wq->name), fmt, args);
	va_end(args);

	// (2) maximum number of work items a pwq may place in its worker_pool
	max_active = max_active ?: WQ_DFL_ACTIVE;
	max_active = wq_clamp_max_active(max_active, flags, wq->name);

	/* init wq */
	wq->flags = flags;
	wq->saved_max_active = max_active;
	mutex_init(&wq->mutex);
	atomic_set(&wq->nr_pwqs_to_flush, 0);
	INIT_LIST_HEAD(&wq->pwqs);
	INIT_LIST_HEAD(&wq->flusher_queue);
	INIT_LIST_HEAD(&wq->flusher_overflow);
	INIT_LIST_HEAD(&wq->maydays);

	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	INIT_LIST_HEAD(&wq->list);

	// (3) allocate the pool_workqueues for the workqueue;
	// a pool_workqueue links the workqueue to a worker_pool
	if (alloc_and_link_pwqs(wq) < 0)
		goto err_free_wq;

	// (4) for a WQ_MEM_RECLAIM workqueue,
	// create the corresponding rescuer_thread() kernel thread
	/*
	 * Workqueues which may be used during memory reclaim should
	 * have a rescuer to guarantee forward progress.
	 */
	if (flags & WQ_MEM_RECLAIM) {
		struct worker *rescuer;

		rescuer = alloc_worker(NUMA_NO_NODE);
		if (!rescuer)
			goto err_destroy;

		rescuer->rescue_wq = wq;
		rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
					       wq->name);
		if (IS_ERR(rescuer->task)) {
			kfree(rescuer);
			goto err_destroy;
		}

		wq->rescuer = rescuer;
		rescuer->task->flags |= PF_NO_SETAFFINITY;
		wake_up_process(rescuer->task);
	}

	// (5) if requested, create the sysfs files for the workqueue
	if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
		goto err_destroy;

	/*
	 * wq_pool_mutex protects global freeze state and workqueues list.
	 * Grab it, adjust max_active and add the new @wq to workqueues
	 * list.
	 */
	mutex_lock(&wq_pool_mutex);

	mutex_lock(&wq->mutex);
	for_each_pwq(pwq, wq)
		pwq_adjust_max_active(pwq);
	mutex_unlock(&wq->mutex);

	// (6) add the new workqueue to the global list workqueues
	list_add(&wq->list, &workqueues);

	mutex_unlock(&wq_pool_mutex);

	return wq;

err_free_wq:
	free_workqueue_attrs(wq->unbound_attrs);
	kfree(wq);
	return NULL;
err_destroy:
	destroy_workqueue(wq);
	return NULL;
}

| →

static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
	bool highpri = wq->flags & WQ_HIGHPRI;
	int cpu, ret;

	// (3.1) normal workqueue:
	// how pool_workqueues link the workqueue to worker_pools
	if (!(wq->flags & WQ_UNBOUND)) {
		// allocate one pool_workqueue per cpu for the workqueue and store them in wq->cpu_pwqs
		wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
		if (!wq->cpu_pwqs)
			return -ENOMEM;

		for_each_possible_cpu(cpu) {
			struct pool_workqueue *pwq =
				per_cpu_ptr(wq->cpu_pwqs, cpu);
			struct worker_pool *cpu_pools =
				per_cpu(cpu_worker_pools, cpu);

			// hand the normal worker_pool already created at init time to the pool_workqueue
			init_pwq(pwq, wq, &cpu_pools[highpri]);

			mutex_lock(&wq->mutex);
			// link the pool_workqueue to the workqueue
			link_pwq(pwq);
			mutex_unlock(&wq->mutex);
		}
		return 0;
	} else if (wq->flags & __WQ_ORDERED) {
		// (3.2) unbound ordered_wq workqueue:
		// how pool_workqueues link the workqueue to worker_pools
		ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
		/* there should only be single pwq for ordering guarantee */
		WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
			      wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
		     "ordering guarantee broken for workqueue %s\n", wq->name);
		return ret;
	} else {
		// (3.3) unbound unbound_std_wq workqueue:
		// how pool_workqueues link the workqueue to worker_pools
		return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
	}
}

|| →

int apply_workqueue_attrs(struct workqueue_struct *wq,
			  const struct workqueue_attrs *attrs)
{
	// (3.2.1) based on the unbound ordered_wq_attrs/unbound_std_wq_attrs,
	// create the corresponding pool_workqueue and worker_pool;
	// these worker_pools are not created in advance but dynamically, and their worker kernel threads are created anew as well;
	// each pool_workqueue created is stored in pwq_tbl[node]
	/*
	 * If something goes wrong during CPU up/down, we'll fall back to
	 * the default pwq covering whole @attrs->cpumask. Always create
	 * it even if we don't use it immediately.
	 */
	dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
	if (!dfl_pwq)
		goto enomem_pwq;

	for_each_node(node) {
		if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
			pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
			if (!pwq_tbl[node])
				goto enomem_pwq;
		} else {
			dfl_pwq->refcnt++;
			pwq_tbl[node] = dfl_pwq;
		}
	}

	/* save the previous pwq and install the new one */
	// (3.2.2) install the temporary pwq_tbl[node] into wq->numa_pwq_tbl[node]
	for_each_node(node)
		pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
}

||| →

static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
					const struct workqueue_attrs *attrs)
{
	struct worker_pool *pool;
	struct pool_workqueue *pwq;

	lockdep_assert_held(&wq_pool_mutex);

	// (3.2.1.1) if an unbound_pool matching attrs already exists, reuse it;
	// otherwise create a new unbound_pool from attrs
	pool = get_unbound_pool(attrs);
	if (!pool)
		return NULL;

	pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
	if (!pwq) {
		put_unbound_pool(pool);
		return NULL;
	}

	init_pwq(pwq, wq, pool);
	return pwq;
}

1.2 worker

Each worker corresponds to one worker_thread() kernel thread, and a worker_pool has one or more workers. Multiple workers take work from the same list, worker_pool->worklist.

So there are two key points here:

how a worker processes work;

how a worker_pool dynamically manages the number of workers;

1.2.1 How a worker processes work

Work is processed mainly in worker_thread()->process_one_work(). Let's look at the implementation in detail.

kernel/workqueue.c:

worker_thread()->process_one_work()

static int worker_thread(void *__worker)
{
	struct worker *worker = __worker;
	struct worker_pool *pool = worker->pool;

	/* tell the scheduler that this is a workqueue worker */
	worker->task->flags |= PF_WQ_WORKER;
woke_up:
	spin_lock_irq(&pool->lock);

	// (1) should this worker die?
	/* am I supposed to die? */
	if (unlikely(worker->flags & WORKER_DIE)) {
		spin_unlock_irq(&pool->lock);
		WARN_ON_ONCE(!list_empty(&worker->entry));
		worker->task->flags &= ~PF_WQ_WORKER;

		set_task_comm(worker->task, "kworker/dying");
		ida_simple_remove(&pool->worker_ida, worker->id);
		worker_detach_from_pool(worker, pool);
		kfree(worker);
		return 0;
	}

	// (2) leave the idle state;
	// a worker is always idle before it is woken up
	worker_leave_idle(worker);
recheck:
	// (3) keep going if this worker is needed, otherwise go idle
	// need_more_worker() condition: (pool->worklist is not empty) && (pool->nr_running == 0),
	// i.e. the worklist has work to run and no worker is currently running
	/* no more worker necessary? */
	if (!need_more_worker(pool))
		goto sleep;

	// (4) if (pool->nr_idle == 0), start creating more workers:
	// the idle list has no spare worker left, so create some in reserve first
	/* do we need to manage? */
	if (unlikely(!may_start_working(pool)) && manage_workers(worker))
		goto recheck;

	/*
	 * ->scheduled list can only be filled while a worker is
	 * preparing to process a work or actually processing it.
	 * Make sure nobody diddled with it while I was sleeping.
	 */
	WARN_ON_ONCE(!list_empty(&worker->scheduled));

	/*
	 * Finish PREP stage. We're guaranteed to have at least one idle
	 * worker or that someone else has already assumed the manager
	 * role. This is where @worker starts participating in concurrency
	 * management if applicable and concurrency management is restored
	 * after being rebound. See rebind_workers() for details.
	 */
	worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

	do {
		// (5) if pool->worklist is not empty, take one work item from it
		struct work_struct *work =
			list_first_entry(&pool->worklist,
					 struct work_struct, entry);

		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
			/* optimization path, not strictly necessary */
			// (6) execute an ordinary work item
			process_one_work(worker, work);
			if (unlikely(!list_empty(&worker->scheduled)))
				process_scheduled_works(worker);
		} else {
			// (7) execute work that was deliberately scheduled to one specific worker;
			// ordinary work sits on the pool's shared list, pool->worklist;
			// only some special work is dispatched to a worker's worker->scheduled list,
			// namely: 1. the barrier work inserted by flush_work();
			// 2. work pushed from another worker to this one on a collision
			move_linked_works(work, &worker->scheduled, NULL);
			process_scheduled_works(worker);
		}
	// (8) keep_working() condition:
	// pool->worklist is not empty && (pool->nr_running <= 1)
	} while (keep_working(pool));

	worker_set_flags(worker, WORKER_PREP);
sleep:
	// (9) the worker enters the idle state
	/*
	 * pool->lock is held and there's no work to process and no need to
	 * manage, sleep. Workers are woken up only while holding
	 * pool->lock or from local cpu, so setting the current state
	 * before releasing pool->lock is enough to prevent losing any
	 * event.
	 */
	worker_enter_idle(worker);
	__set_current_state(TASK_INTERRUPTIBLE);
	spin_unlock_irq(&pool->lock);
	schedule();
	goto woke_up;
}

| →

static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
	struct pool_workqueue *pwq = get_work_pwq(work);
	struct worker_pool *pool = worker->pool;
	bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
	int work_color;
	struct worker *collision;
#ifdef CONFIG_LOCKDEP
	/*
	 * It is permissible to free the struct work_struct from
	 * inside the function that is called from it, this we need to
	 * take into account for lockdep too. To avoid bogus "held
	 * lock freed" warnings as well as problems when looking into
	 * work->lockdep_map, make a copy and use that here.
	 */
	struct lockdep_map lockdep_map;

	lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
	/* ensure we're on the correct CPU */
	WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
		     raw_smp_processor_id() != pool->cpu);

	// (8.1) if the work is already running on another worker of this worker_pool,
	// move it to that worker's scheduled list so it runs later
	/*
	 * A single work shouldn't be executed concurrently by
	 * multiple workers on a single cpu. Check whether anyone is
	 * already processing the work. If so, defer the work to the
	 * currently executing one.
	 */
	collision = find_worker_executing_work(pool, work);
	if (unlikely(collision)) {
		move_linked_works(work, &collision->scheduled, NULL);
		return;
	}

	// (8.2) add the worker to the busy hash, pool->busy_hash
	/* claim and dequeue */
	debug_work_deactivate(work);
	hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
	worker->current_work = work;
	worker->current_func = work->func;
	worker->current_pwq = pwq;
	work_color = get_work_color(work);

	list_del_init(&work->entry);

	// (8.3) if the work's wq is CPU intensive (WQ_CPU_INTENSIVE),
	// the worker running it leaves the worker_pool's dynamic scheduling and behaves like an independent thread
	/*
	 * CPU intensive works don't participate in concurrency management.
	 * They're the scheduler's responsibility. This takes @worker out
	 * of concurrency management and the next code block will chain
	 * execution of the pending work items.
	 */
	if (unlikely(cpu_intensive))
		worker_set_flags(worker, WORKER_CPU_INTENSIVE);

	// (8.4) for UNBOUND or CPU_INTENSIVE work, check whether an idle worker must be woken;
	// ordinary work never takes this path
	/*
	 * Wake up another worker if necessary. The condition is always
	 * false for normal per-cpu workers since nr_running would always
	 * be >= 1 at this point. This is used to chain execution of the
	 * pending work items for WORKER_NOT_RUNNING workers such as the
	 * UNBOUND and CPU_INTENSIVE ones.
	 */
	if (need_more_worker(pool))
		wake_up_worker(pool);

	/*
	 * Record the last pool and clear PENDING which should be the last
	 * update to @work. Also, do this inside @pool->lock so that
	 * PENDING and queued state changes happen together while IRQ is
	 * disabled.
	 */
	set_work_pool_and_clear_pending(work, pool->id);

	spin_unlock_irq(&pool->lock);

	lock_map_acquire_read(&pwq->wq->lockdep_map);
	lock_map_acquire(&lockdep_map);
	trace_workqueue_execute_start(work);
	// (8.5) run the work function
	worker->current_func(work);
	/*
	 * While we must be careful to not use "work" after this, the trace
	 * point will only record its address.
	 */
	trace_workqueue_execute_end(work);
	lock_map_release(&lockdep_map);
	lock_map_release(&pwq->wq->lockdep_map);

	if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
		pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
		       " last function: %pf\n",
		       current->comm, preempt_count(), task_pid_nr(current),
		       worker->current_func);
		debug_show_held_locks(current);
		dump_stack();
	}

	/*
	 * The following prevents a kworker from hogging CPU on !PREEMPT
	 * kernels, where a requeueing work item waiting for something to
	 * happen could deadlock with stop_machine as such work item could
	 * indefinitely requeue itself while all other CPUs are trapped in
	 * stop_machine. At the same time, report a quiescent RCU state so
	 * the same condition doesn't freeze RCU.
	 */
	cond_resched_rcu_qs();

	spin_lock_irq(&pool->lock);

	/* clear cpu intensive status */
	if (unlikely(cpu_intensive))
		worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

	/* we're done with it, release */
	hash_del(&worker->hentry);
	worker->current_work = NULL;
	worker->current_func = NULL;
	worker->current_pwq = NULL;
	worker->desc_valid = false;
	pwq_dec_nr_in_flight(pwq, work_color);
}

1.2.2 How a worker_pool dynamically manages workers

How a worker_pool grows and shrinks its set of workers is the core algorithm of CMWQ. The idea is as follows:

The workers in a worker_pool have three states: idle, running, and suspend;

If the worker_pool has work to process, keep at least one running worker to process it;

If a running worker blocks while processing work and enters the suspend state, a new idle worker must be woken so that other work keeps being executed;

If there is work to execute and more than one running worker, the surplus running workers go idle;

If there is no work to execute, all workers go idle;

If too many workers have been created, destroy_worker() removes idle workers that have not run again within 300s (IDLE_WORKER_TIMEOUT).
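The rules above boil down to three small predicates, whose conditions are spelled out in the comments of worker_thread(). The following userspace sketch restates them on a toy pool; struct model_pool_state and the model_ function names are hypothetical, and plain integers replace the kernel's lists and atomic counters.

```c
#include <stdbool.h>

/* Hypothetical snapshot of a worker_pool: counters only, no lists or locks. */
struct model_pool_state {
    int nr_pending; /* work items waiting on the worklist */
    int nr_running; /* workers currently running (not blocked) */
    int nr_idle;    /* workers parked in the idle state */
};

/* Wake another worker only if work is pending and nobody is running. */
static bool model_need_more_worker(const struct model_pool_state *p)
{
    return p->nr_pending > 0 && p->nr_running == 0;
}

/* A worker may start processing only if a spare idle worker remains. */
static bool model_may_start_working(const struct model_pool_state *p)
{
    return p->nr_idle > 0;
}

/* A running worker keeps going while work is pending and it is the only runner. */
static bool model_keep_working(const struct model_pool_state *p)
{
    return p->nr_pending > 0 && p->nr_running <= 1;
}
```

With two running workers and pending work, model_keep_working() is false, so the surplus worker goes idle; with zero running workers and pending work, model_need_more_worker() is true, so an idle worker is woken.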

For the detailed code, see the analysis of worker_thread()->process_one_work() in the previous section.

To track whether a worker is running or suspended, so that the number of workers can be adjusted dynamically, wq uses the trick of adding hook functions to the process scheduler:

Tracking a worker going from suspend to running: ttwu_activate()->wq_worker_waking_up()

Tracking a worker going from running to suspend: __schedule()->wq_worker_sleeping()

struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
{
	struct worker *worker = kthread_data(task), *to_wakeup = NULL;
	struct worker_pool *pool;

	/*
	 * Rescuers, which may not have all the fields set up like normal
	 * workers, also reach here, let's not access anything before
	 * checking NOT_RUNNING.
	 */
	if (worker->flags & WORKER_NOT_RUNNING)
		return NULL;

	pool = worker->pool;

	/* this can only happen on the local cpu */
	if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu))
		return NULL;

	/*
	 * The counterpart of the following dec_and_test, implied mb,
	 * worklist not empty test sequence is in insert_work().
	 * Please read comment there.
	 *
	 * NOT_RUNNING is clear. This means that we're bound to and
	 * running on the local cpu w/ rq lock held and preemption
	 * disabled, which in turn means that none else could be
	 * manipulating idle_list, so dereferencing idle_list without pool
	 * lock is safe.
	 */
	// decrement the number of running workers in the worker_pool;
	// if the worklist still has work to process, wake the first idle worker to handle it
	if (atomic_dec_and_test(&pool->nr_running) &&
	    !list_empty(&pool->worklist))
		to_wakeup = first_idle_worker(pool);
	return to_wakeup ? to_wakeup->task : NULL;
}

The scheduling idea of the worker_pool here is: if there is work to process, keep exactly one running worker processing it, no more and no fewer.

There is a problem, though. If a work item is CPU intensive, it never enters the suspend state, yet it occupies the CPU for a long time and blocks the work queued behind it for too long.

To solve this, CMWQ provides WQ_CPU_INTENSIVE. If a wq declares itself CPU_INTENSIVE, the current worker is taken out of dynamic scheduling, as if it had entered the suspend state. CMWQ then wakes or creates another worker, and the following work gets executed.

kernel/workqueue.c:

worker_thread()->process_one_work()

static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
	bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;

	// (1) set the WORKER_CPU_INTENSIVE flag on the current worker;
	// nr_running is decremented by 1;
	// from the worker_pool's point of view, the current worker has entered the suspend state
	/*
	 * CPU intensive works don't participate in concurrency management.
	 * They're the scheduler's responsibility. This takes @worker out
	 * of concurrency management and the next code block will chain
	 * execution of the pending work items.
	 */
	if (unlikely(cpu_intensive))
		worker_set_flags(worker, WORKER_CPU_INTENSIVE);

	// (2) following the previous step, check whether a new worker must be woken to process work
	/*
	 * Wake up another worker if necessary. The condition is always
	 * false for normal per-cpu workers since nr_running would always
	 * be >= 1 at this point. This is used to chain execution of the
	 * pending work items for WORKER_NOT_RUNNING workers such as the
	 * UNBOUND and CPU_INTENSIVE ones.
	 */
	if (need_more_worker(pool))
		wake_up_worker(pool);

	// (3) execute the work
	worker->current_func(work);

	// (4) when done, clear the WORKER_CPU_INTENSIVE flag of the current worker;
	// the current worker is back in the running state
	/* clear cpu intensive status */
	if (unlikely(cpu_intensive))
		worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
}

WORKER_NOT_RUNNING	= WORKER_PREP | WORKER_CPU_INTENSIVE |
			  WORKER_UNBOUND | WORKER_REBOUND,

static inline void worker_set_flags(struct worker *worker, unsigned int flags)
{
	struct worker_pool *pool = worker->pool;

	WARN_ON_ONCE(worker->task != current);

	/* If transitioning into NOT_RUNNING, adjust nr_running. */
	if ((flags & WORKER_NOT_RUNNING) &&
	    !(worker->flags & WORKER_NOT_RUNNING)) {
		atomic_dec(&pool->nr_running);
	}

	worker->flags |= flags;
}

static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
{
	struct worker_pool *pool = worker->pool;
	unsigned int oflags = worker->flags;

	WARN_ON_ONCE(worker->task != current);

	worker->flags &= ~flags;

	/*
	 * If transitioning out of NOT_RUNNING, increment nr_running. Note
	 * that the nested NOT_RUNNING is not a noop. NOT_RUNNING is mask
	 * of multiple flags, not a single flag.
	 */
	if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
		if (!(worker->flags & WORKER_NOT_RUNNING))
			atomic_inc(&pool->nr_running);
}
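The nr_running accounting in worker_set_flags() and worker_clr_flags() is easy to misread, because WORKER_NOT_RUNNING is a mask of several flags. The userspace sketch below replays the same logic on a toy worker. The MODEL_ flag values are arbitrary placeholders (not the kernel's bit positions), the struct and function names are hypothetical, and a plain int stands in for the atomic counter.

```c
/* Arbitrary placeholder bits; the kernel uses different values. */
enum {
    MODEL_PREP          = 1 << 0,
    MODEL_CPU_INTENSIVE = 1 << 1,
    MODEL_UNBOUND       = 1 << 2,
    MODEL_REBOUND       = 1 << 3,
    MODEL_NOT_RUNNING   = MODEL_PREP | MODEL_CPU_INTENSIVE |
                          MODEL_UNBOUND | MODEL_REBOUND,
};

/* Hypothetical toy worker: its flags plus a pointer to the pool's counter. */
struct model_worker {
    unsigned int flags;
    int *nr_running;
};

/* Entering NOT_RUNNING for the first time drops the worker from nr_running. */
static void model_set_flags(struct model_worker *w, unsigned int flags)
{
    if ((flags & MODEL_NOT_RUNNING) && !(w->flags & MODEL_NOT_RUNNING))
        (*w->nr_running)--;
    w->flags |= flags;
}

/* Only clearing the last NOT_RUNNING bit counts the worker as running again. */
static void model_clr_flags(struct model_worker *w, unsigned int flags)
{
    unsigned int oflags = w->flags;

    w->flags &= ~flags;
    if ((flags & MODEL_NOT_RUNNING) && (oflags & MODEL_NOT_RUNNING) &&
        !(w->flags & MODEL_NOT_RUNNING))
        (*w->nr_running)++;
}
```

Setting two NOT_RUNNING flags decrements the counter once, and the counter only comes back when both flags have been cleared.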

1.2.3 CPU hotplug handling

As the previous sections show, the system creates normal worker_pools that are bound to a CPU and unbound worker_pools that are not, and each worker_pool creates workers dynamically.

So how are worker_pools and workers handled when a CPU is hotplugged? Here is the code analysis:

kernel/workqueue.c:

workqueue_cpu_up_callback()/workqueue_cpu_down_callback()

static int __init init_workqueues(void)
{
	cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
	hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
}

| →

static int workqueue_cpu_down_callback(struct notifier_block *nfb,
				       unsigned long action,
				       void *hcpu)
{
	int cpu = (unsigned long)hcpu;
	struct work_struct unbind_work;
	struct workqueue_struct *wq;

	switch (action & ~CPU_TASKS_FROZEN) {
	case CPU_DOWN_PREPARE:
		/* unbinding per-cpu workers should happen on the local CPU */
		INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
		// (1) cpu down_prepare:
		// stop the workers of the normal worker_pools bound to this cpu;
		// as this cpu goes down, these workers migrate to other cpus
		queue_work_on(cpu, system_highpri_wq, &unbind_work);

		// (2) update unbound wqs for the cpu change
		/* update NUMA affinity of unbound workqueues */
		mutex_lock(&wq_pool_mutex);
		list_for_each_entry(wq, &workqueues, list)
			wq_update_unbound_numa(wq, cpu, false);
		mutex_unlock(&wq_pool_mutex);

		/* wait for per-cpu unbinding to finish */
		flush_work(&unbind_work);
		destroy_work_on_stack(&unbind_work);
		break;
	}
	return NOTIFY_OK;
}

| →

static int workqueue_cpu_up_callback(struct notifier_block *nfb,
				     unsigned long action,
				     void *hcpu)
{
	int cpu = (unsigned long)hcpu;
	struct worker_pool *pool;
	struct workqueue_struct *wq;
	int pi;

	switch (action & ~CPU_TASKS_FROZEN) {
	case CPU_UP_PREPARE:
		for_each_cpu_worker_pool(pool, cpu) {
			if (pool->nr_workers)
				continue;
			if (!create_worker(pool))
				return NOTIFY_BAD;
		}
		break;

	case CPU_DOWN_FAILED:
	case CPU_ONLINE:
		mutex_lock(&wq_pool_mutex);

		// (3) cpu up
		for_each_pool(pool, pi) {
			mutex_lock(&pool->attach_mutex);

			// if a normal worker_pool bound to this cpu has workers parked with WORKER_UNBOUND,
			// rebind those workers to the worker_pool,
			// put them back to work and bind them to this cpu
			if (pool->cpu == cpu)
				rebind_workers(pool);
			else if (pool->cpu < 0)
				restore_unbound_workers_cpumask(pool, cpu);

			mutex_unlock(&pool->attach_mutex);
		}

		/* update NUMA affinity of unbound workqueues */
		list_for_each_entry(wq, &workqueues, list)
			wq_update_unbound_numa(wq, cpu, true);

		mutex_unlock(&wq_pool_mutex);
		break;
	}
	return NOTIFY_OK;
}

1.3 workqueue

A workqueue is a collection of work. Workqueues fall into two groups: those created by the system and those created by users.

Whether it is a system or a user workqueue, it is bound to normal worker_pools by default unless WQ_UNBOUND is specified.

1.3.1 System workqueues

At initialization the system creates a set of default workqueues: system_wq, system_highpri_wq, system_long_wq, system_unbound_wq, system_freezable_wq, system_power_efficient_wq, and system_freezable_power_efficient_wq.

system_wq, for example, is the one schedule_work() uses by default.

kernel/workqueue.c:

init_workqueues()

static int __init init_workqueues(void)
{
	system_wq = alloc_workqueue("events", 0, 0);
	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
	system_long_wq = alloc_workqueue("events_long", 0, 0);
	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
					    WQ_UNBOUND_MAX_ACTIVE);
	system_freezable_wq = alloc_workqueue("events_freezable",
					      WQ_FREEZABLE, 0);
	system_power_efficient_wq = alloc_workqueue("events_power_efficient",
						    WQ_POWER_EFFICIENT, 0);
	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
							      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
							      0);
}

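1.3.2 Creating a workqueue

For the detailed process, see the code analysis in the earlier sections: alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs().

1.3.3 flush_workqueue()

The logic in this part, with wq->work_color and wq->flush_color being swapped back and forth, is dizzying. I have not worked through it yet and am leaving it for later; if anyone understands it, please teach me. :)

1.4 pool_workqueue

A pool_workqueue only plays the role of a middleman.

For the detailed process, see the code analysis in the earlier sections: alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs().

1.5 work

Describes one piece of work waiting to be executed.

1.5.1 queue_work()

Pushes a work item onto a workqueue.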

kernel/workqueue.c:

queue_work() -> queue_work_on() -> __queue_work()

static void __queue_work(int cpu, struct workqueue_struct *wq,
                         struct work_struct *work)
{
    struct pool_workqueue *pwq;
    struct worker_pool *last_pool;
    struct list_head *worklist;
    unsigned int work_flags;
    unsigned int req_cpu = cpu;

    /*
     * While a work item is PENDING && off queue, a task trying to
     * steal the PENDING will busy-loop waiting for it to either get
     * queued or lose PENDING.  Grabbing PENDING and queueing should
     * happen with IRQ disabled.
     */
    WARN_ON_ONCE(!irqs_disabled());

    debug_work_activate(work);

    /* if draining, only works from the same workqueue are allowed */
    if (unlikely(wq->flags & __WQ_DRAINING) &&
        WARN_ON_ONCE(!is_chained_work(wq)))
        return;
retry:
    // (1) If no CPU was specified, use the current CPU
    if (req_cpu == WORK_CPU_UNBOUND)
        cpu = raw_smp_processor_id();

    /* pwq which will be used unless @work is executing elsewhere */
    if (!(wq->flags & WQ_UNBOUND))
        // (2) For a normal wq, use the pwq of the current CPU's normal worker_pool
        pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    else
        // (3) For an unbound wq, use the pwq of the worker_pool for the current CPU's node
        pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

    // (4) If the work is still being executed by a worker of another pool,
    //     queue it on that pool so the work is never re-entered
    /*
     * If @work was previously on a different pool, it might still be
     * running there, in which case the work needs to be queued on that
     * pool to guarantee non-reentrancy.
     */
    last_pool = get_work_pool(work);
    if (last_pool && last_pool != pwq->pool) {
        struct worker *worker;

        spin_lock(&last_pool->lock);

        worker = find_worker_executing_work(last_pool, work);

        if (worker && worker->current_pwq->wq == wq) {
            pwq = worker->current_pwq;
        } else {
            /* meh... not running there, queue here */
            spin_unlock(&last_pool->lock);
            spin_lock(&pwq->pool->lock);
        }
    } else {
        spin_lock(&pwq->pool->lock);
    }

    /*
     * pwq is determined and locked.  For unbound pools, we could have
     * raced with pwq release and it could already be dead.  If its
     * refcnt is zero, repeat pwq selection.  Note that pwqs never die
     * without another pwq replacing it in the numa_pwq_tbl or while
     * work items are executing on it, so the retrying is guaranteed to
     * make forward-progress.
     */
    if (unlikely(!pwq->refcnt)) {
        if (wq->flags & WQ_UNBOUND) {
            spin_unlock(&pwq->pool->lock);
            cpu_relax();
            goto retry;
        }
        /* oops */
        WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
                  wq->name, cpu);
    }

    /* pwq determined, queue */
    trace_workqueue_queue_work(req_cpu, pwq, work);

    if (WARN_ON(!list_empty(&work->entry))) {
        spin_unlock(&pwq->pool->lock);
        return;
    }

    pwq->nr_in_flight[pwq->work_color]++;
    work_flags = work_color_to_flags(pwq->work_color);

    // (5) If max_active has not been reached, link the work onto pool->worklist
    if (likely(pwq->nr_active < pwq->max_active)) {
        trace_workqueue_activate_work(work);
        pwq->nr_active++;
        worklist = &pwq->pool->worklist;
    // Otherwise, park the work on the temporary list pwq->delayed_works
    } else {
        work_flags |= WORK_STRUCT_DELAYED;
        worklist = &pwq->delayed_works;
    }

    // (6) Insert the work into the chosen worklist
    insert_work(pwq, work, worklist, work_flags);

    spin_unlock(&pwq->pool->lock);
}
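The branch in step (5) can be tried out in user space. The following is only a minimal sketch: `struct pwq_model`, `enum target_list` and `pwq_model_queue()` are hypothetical names invented for illustration, and the model keeps nothing but the counters, with no locking and no real lists.

```c
#include <assert.h>

/* Hypothetical, simplified stand-in for the counters in struct pool_workqueue. */
struct pwq_model {
    int nr_active;   /* items already handed to the pool's worklist */
    int max_active;  /* concurrency limit of this pwq */
    int nr_delayed;  /* items parked on the delayed list */
};

enum target_list { LIST_POOL_WORKLIST, LIST_DELAYED_WORKS };

/*
 * Mirrors the decision in step (5) of __queue_work(): below the limit the
 * item is activated, otherwise it is parked until an active item retires.
 */
static enum target_list pwq_model_queue(struct pwq_model *pwq)
{
    assert(pwq->max_active > 0);

    if (pwq->nr_active < pwq->max_active) {
        pwq->nr_active++;
        return LIST_POOL_WORKLIST;
    }
    pwq->nr_delayed++;
    return LIST_DELAYED_WORKS;
}
```

With `max_active` set to 2, the first two calls return `LIST_POOL_WORKLIST` and the third returns `LIST_DELAYED_WORKS`, which matches the way the kernel code picks between `pool->worklist` and `pwq->delayed_works`.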

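1.5.2 flush_work()

Flushes a given work item, i.e. waits until that work has finished executing.

How can we tell that an asynchronous work has finished? The code uses a trick: it inserts a new work, wq_barrier, right behind the target work. Once wq_barrier has run, the target work must already have finished.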
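The barrier idea does not depend on kernel details, so it can be illustrated with a plain FIFO. This is a single-threaded sketch under simplifying assumptions: `struct item`, `struct fifo` and the `fifo_*` helpers are hypothetical, and a `done` flag stands in for the kernel's completion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAP 8

/* Hypothetical work item: a callback plus a flag standing in for a completion. */
struct item {
    void (*fn)(struct item *self);
    bool done;
};

struct fifo {
    struct item *slots[QUEUE_CAP];
    size_t len;
};

/* Used for ordinary items and for the barrier (compare wq_barrier_func()). */
static void mark_done(struct item *self) { self->done = true; }

static bool fifo_push(struct fifo *q, struct item *it)
{
    if (q->len == QUEUE_CAP)
        return false;
    q->slots[q->len++] = it;
    return true;
}

/* Insert @barrier directly behind @target; false if @target is not queued. */
static bool fifo_insert_after(struct fifo *q, const struct item *target,
                              struct item *barrier)
{
    if (q->len == QUEUE_CAP)
        return false;
    for (size_t i = 0; i < q->len; i++) {
        if (q->slots[i] != target)
            continue;
        for (size_t j = q->len; j > i + 1; j--)
            q->slots[j] = q->slots[j - 1];   /* shift the tail right by one */
        q->slots[i + 1] = barrier;
        q->len++;
        return true;
    }
    return false;
}

/* Run items in order and stop once @barrier has run; returns items executed. */
static size_t fifo_run_until(struct fifo *q, const struct item *barrier)
{
    size_t ran = 0;

    while (ran < q->len) {
        struct item *it = q->slots[ran++];
        it->fn(it);
        if (it == barrier)
            break;
    }
    return ran;
}
```

Because items run strictly in queue order, observing `barrier.done` implies that the target in front of it has also run, while items queued behind the barrier may still be pending. The real code must additionally handle a target that is already executing, which is why the barrier can also go onto a worker's scheduled list.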

kernel/workqueue.c:

flush_work() -> start_flush_work() -> insert_wq_barrier()

/**
 * flush_work - wait for a work to finish executing the last queueing instance
 * @work: the work to flush
 *
 * Wait until @work has finished execution.  @work is guaranteed to be idle
 * on return if it hasn't been requeued since flush started.
 *
 * Return:
 * %true if flush_work() waited for the work to finish execution,
 * %false if it was already idle.
 */
bool flush_work(struct work_struct *work)
{
    struct wq_barrier barr;

    lock_map_acquire(&work->lockdep_map);
    lock_map_release(&work->lockdep_map);

    if (start_flush_work(work, &barr)) {
        // Wait for the signal that the barr work has finished
        wait_for_completion(&barr.done);
        destroy_work_on_stack(&barr.work);
        return true;
    } else {
        return false;
    }
}

| →

static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
{
    struct worker *worker = NULL;
    struct worker_pool *pool;
    struct pool_workqueue *pwq;

    might_sleep();

    // (1) If the work has no worker_pool, it has already finished
    local_irq_disable();
    pool = get_work_pool(work);
    if (!pool) {
        local_irq_enable();
        return false;
    }

    spin_lock(&pool->lock);
    /* see the comment in try_to_grab_pending() with the same code */
    pwq = get_work_pwq(work);
    if (pwq) {
        // (2) If the work's pwq points to a worker_pool other than the one
        //     found above, the work has already finished
        if (unlikely(pwq->pool != pool))
            goto already_gone;
    } else {
        // (3) If the work has no pwq and no worker is currently executing it,
        //     the work has already finished
        worker = find_worker_executing_work(pool, work);
        if (!worker)
            goto already_gone;
        pwq = worker->current_pwq;
    }

    // (4) The work has not finished: insert the barr work behind it
    insert_wq_barrier(pwq, barr, work, worker);
    spin_unlock_irq(&pool->lock);

    /*
     * If @max_active is 1 or rescuer is in use, flushing another work
     * item on the same workqueue may lead to deadlock.  Make sure the
     * flusher is not running on the same workqueue by verifying write
     * access.
     */
    if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)
        lock_map_acquire(&pwq->wq->lockdep_map);
    else
        lock_map_acquire_read(&pwq->wq->lockdep_map);
    lock_map_release(&pwq->wq->lockdep_map);

    return true;
already_gone:
    spin_unlock_irq(&pool->lock);
    return false;
}

|| →

static void insert_wq_barrier(struct pool_workqueue *pwq,
                              struct wq_barrier *barr,
                              struct work_struct *target, struct worker *worker)
{
    struct list_head *head;
    unsigned int linked = 0;

    /*
     * debugobject calls are safe here even with pool->lock locked
     * as we know for sure that this will not trigger any of the
     * checks and call back into the fixup functions where we
     * might deadlock.
     */
    // (4.1) The barr work's handler is wq_barrier_func()
    INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
    __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
    init_completion(&barr->done);

    /*
     * If @target is currently being executed, schedule the
     * barrier to the worker; otherwise, put it after @target.
     */
    // (4.2) If the work is currently executing in a worker, insert the
    //       barr work into that worker's scheduled list
    if (worker)
        head = worker->scheduled.next;
    // Otherwise insert the barr work into the normal worklist, right behind
    // the target work, and set the WORK_STRUCT_LINKED flag on the target
    else {
        unsigned long *bits = work_data_bits(target);

        head = target->entry.next;
        /* there can already be other linked works, inherit and set */
        linked = *bits & WORK_STRUCT_LINKED;
        __set_bit(WORK_STRUCT_LINKED_BIT, bits);
    }

    debug_work_activate(&barr->work);
    insert_work(pwq, &barr->work, head,
                work_color_to_flags(WORK_NO_COLOR) | linked);
}

||| →

static void wq_barrier_func(struct work_struct *work)
{
    struct wq_barrier *barr = container_of(work, struct wq_barrier, work);

    // (4.1.1) The barr work has run: signal completion
    complete(&barr->done);
}

2. Workqueue external interface functions

The workqueue mechanism implemented by CMWQ is wrapped in a set of external interface functions.

2.1 schedule_work()

Queues the work on the system default wq, system_wq. WORK_CPU_UNBOUND means the work is handled by a worker created by the normal worker_pool bound to the current CPU.

kernel/workqueue.c:

schedule_work()->queue_work_on()->__queue_work()

2.2 schedule_work_on()

Same as schedule_work(), except that the caller can specify the CPU on which the work runs.

kernel/workqueue.c:

schedule_work_on()->queue_work_on()->__queue_work()

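2.3 schedule_delayed_work()

Starts a timer. When the timer expires, delayed_work_timer_fn() is called and queues the work on the system default wq, system_wq.

kernel/workqueue.c:

schedule_delayed_work() -> queue_delayed_work() -> queue_delayed_work_on() -> __queue_delayed_work() -> delayed_work_timer_fn() -> __queue_work()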

static inline bool schedule_delayed_work(struct delayed_work *dwork,
                                         unsigned long delay)
{
    return queue_delayed_work(system_wq, dwork, delay);
}

| →

static inline bool queue_delayed_work(struct workqueue_struct *wq,
                                      struct delayed_work *dwork,
                                      unsigned long delay)
{
    return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
}

|| →

bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
                           struct delayed_work *dwork, unsigned long delay)
{
    struct work_struct *work = &dwork->work;
    bool ret = false;
    unsigned long flags;

    /* read the comment in __queue_work() */
    local_irq_save(flags);

    if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
        __queue_delayed_work(cpu, wq, dwork, delay);
        ret = true;
    }

    local_irq_restore(flags);
    return ret;
}

||| →

static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
                                 struct delayed_work *dwork, unsigned long delay)
{
    struct timer_list *timer = &dwork->timer;
    struct work_struct *work = &dwork->work;

    WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
                 timer->data != (unsigned long)dwork);
    WARN_ON_ONCE(timer_pending(timer));
    WARN_ON_ONCE(!list_empty(&work->entry));

    /*
     * If @delay is 0, queue @dwork->work immediately.  This is for
     * both optimization and correctness.  The earliest @timer can
     * expire is on the closest next tick and delayed_work users depend
     * on that there's no such delay when @delay is 0.
     */
    if (!delay) {
        __queue_work(cpu, wq, &dwork->work);
        return;
    }

    timer_stats_timer_set_start_info(&dwork->timer);

    dwork->wq = wq;
    dwork->cpu = cpu;
    timer->expires = jiffies + delay;

    if (unlikely(cpu != WORK_CPU_UNBOUND))
        add_timer_on(timer, cpu);
    else
        add_timer(timer);
}

|||| →

void delayed_work_timer_fn(unsigned long __data)
{
    struct delayed_work *dwork = (struct delayed_work *)__data;

    /* should have been called from irqsafe timer with irq already off */
    __queue_work(dwork->cpu, dwork->wq, &dwork->work);
}
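The two paths of a delayed work (immediate queueing when the delay is 0, otherwise queueing from the timer callback) can be modelled with a simple tick counter. This is a sketch under stated assumptions: `struct dwork_model`, `model_queue_delayed()` and `model_tick()` are hypothetical, the caller passes the current tick explicitly instead of reading `jiffies`, and counter wrap-around is ignored.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of a delayed work: a timer deadline plus a queued flag. */
struct dwork_model {
    unsigned long expires;  /* tick at which the timer fires */
    bool timer_armed;       /* true while the timer is pending */
    bool queued;            /* true once the work reached the workqueue */
};

/* Mirrors __queue_delayed_work(): delay 0 queues at once, else arm the timer. */
static void model_queue_delayed(struct dwork_model *dw, unsigned long now,
                                unsigned long delay)
{
    assert(!dw->timer_armed && !dw->queued);

    if (!delay) {
        dw->queued = true;          /* direct __queue_work() path */
        return;
    }
    dw->expires = now + delay;      /* compare: jiffies + delay */
    dw->timer_armed = true;
}

/* Mirrors the timer expiring and delayed_work_timer_fn() queueing the work. */
static void model_tick(struct dwork_model *dw, unsigned long now)
{
    if (dw->timer_armed && now >= dw->expires) {
        dw->timer_armed = false;
        dw->queued = true;
    }
}
```

Queueing with a delay of 5 at tick 100 leaves the work unqueued at tick 104 and queued at tick 105, while a delay of 0 queues it immediately without arming the timer.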

References

Documentation/workqueue.txt

Original title: Meizu Kernel Team: Linux Workqueue

Source: WeChat ID LinuxDev (WeChat official account: Linux閱碼場(chǎng)). Please credit the source when reposting.
