In the earlier article "3-工作队列使用-Workqueue" we used the workqueue API by simply making the following calls:
```c
/* 1. Create our own workqueue; the middle argument is 0, i.e. default flags */
workqueue_test = alloc_workqueue("workqueue_test", 0, 0);
/* 2. Initialize a work item and attach the function we implemented */
INIT_WORK(&work_test, work_test_func);
/* 3. Queue our work item on the chosen workqueue and wake a worker thread to process it */
queue_work(workqueue_test, &work_test);
```
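For context, here is a minimal, self-contained sketch of how these three calls typically sit inside a module; the wq_demo_* identifiers and the handler body are illustrative, not from the original article:

```c
#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/smp.h>

static struct workqueue_struct *workqueue_test;
static struct work_struct work_test;

/* The handler that a worker thread will eventually invoke */
static void work_test_func(struct work_struct *work)
{
	pr_info("work_test_func running on CPU %d\n", smp_processor_id());
}

static int __init wq_demo_init(void)
{
	workqueue_test = alloc_workqueue("workqueue_test", 0, 0);
	if (!workqueue_test)
		return -ENOMEM;

	INIT_WORK(&work_test, work_test_func);
	queue_work(workqueue_test, &work_test);
	return 0;
}

static void __exit wq_demo_exit(void)
{
	/* Drains pending work, then frees the workqueue */
	destroy_workqueue(workqueue_test);
}

module_init(wq_demo_init);
module_exit(wq_demo_exit);
MODULE_LICENSE("GPL");
```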
But how does our handler function actually get called? When is it called? And is this implementation optimal? With these questions in mind, let's take a deep look at the workqueue implementation.
Creating a workqueue
alloc_workqueue
alloc_workqueue calls __alloc_workqueue_key to create the workqueue:
```c
/* kernel/workqueue.c */
struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
					       unsigned int flags,
					       int max_active,
					       struct lock_class_key *key,
					       const char *lock_name, ...)
{
	struct workqueue_struct *wq;
	struct pool_workqueue *pwq;
	...
	/* (1) Upper bound on the number of works a pwq may have in flight
	 * in its worker_pool; 0 falls back to the default */
	max_active = max_active ?: WQ_DFL_ACTIVE;
	max_active = wq_clamp_max_active(max_active, flags, wq->name);

	/* Initialize the various list heads */
	...
	/* (2) Allocate the pool_workqueues for this workqueue;
	 * a pool_workqueue links the workqueue to a worker_pool */
	alloc_and_link_pwqs(wq);

	/* (3) A WQ_MEM_RECLAIM workqueue needs a dedicated rescuer
	 * thread to guarantee forward progress */
	if (flags & WQ_MEM_RECLAIM) {
		struct worker *rescuer;

		rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
					       wq->name);
		/* Set the CPUs the rescuer thread may run on */
		kthread_bind_mask(rescuer->task, cpu_possible_mask);
	}

	/* (4) Create the sysfs interface if requested */
	if (wq->flags & WQ_SYSFS)
		workqueue_sysfs_register(wq);

	/* (5) Add the new workqueue to the global list workqueues */
	/* list_add_tail_rcu - add a new entry to rcu-protected list */
	list_add_tail_rcu(&wq->list, &workqueues);
}
```
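To make the flag handling above concrete, here is a hedged sketch showing how different creation flags trigger the different branches of this function; the demo_* queue names are made up for illustration:

```c
#include <linux/workqueue.h>

static int __init wq_flags_demo_init(void)
{
	/* Ordinary per-CPU workqueue; max_active 0 falls back to WQ_DFL_ACTIVE */
	struct workqueue_struct *wq_normal =
		alloc_workqueue("demo_normal", 0, 0);

	/* WQ_MEM_RECLAIM: a rescuer thread is created, as seen above,
	 * so the queue can make progress even under memory pressure */
	struct workqueue_struct *wq_reclaim =
		alloc_workqueue("demo_reclaim", WQ_MEM_RECLAIM, 1);

	/* WQ_SYSFS: workqueue_sysfs_register() exposes the queue under
	 * /sys/devices/virtual/workqueue/ */
	struct workqueue_struct *wq_sysfs =
		alloc_workqueue("demo_sysfs", WQ_UNBOUND | WQ_SYSFS, 0);

	if (!wq_normal || !wq_reclaim || !wq_sysfs)
		return -ENOMEM;
	return 0;
}
```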
Above we encountered the global list workqueues, and we mentioned linking a workqueue to a worker_pool. Let's now look at what these variables and operations do.
Linking a workqueue to a worker_pool
```c
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
	/* normal (bound, per-CPU) workqueue */
	if (!(wq->flags & WQ_UNBOUND)) {
		wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
		/* Walk every possible CPU */
		for_each_possible_cpu(cpu) {
			struct pool_workqueue *pwq =
				per_cpu_ptr(wq->cpu_pwqs, cpu);
			struct worker_pool *cpu_pools =
				per_cpu(cpu_worker_pools, cpu);

			init_pwq(pwq, wq, &cpu_pools[highpri]);
			/* Link struct workqueue_struct and
			 * struct pool_workqueue together */
			link_pwq(pwq);
		}
	}
	...
}
```
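The alloc_percpu()/per_cpu_ptr() pattern above gives every CPU its own pool_workqueue instance. A small hedged sketch of that per-CPU allocation pattern in isolation (pcpu_slot and slots are invented names):

```c
#include <linux/percpu.h>
#include <linux/cpumask.h>

struct pcpu_slot {
	unsigned long hits;
};

static struct pcpu_slot __percpu *slots;

static int pcpu_demo_init(void)
{
	int cpu;

	/* One instance per CPU, just like wq->cpu_pwqs above */
	slots = alloc_percpu(struct pcpu_slot);
	if (!slots)
		return -ENOMEM;

	/* per_cpu_ptr() resolves the instance belonging to a given CPU */
	for_each_possible_cpu(cpu)
		per_cpu_ptr(slots, cpu)->hits = 0;

	return 0;
}
```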
worker
Each worker corresponds to one worker_thread() kernel thread, and one worker_pool owns one or more workers. Multiple workers fetch work items from the same list, worker_pool->worklist, and process them (both structures are sketched right after this list). That raises two questions:

- How does a worker process a work item?
- How does a worker_pool dynamically manage the number of its workers?
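For orientation, here is an abridged view of the two structures involved; the fields shown are trimmed and their exact set varies across kernel versions:

```c
/* kernel/workqueue.c (abridged) */
struct worker_pool {
	spinlock_t		lock;		/* protects the pool */
	int			cpu;		/* bound CPU, or -1 for unbound pools */
	struct list_head	worklist;	/* pending works: all workers pull from here */
	int			nr_workers;	/* total number of workers */
	int			nr_idle;	/* number of currently idle workers */
	...
};

/* kernel/workqueue_internal.h (abridged) */
struct worker {
	struct work_struct	*current_work;	/* work being processed */
	work_func_t		current_func;	/* its callback */
	struct task_struct	*task;		/* the worker_thread() kthread */
	struct worker_pool	*pool;		/* the pool this worker belongs to */
	...
};
```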
How a worker processes work
We mentioned above that each worker corresponds to one worker_thread() kernel thread, so let's start from its code.
worker_thread
```c
/* kernel/workqueue.c */
static int worker_thread(void *__worker)
{
	struct worker *worker = __worker;
	struct worker_pool *pool = worker->pool;

	/* tell the scheduler that this is a workqueue worker */
	/* As mentioned earlier, one worker_pool manages several workers;
	 * this flag is how the pool learns about the worker's state */
	worker->task->flags |= PF_WQ_WORKER;
	...
	do {
		/* Take the first work item off pool->worklist */
		struct work_struct *work =
			list_first_entry(&pool->worklist,
					 struct work_struct, entry);

		pool->watchdog_ts = jiffies;

		/* Dispatch the work */
		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
			/* optimization path, not strictly necessary */
			process_one_work(worker, work);
			if (unlikely(!list_empty(&worker->scheduled)))
				process_scheduled_works(worker);
		} else {
			move_linked_works(work, &worker->scheduled, NULL);
			process_scheduled_works(worker);
		}
	} while (keep_working(pool));
```
The loop above calls process_one_work() to handle a single work item.
process_one_work
```c
/**
 * process_one_work - process single work
 * @worker: self
 * @work: work to process
 *
 * Process @work. This function contains all the logics necessary to
 * process a single work including synchronization against and
 * interaction with other workers on the same cpu, queueing and
 * flushing. As long as context requirement is met, any worker can
 * call this function to process a work.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock) which is released and regrabbed.
 */
static void process_one_work(struct worker *worker, struct work_struct *work)
{
	struct pool_workqueue *pwq = get_work_pwq(work);
	struct worker_pool *pool = worker->pool;

	worker->current_work = work;
	worker->current_func = work->func;
	worker->current_pwq = pwq;

	/* Invoke the function we registered; this is where our
	 * handler finally gets executed */
	worker->current_func(work);

	list_del_init(&work->entry);
```
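Note that the worker passes the work_struct pointer itself to our callback. A common idiom, shown here as a hedged sketch with the invented demo_dev names, is to embed the work_struct in a larger private structure and recover it with container_of():

```c
#include <linux/kernel.h>
#include <linux/workqueue.h>

struct demo_dev {
	int			id;
	struct work_struct	work;	/* embedded work item */
};

static void demo_work_func(struct work_struct *work)
{
	/* Recover the enclosing structure from the work_struct
	 * pointer that process_one_work() hands us */
	struct demo_dev *dev = container_of(work, struct demo_dev, work);

	pr_info("handling work for demo_dev %d\n", dev->id);
}
```

After INIT_WORK(&dev->work, demo_work_func), each queued item carries its own context this way.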
How worker_thread is created
From the above we know that the worker_thread kernel thread processes work in the following order:
```
worker_thread()
    -> process_one_work()
        -> the callback function we registered
```
