Line  Hits  Source code
1 : // SPDX-License-Identifier: GPL-2.0
2 : /*
3 : * Basic worker thread pool for io_uring
4 : *
5 : * Copyright (C) 2019 Jens Axboe
6 : *
7 : */
8 : #include <linux/kernel.h>
9 : #include <linux/init.h>
10 : #include <linux/errno.h>
11 : #include <linux/sched/signal.h>
12 : #include <linux/percpu.h>
13 : #include <linux/slab.h>
14 : #include <linux/rculist_nulls.h>
15 : #include <linux/cpu.h>
16 : #include <linux/task_work.h>
17 : #include <linux/audit.h>
18 : #include <uapi/linux/io_uring.h>
19 :
20 : #include "io-wq.h"
21 :
22 : #define WORKER_IDLE_TIMEOUT (5 * HZ)
23 :
24 : enum {
25 : IO_WORKER_F_UP = 1, /* up and active */
26 : IO_WORKER_F_RUNNING = 2, /* account as running */
27 : IO_WORKER_F_FREE = 4, /* worker on free list */
28 : IO_WORKER_F_BOUND = 8, /* is doing bounded work */
29 : };
30 :
31 : enum {
32 : IO_WQ_BIT_EXIT = 0, /* wq exiting */
33 : };
34 :
35 : enum {
36 : IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
37 : };
38 :
39 : /*
40 : * One for each thread in a wqe pool
41 : */
42 : struct io_worker {
43 : refcount_t ref;
44 : unsigned flags;
45 : struct hlist_nulls_node nulls_node;
46 : struct list_head all_list;
47 : struct task_struct *task;
48 : struct io_wqe *wqe;
49 :
50 : struct io_wq_work *cur_work;
51 : struct io_wq_work *next_work;
52 : raw_spinlock_t lock;
53 :
54 : struct completion ref_done;
55 :
56 : unsigned long create_state;
57 : struct callback_head create_work;
58 : int create_index;
59 :
60 : union {
61 : struct rcu_head rcu;
62 : struct work_struct work;
63 : };
64 : };
65 :
66 : #if BITS_PER_LONG == 64
67 : #define IO_WQ_HASH_ORDER 6
68 : #else
69 : #define IO_WQ_HASH_ORDER 5
70 : #endif
71 :
72 : #define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER)
73 :
74 : struct io_wqe_acct {
75 : unsigned nr_workers;
76 : unsigned max_workers;
77 : int index;
78 : atomic_t nr_running;
79 : raw_spinlock_t lock;
80 : struct io_wq_work_list work_list;
81 : unsigned long flags;
82 : };
83 :
84 : enum {
85 : IO_WQ_ACCT_BOUND,
86 : IO_WQ_ACCT_UNBOUND,
87 : IO_WQ_ACCT_NR,
88 : };
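/*
 * Each io_wqe keeps two accounting classes: IO_WQ_ACCT_BOUND for work
 * that completes in bounded time (typically regular file I/O) and
 * IO_WQ_ACCT_UNBOUND for work that may block indefinitely. The class is
 * derived from the work item's flags; a minimal sketch of the mapping,
 * mirroring io_work_get_acct()/io_get_acct() below:
 *
 *	bool bound = !(work->flags & IO_WQ_WORK_UNBOUND);
 *	struct io_wqe_acct *acct = &wqe->acct[bound ? IO_WQ_ACCT_BOUND
 *						    : IO_WQ_ACCT_UNBOUND];
 *
 * Each class has its own work list, worker limits and nr_running count.
 */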
89 :
90 : /*
91 : * Per-node worker thread pool
92 : */
93 : struct io_wqe {
94 : raw_spinlock_t lock;
95 : struct io_wqe_acct acct[IO_WQ_ACCT_NR];
96 :
97 : int node;
98 :
99 : struct hlist_nulls_head free_list;
100 : struct list_head all_list;
101 :
102 : struct wait_queue_entry wait;
103 :
104 : struct io_wq *wq;
105 : struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
106 :
107 : cpumask_var_t cpu_mask;
108 : };
109 :
110 : /*
111 : * Per io_wq state
112 : */
113 : struct io_wq {
114 : unsigned long state;
115 :
116 : free_work_fn *free_work;
117 : io_wq_work_fn *do_work;
118 :
119 : struct io_wq_hash *hash;
120 :
121 : atomic_t worker_refs;
122 : struct completion worker_done;
123 :
124 : struct hlist_node cpuhp_node;
125 :
126 : struct task_struct *task;
127 :
128 : struct io_wqe *wqes[];
129 : };
130 :
131 : static enum cpuhp_state io_wq_online;
132 :
133 : struct io_cb_cancel_data {
134 : work_cancel_fn *fn;
135 : void *data;
136 : int nr_running;
137 : int nr_pending;
138 : bool cancel_all;
139 : };
140 :
141 : static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
142 : static void io_wqe_dec_running(struct io_worker *worker);
143 : static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
144 : struct io_wqe_acct *acct,
145 : struct io_cb_cancel_data *match);
146 : static void create_worker_cb(struct callback_head *cb);
147 : static void io_wq_cancel_tw_create(struct io_wq *wq);
148 :
149 : static bool io_worker_get(struct io_worker *worker)
150 : {
151 0 : return refcount_inc_not_zero(&worker->ref);
152 : }
153 :
154 0 : static void io_worker_release(struct io_worker *worker)
155 : {
156 0 : if (refcount_dec_and_test(&worker->ref))
157 0 : complete(&worker->ref_done);
158 0 : }
159 :
160 : static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
161 : {
162 0 : return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
163 : }
164 :
165 : static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
166 : struct io_wq_work *work)
167 : {
168 0 : return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
169 : }
170 :
171 : static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
172 : {
173 0 : return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
174 : }
175 :
176 : static void io_worker_ref_put(struct io_wq *wq)
177 : {
178 0 : if (atomic_dec_and_test(&wq->worker_refs))
179 0 : complete(&wq->worker_done);
180 : }
181 :
182 0 : static void io_worker_cancel_cb(struct io_worker *worker)
183 : {
184 0 : struct io_wqe_acct *acct = io_wqe_get_acct(worker);
185 0 : struct io_wqe *wqe = worker->wqe;
186 0 : struct io_wq *wq = wqe->wq;
187 :
188 0 : atomic_dec(&acct->nr_running);
189 0 : raw_spin_lock(&worker->wqe->lock);
190 0 : acct->nr_workers--;
191 0 : raw_spin_unlock(&worker->wqe->lock);
192 0 : io_worker_ref_put(wq);
193 0 : clear_bit_unlock(0, &worker->create_state);
194 0 : io_worker_release(worker);
195 0 : }
196 :
197 0 : static bool io_task_worker_match(struct callback_head *cb, void *data)
198 : {
199 : struct io_worker *worker;
200 :
201 0 : if (cb->func != create_worker_cb)
202 : return false;
203 0 : worker = container_of(cb, struct io_worker, create_work);
204 0 : return worker == data;
205 : }
206 :
207 0 : static void io_worker_exit(struct io_worker *worker)
208 : {
209 0 : struct io_wqe *wqe = worker->wqe;
210 0 : struct io_wq *wq = wqe->wq;
211 :
212 0 : while (1) {
213 0 : struct callback_head *cb = task_work_cancel_match(wq->task,
214 : io_task_worker_match, worker);
215 :
216 0 : if (!cb)
217 : break;
218 0 : io_worker_cancel_cb(worker);
219 : }
220 :
221 0 : io_worker_release(worker);
222 0 : wait_for_completion(&worker->ref_done);
223 :
224 0 : raw_spin_lock(&wqe->lock);
225 0 : if (worker->flags & IO_WORKER_F_FREE)
226 0 : hlist_nulls_del_rcu(&worker->nulls_node);
227 0 : list_del_rcu(&worker->all_list);
228 0 : raw_spin_unlock(&wqe->lock);
229 0 : io_wqe_dec_running(worker);
230 0 : worker->flags = 0;
231 0 : preempt_disable();
232 0 : current->flags &= ~PF_IO_WORKER;
233 0 : preempt_enable();
234 :
235 0 : kfree_rcu(worker, rcu);
236 0 : io_worker_ref_put(wqe->wq);
237 0 : do_exit(0);
238 : }
239 :
240 : static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
241 : {
242 0 : bool ret = false;
243 :
244 0 : raw_spin_lock(&acct->lock);
245 0 : if (!wq_list_empty(&acct->work_list) &&
246 0 : !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
247 0 : ret = true;
248 0 : raw_spin_unlock(&acct->lock);
249 :
250 : return ret;
251 : }
252 :
253 : /*
254 : * Scan the free list for an idle worker to activate. If none can be
255 : * woken, the caller must create one.
256 : */
257 0 : static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
258 : struct io_wqe_acct *acct)
259 : __must_hold(RCU)
260 : {
261 : struct hlist_nulls_node *n;
262 : struct io_worker *worker;
263 :
264 : /*
265 : * Iterate free_list and see if we can find an idle worker to
266 : * activate. If a given worker is on the free_list but in the process
267 : * of exiting, keep trying.
268 : */
269 0 : hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
270 0 : if (!io_worker_get(worker))
271 0 : continue;
272 0 : if (io_wqe_get_acct(worker) != acct) {
273 0 : io_worker_release(worker);
274 0 : continue;
275 : }
276 0 : if (wake_up_process(worker->task)) {
277 0 : io_worker_release(worker);
278 0 : return true;
279 : }
280 0 : io_worker_release(worker);
281 : }
282 :
283 : return false;
284 : }
285 :
286 : /*
287 : * We need a worker. If we find a free one, we're good. If not, and we're
288 : * below the max number of workers, create one.
289 : */
290 0 : static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
291 : {
292 : /*
293 : * Most likely an attempt to queue unbounded work on an io_wq that
294 : * wasn't setup with any unbounded workers.
295 : */
296 0 : if (unlikely(!acct->max_workers))
297 0 : pr_warn_once("io-wq is not configured for unbound workers");
298 :
299 0 : raw_spin_lock(&wqe->lock);
300 0 : if (acct->nr_workers >= acct->max_workers) {
301 0 : raw_spin_unlock(&wqe->lock);
302 0 : return true;
303 : }
304 0 : acct->nr_workers++;
305 0 : raw_spin_unlock(&wqe->lock);
306 0 : atomic_inc(&acct->nr_running);
307 0 : atomic_inc(&wqe->wq->worker_refs);
308 0 : return create_io_worker(wqe->wq, wqe, acct->index);
309 : }
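/*
 * Note the ordering in io_wqe_create_worker(): nr_workers is claimed
 * under wqe->lock and nr_running/worker_refs are bumped before
 * create_io_worker() runs, so concurrent enqueues already see the new
 * worker accounted for even though its thread may not exist yet. If
 * creation ultimately fails, those counters are unwound again in
 * create_io_worker() or create_worker_cont().
 */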
310 :
311 : static void io_wqe_inc_running(struct io_worker *worker)
312 : {
313 0 : struct io_wqe_acct *acct = io_wqe_get_acct(worker);
314 :
315 0 : atomic_inc(&acct->nr_running);
316 : }
317 :
318 0 : static void create_worker_cb(struct callback_head *cb)
319 : {
320 : struct io_worker *worker;
321 : struct io_wq *wq;
322 : struct io_wqe *wqe;
323 : struct io_wqe_acct *acct;
324 0 : bool do_create = false;
325 :
326 0 : worker = container_of(cb, struct io_worker, create_work);
327 0 : wqe = worker->wqe;
328 0 : wq = wqe->wq;
329 0 : acct = &wqe->acct[worker->create_index];
330 0 : raw_spin_lock(&wqe->lock);
331 0 : if (acct->nr_workers < acct->max_workers) {
332 0 : acct->nr_workers++;
333 0 : do_create = true;
334 : }
335 0 : raw_spin_unlock(&wqe->lock);
336 0 : if (do_create) {
337 0 : create_io_worker(wq, wqe, worker->create_index);
338 : } else {
339 0 : atomic_dec(&acct->nr_running);
340 : io_worker_ref_put(wq);
341 : }
342 0 : clear_bit_unlock(0, &worker->create_state);
343 0 : io_worker_release(worker);
344 0 : }
345 :
346 0 : static bool io_queue_worker_create(struct io_worker *worker,
347 : struct io_wqe_acct *acct,
348 : task_work_func_t func)
349 : {
350 0 : struct io_wqe *wqe = worker->wqe;
351 0 : struct io_wq *wq = wqe->wq;
352 :
353 : /* raced with exit, just ignore create call */
354 0 : if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
355 : goto fail;
356 0 : if (!io_worker_get(worker))
357 : goto fail;
358 : /*
359 : * create_state manages ownership of create_work/index. We should
360 : * only need one entry per worker, as the worker going to sleep
361 : * will trigger the condition, and waking will clear it once it
362 : * runs the task_work.
363 : */
364 0 : if (test_bit(0, &worker->create_state) ||
365 0 : test_and_set_bit_lock(0, &worker->create_state))
366 : goto fail_release;
367 :
368 0 : atomic_inc(&wq->worker_refs);
369 0 : init_task_work(&worker->create_work, func);
370 0 : worker->create_index = acct->index;
371 0 : if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
372 : /*
373 : * EXIT may have been set after checking it above, check after
374 : * adding the task_work and remove any creation item if it is
375 : * now set. wq exit does that too, but we can have added this
376 : * work item after we canceled in io_wq_exit_workers().
377 : */
378 0 : if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
379 0 : io_wq_cancel_tw_create(wq);
380 : io_worker_ref_put(wq);
381 : return true;
382 : }
383 0 : io_worker_ref_put(wq);
384 0 : clear_bit_unlock(0, &worker->create_state);
385 : fail_release:
386 0 : io_worker_release(worker);
387 : fail:
388 0 : atomic_dec(&acct->nr_running);
389 : io_worker_ref_put(wq);
390 : return false;
391 : }
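/*
 * Worker creation requested from a worker that is about to block cannot
 * be done inline (it runs from the scheduler's sleeping hook), so it is
 * deferred to the originating task via task_work. Bit 0 of
 * worker->create_state acts as a per-worker lock so at most one creation
 * request is queued at a time; create_worker_cb()/create_worker_cont()
 * later run in the context of wq->task and drop the bit again.
 */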
392 :
393 0 : static void io_wqe_dec_running(struct io_worker *worker)
394 : {
395 0 : struct io_wqe_acct *acct = io_wqe_get_acct(worker);
396 0 : struct io_wqe *wqe = worker->wqe;
397 :
398 0 : if (!(worker->flags & IO_WORKER_F_UP))
399 : return;
400 :
401 0 : if (!atomic_dec_and_test(&acct->nr_running))
402 : return;
403 0 : if (!io_acct_run_queue(acct))
404 : return;
405 :
406 0 : atomic_inc(&acct->nr_running);
407 0 : atomic_inc(&wqe->wq->worker_refs);
408 0 : io_queue_worker_create(worker, acct, create_worker_cb);
409 : }
410 :
411 : /*
412 : * Worker will start processing some work. Take it off the free list if
413 : * it's currently there, so it isn't considered for activation again.
414 : */
415 : static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
416 : {
417 0 : if (worker->flags & IO_WORKER_F_FREE) {
418 0 : worker->flags &= ~IO_WORKER_F_FREE;
419 0 : raw_spin_lock(&wqe->lock);
420 0 : hlist_nulls_del_init_rcu(&worker->nulls_node);
421 0 : raw_spin_unlock(&wqe->lock);
422 : }
423 : }
424 :
425 : /*
426 : * No work, worker going to sleep. Put it back on the free list if it
427 : * isn't already there, so that io_wqe_activate_free_worker() can find
428 : * and wake it when new work is queued. Called with wqe->lock held and
429 : * with the task state already set by the caller, so the worker can be
430 : * woken as soon as it becomes visible on the free list.
431 : */
432 : static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
433 : __must_hold(wqe->lock)
434 : {
435 0 : if (!(worker->flags & IO_WORKER_F_FREE)) {
436 0 : worker->flags |= IO_WORKER_F_FREE;
437 0 : hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
438 : }
439 : }
440 :
441 : static inline unsigned int io_get_work_hash(struct io_wq_work *work)
442 : {
443 0 : return work->flags >> IO_WQ_HASH_SHIFT;
444 : }
445 :
446 0 : static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
447 : {
448 0 : struct io_wq *wq = wqe->wq;
449 0 : bool ret = false;
450 :
451 0 : spin_lock_irq(&wq->hash->wait.lock);
452 0 : if (list_empty(&wqe->wait.entry)) {
453 0 : __add_wait_queue(&wq->hash->wait, &wqe->wait);
454 0 : if (!test_bit(hash, &wq->hash->map)) {
455 0 : __set_current_state(TASK_RUNNING);
456 0 : list_del_init(&wqe->wait.entry);
457 0 : ret = true;
458 : }
459 : }
460 0 : spin_unlock_irq(&wq->hash->wait.lock);
461 0 : return ret;
462 : }
463 :
464 0 : static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
465 : struct io_worker *worker)
466 : __must_hold(acct->lock)
467 : {
468 : struct io_wq_work_node *node, *prev;
469 : struct io_wq_work *work, *tail;
470 0 : unsigned int stall_hash = -1U;
471 0 : struct io_wqe *wqe = worker->wqe;
472 :
473 0 : wq_list_for_each(node, prev, &acct->work_list) {
474 : unsigned int hash;
475 :
476 0 : work = container_of(node, struct io_wq_work, list);
477 :
478 : /* not hashed, can run anytime */
479 0 : if (!io_wq_is_hashed(work)) {
480 0 : wq_list_del(&acct->work_list, node, prev);
481 : return work;
482 : }
483 :
484 0 : hash = io_get_work_hash(work);
485 : /* all items with this hash lie in [work, tail] */
486 0 : tail = wqe->hash_tail[hash];
487 :
488 : /* hashed, can run if not already running */
489 0 : if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
490 0 : wqe->hash_tail[hash] = NULL;
491 0 : wq_list_cut(&acct->work_list, &tail->list, prev);
492 : return work;
493 : }
494 0 : if (stall_hash == -1U)
495 0 : stall_hash = hash;
496 : /* fast forward to a next hash, for-each will fix up @prev */
497 0 : node = &tail->list;
498 : }
499 :
500 0 : if (stall_hash != -1U) {
501 : bool unstalled;
502 :
503 : /*
504 : * Set this before dropping the lock to avoid racing with new
505 : * work being added and clearing the stalled bit.
506 : */
507 0 : set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
508 0 : raw_spin_unlock(&acct->lock);
509 0 : unstalled = io_wait_on_hash(wqe, stall_hash);
510 0 : raw_spin_lock(&acct->lock);
511 0 : if (unstalled) {
512 0 : clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
513 0 : if (wq_has_sleeper(&wqe->wq->hash->wait))
514 0 : wake_up(&wqe->wq->hash->wait);
515 : }
516 : }
517 :
518 : return NULL;
519 : }
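/*
 * Summary of the lookup above: unhashed work is handed out in FIFO
 * order. Hashed items with the same hash sit contiguously between the
 * first such item and wqe->hash_tail[hash]; the worker tries to claim
 * the hash bit in wq->hash->map and, if it is already taken, skips the
 * whole run via the tail pointer. If only busy hashes were found, the
 * acct is marked stalled and the wqe waits on wq->hash->wait until the
 * current owner clears the bit.
 */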
520 :
521 0 : static bool io_flush_signals(void)
522 : {
523 0 : if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
524 0 : __set_current_state(TASK_RUNNING);
525 0 : clear_notify_signal();
526 0 : if (task_work_pending(current))
527 0 : task_work_run();
528 : return true;
529 : }
530 : return false;
531 : }
532 :
533 : static void io_assign_current_work(struct io_worker *worker,
534 : struct io_wq_work *work)
535 : {
536 0 : if (work) {
537 0 : io_flush_signals();
538 0 : cond_resched();
539 : }
540 :
541 0 : raw_spin_lock(&worker->lock);
542 0 : worker->cur_work = work;
543 0 : worker->next_work = NULL;
544 0 : raw_spin_unlock(&worker->lock);
545 : }
546 :
547 : static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
548 :
549 0 : static void io_worker_handle_work(struct io_worker *worker)
550 : {
551 0 : struct io_wqe_acct *acct = io_wqe_get_acct(worker);
552 0 : struct io_wqe *wqe = worker->wqe;
553 0 : struct io_wq *wq = wqe->wq;
554 0 : bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
555 :
556 : do {
557 : struct io_wq_work *work;
558 :
559 : /*
560 : * If we got some work, mark us as busy. If we didn't, but
561 : * the list isn't empty, it means we stalled on hashed work.
562 : * Mark us stalled so we don't keep looking for work when we
563 : * can't make progress, any work completion or insertion will
564 : * clear the stalled flag.
565 : */
566 0 : raw_spin_lock(&acct->lock);
567 0 : work = io_get_next_work(acct, worker);
568 0 : raw_spin_unlock(&acct->lock);
569 0 : if (work) {
570 0 : __io_worker_busy(wqe, worker);
571 :
572 : /*
573 : * Make sure cancelation can find this, even before
574 : * it becomes the active work. That avoids a window
575 : * where the work has been removed from our general
576 : * work list, but isn't yet discoverable as the
577 : * current work item for this worker.
578 : */
579 0 : raw_spin_lock(&worker->lock);
580 0 : worker->next_work = work;
581 0 : raw_spin_unlock(&worker->lock);
582 : } else {
583 : break;
584 : }
585 0 : io_assign_current_work(worker, work);
586 0 : __set_current_state(TASK_RUNNING);
587 :
588 : /* handle a whole dependent link */
589 : do {
590 : struct io_wq_work *next_hashed, *linked;
591 0 : unsigned int hash = io_get_work_hash(work);
592 :
593 0 : next_hashed = wq_next_work(work);
594 :
595 0 : if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
596 0 : work->flags |= IO_WQ_WORK_CANCEL;
597 0 : wq->do_work(work);
598 0 : io_assign_current_work(worker, NULL);
599 :
600 0 : linked = wq->free_work(work);
601 0 : work = next_hashed;
602 0 : if (!work && linked && !io_wq_is_hashed(linked)) {
603 0 : work = linked;
604 0 : linked = NULL;
605 : }
606 0 : io_assign_current_work(worker, work);
607 0 : if (linked)
608 0 : io_wqe_enqueue(wqe, linked);
609 :
610 0 : if (hash != -1U && !next_hashed) {
611 : /* serialize hash clear with wake_up() */
612 0 : spin_lock_irq(&wq->hash->wait.lock);
613 0 : clear_bit(hash, &wq->hash->map);
614 0 : clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
615 0 : spin_unlock_irq(&wq->hash->wait.lock);
616 0 : if (wq_has_sleeper(&wq->hash->wait))
617 0 : wake_up(&wq->hash->wait);
618 : }
619 0 : } while (work);
620 : } while (1);
621 0 : }
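/*
 * The inner loop above walks a dependent chain: wq->do_work() executes
 * one item, wq->free_work() hands back any linked follow-up request, and
 * a hashed follow-up is re-enqueued through io_wqe_enqueue() instead of
 * being run inline, so hash serialization is preserved. When the last
 * item of a hashed run completes, the hash bit is cleared under the hash
 * waitqueue lock and any worker stalled on that hash is woken.
 */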
622 :
623 0 : static int io_wqe_worker(void *data)
624 : {
625 0 : struct io_worker *worker = data;
626 0 : struct io_wqe_acct *acct = io_wqe_get_acct(worker);
627 0 : struct io_wqe *wqe = worker->wqe;
628 0 : struct io_wq *wq = wqe->wq;
629 0 : bool last_timeout = false;
630 : char buf[TASK_COMM_LEN];
631 :
632 0 : worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
633 :
634 0 : snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
635 0 : set_task_comm(current, buf);
636 :
637 0 : audit_alloc_kernel(current);
638 :
639 0 : while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
640 : long ret;
641 :
642 0 : set_current_state(TASK_INTERRUPTIBLE);
643 0 : while (io_acct_run_queue(acct))
644 0 : io_worker_handle_work(worker);
645 :
646 0 : raw_spin_lock(&wqe->lock);
647 : /* timed out, exit unless we're the last worker */
648 0 : if (last_timeout && acct->nr_workers > 1) {
649 0 : acct->nr_workers--;
650 0 : raw_spin_unlock(&wqe->lock);
651 0 : __set_current_state(TASK_RUNNING);
652 0 : break;
653 : }
654 0 : last_timeout = false;
655 0 : __io_worker_idle(wqe, worker);
656 0 : raw_spin_unlock(&wqe->lock);
657 0 : if (io_flush_signals())
658 0 : continue;
659 0 : ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
660 0 : if (signal_pending(current)) {
661 : struct ksignal ksig;
662 :
663 0 : if (!get_signal(&ksig))
664 0 : continue;
665 0 : break;
666 : }
667 0 : last_timeout = !ret;
668 : }
669 :
670 0 : if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
671 0 : io_worker_handle_work(worker);
672 :
673 0 : audit_free(current);
674 0 : io_worker_exit(worker);
675 : return 0;
676 : }
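/*
 * Note on the loop above: the task state is set to TASK_INTERRUPTIBLE
 * before the work queue is checked, so a concurrent io_wqe_enqueue()
 * either wakes the worker off the free list or the worker sees the new
 * work before it actually sleeps. A worker idle for WORKER_IDLE_TIMEOUT
 * exits unless it is the last worker of its accounting class, which is
 * kept around to avoid pointless create/destroy churn.
 */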
677 :
678 : /*
679 : * Called when a worker is scheduled in. Mark us as currently running.
680 : */
681 0 : void io_wq_worker_running(struct task_struct *tsk)
682 : {
683 0 : struct io_worker *worker = tsk->worker_private;
684 :
685 0 : if (!worker)
686 : return;
687 0 : if (!(worker->flags & IO_WORKER_F_UP))
688 : return;
689 0 : if (worker->flags & IO_WORKER_F_RUNNING)
690 : return;
691 0 : worker->flags |= IO_WORKER_F_RUNNING;
692 : io_wqe_inc_running(worker);
693 : }
694 :
695 : /*
696 : * Called when worker is going to sleep. If there are no workers currently
697 : * running and we have work pending, wake up a free one or create a new one.
698 : */
699 0 : void io_wq_worker_sleeping(struct task_struct *tsk)
700 : {
701 0 : struct io_worker *worker = tsk->worker_private;
702 :
703 0 : if (!worker)
704 : return;
705 0 : if (!(worker->flags & IO_WORKER_F_UP))
706 : return;
707 0 : if (!(worker->flags & IO_WORKER_F_RUNNING))
708 : return;
709 :
710 0 : worker->flags &= ~IO_WORKER_F_RUNNING;
711 0 : io_wqe_dec_running(worker);
712 : }
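/*
 * io_wq_worker_running() and io_wq_worker_sleeping() are the hooks the
 * core scheduler invokes for PF_IO_WORKER tasks when they are scheduled
 * in or block. They keep acct->nr_running in step with how many workers
 * are actually making progress, which is what drives on-demand worker
 * creation in io_wqe_dec_running().
 */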
713 :
714 0 : static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
715 : struct task_struct *tsk)
716 : {
717 0 : tsk->worker_private = worker;
718 0 : worker->task = tsk;
719 0 : set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
720 0 : tsk->flags |= PF_NO_SETAFFINITY;
721 :
722 0 : raw_spin_lock(&wqe->lock);
723 0 : hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
724 0 : list_add_tail_rcu(&worker->all_list, &wqe->all_list);
725 0 : worker->flags |= IO_WORKER_F_FREE;
726 0 : raw_spin_unlock(&wqe->lock);
727 0 : wake_up_new_task(tsk);
728 0 : }
729 :
730 0 : static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
731 : {
732 0 : return true;
733 : }
734 :
735 0 : static inline bool io_should_retry_thread(long err)
736 : {
737 : /*
738 : * Prevent perpetual task_work retry, if the task (or its group) is
739 : * exiting.
740 : */
741 0 : if (fatal_signal_pending(current))
742 : return false;
743 :
744 0 : switch (err) {
745 : case -EAGAIN:
746 : case -ERESTARTSYS:
747 : case -ERESTARTNOINTR:
748 : case -ERESTARTNOHAND:
749 : return true;
750 : default:
751 0 : return false;
752 : }
753 : }
754 :
755 0 : static void create_worker_cont(struct callback_head *cb)
756 : {
757 : struct io_worker *worker;
758 : struct task_struct *tsk;
759 : struct io_wqe *wqe;
760 :
761 0 : worker = container_of(cb, struct io_worker, create_work);
762 0 : clear_bit_unlock(0, &worker->create_state);
763 0 : wqe = worker->wqe;
764 0 : tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
765 0 : if (!IS_ERR(tsk)) {
766 0 : io_init_new_worker(wqe, worker, tsk);
767 0 : io_worker_release(worker);
768 0 : return;
769 0 : } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
770 0 : struct io_wqe_acct *acct = io_wqe_get_acct(worker);
771 :
772 0 : atomic_dec(&acct->nr_running);
773 0 : raw_spin_lock(&wqe->lock);
774 0 : acct->nr_workers--;
775 0 : if (!acct->nr_workers) {
776 0 : struct io_cb_cancel_data match = {
777 : .fn = io_wq_work_match_all,
778 : .cancel_all = true,
779 : };
780 :
781 0 : raw_spin_unlock(&wqe->lock);
782 0 : while (io_acct_cancel_pending_work(wqe, acct, &match))
783 : ;
784 : } else {
785 0 : raw_spin_unlock(&wqe->lock);
786 : }
787 0 : io_worker_ref_put(wqe->wq);
788 0 : kfree(worker);
789 0 : return;
790 : }
791 :
792 : /* re-create attempts grab a new worker ref, drop the existing one */
793 0 : io_worker_release(worker);
794 0 : schedule_work(&worker->work);
795 : }
796 :
797 0 : static void io_workqueue_create(struct work_struct *work)
798 : {
799 0 : struct io_worker *worker = container_of(work, struct io_worker, work);
800 0 : struct io_wqe_acct *acct = io_wqe_get_acct(worker);
801 :
802 0 : if (!io_queue_worker_create(worker, acct, create_worker_cont))
803 0 : kfree(worker);
804 0 : }
805 :
806 0 : static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
807 : {
808 0 : struct io_wqe_acct *acct = &wqe->acct[index];
809 : struct io_worker *worker;
810 : struct task_struct *tsk;
811 :
812 0 : __set_current_state(TASK_RUNNING);
813 :
814 0 : worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
815 0 : if (!worker) {
816 : fail:
817 0 : atomic_dec(&acct->nr_running);
818 0 : raw_spin_lock(&wqe->lock);
819 0 : acct->nr_workers--;
820 0 : raw_spin_unlock(&wqe->lock);
821 : io_worker_ref_put(wq);
822 : return false;
823 : }
824 :
825 0 : refcount_set(&worker->ref, 1);
826 0 : worker->wqe = wqe;
827 : raw_spin_lock_init(&worker->lock);
828 0 : init_completion(&worker->ref_done);
829 :
830 0 : if (index == IO_WQ_ACCT_BOUND)
831 0 : worker->flags |= IO_WORKER_F_BOUND;
832 :
833 0 : tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
834 0 : if (!IS_ERR(tsk)) {
835 0 : io_init_new_worker(wqe, worker, tsk);
836 0 : } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
837 0 : kfree(worker);
838 0 : goto fail;
839 : } else {
840 0 : INIT_WORK(&worker->work, io_workqueue_create);
841 0 : schedule_work(&worker->work);
842 : }
843 :
844 : return true;
845 : }
846 :
847 : /*
848 : * Iterate the passed in list and call the specific function for each
849 : * worker that isn't exiting
850 : */
851 0 : static bool io_wq_for_each_worker(struct io_wqe *wqe,
852 : bool (*func)(struct io_worker *, void *),
853 : void *data)
854 : {
855 : struct io_worker *worker;
856 0 : bool ret = false;
857 :
858 0 : list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
859 0 : if (io_worker_get(worker)) {
860 : /* no task if node is/was offline */
861 0 : if (worker->task)
862 0 : ret = func(worker, data);
863 0 : io_worker_release(worker);
864 0 : if (ret)
865 : break;
866 : }
867 : }
868 :
869 0 : return ret;
870 : }
871 :
872 0 : static bool io_wq_worker_wake(struct io_worker *worker, void *data)
873 : {
874 0 : set_notify_signal(worker->task);
875 0 : wake_up_process(worker->task);
876 0 : return false;
877 : }
878 :
879 : static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
880 : {
881 0 : struct io_wq *wq = wqe->wq;
882 :
883 : do {
884 0 : work->flags |= IO_WQ_WORK_CANCEL;
885 0 : wq->do_work(work);
886 0 : work = wq->free_work(work);
887 0 : } while (work);
888 : }
889 :
890 0 : static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
891 : {
892 0 : struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
893 : unsigned int hash;
894 : struct io_wq_work *tail;
895 :
896 0 : if (!io_wq_is_hashed(work)) {
897 : append:
898 0 : wq_list_add_tail(&work->list, &acct->work_list);
899 : return;
900 : }
901 :
902 0 : hash = io_get_work_hash(work);
903 0 : tail = wqe->hash_tail[hash];
904 0 : wqe->hash_tail[hash] = work;
905 0 : if (!tail)
906 : goto append;
907 :
908 0 : wq_list_add_after(&work->list, &tail->list, &acct->work_list);
909 : }
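/*
 * For hashed work, wqe->hash_tail[hash] always points at the last queued
 * item with that hash, so a new item with the same hash is appended
 * directly after it. That keeps each hash's items contiguous in the work
 * list, which is what lets io_get_next_work() hand a worker the whole
 * run, or skip over it, in one step.
 */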
910 :
911 0 : static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
912 : {
913 0 : return work == data;
914 : }
915 :
916 0 : static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
917 : {
918 0 : struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
919 : struct io_cb_cancel_data match;
920 0 : unsigned work_flags = work->flags;
921 : bool do_create;
922 :
923 : /*
924 : * If io-wq is exiting for this task, or if the request has explicitly
925 : * been marked as one that should not get executed, cancel it here.
926 : */
927 0 : if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
928 0 : (work->flags & IO_WQ_WORK_CANCEL)) {
929 : io_run_cancel(work, wqe);
930 0 : return;
931 : }
932 :
933 0 : raw_spin_lock(&acct->lock);
934 0 : io_wqe_insert_work(wqe, work);
935 0 : clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
936 0 : raw_spin_unlock(&acct->lock);
937 :
938 0 : raw_spin_lock(&wqe->lock);
939 : rcu_read_lock();
940 0 : do_create = !io_wqe_activate_free_worker(wqe, acct);
941 : rcu_read_unlock();
942 :
943 0 : raw_spin_unlock(&wqe->lock);
944 :
945 0 : if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
946 0 : !atomic_read(&acct->nr_running))) {
947 : bool did_create;
948 :
949 0 : did_create = io_wqe_create_worker(wqe, acct);
950 0 : if (likely(did_create))
951 : return;
952 :
953 0 : raw_spin_lock(&wqe->lock);
954 0 : if (acct->nr_workers) {
955 0 : raw_spin_unlock(&wqe->lock);
956 0 : return;
957 : }
958 0 : raw_spin_unlock(&wqe->lock);
959 :
960 : /* fatal condition, failed to create the first worker */
961 0 : match.fn = io_wq_work_match_item,
962 0 : match.data = work,
963 0 : match.cancel_all = false,
964 :
965 0 : io_acct_cancel_pending_work(wqe, acct, &match);
966 : }
967 : }
968 :
969 0 : void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
970 : {
971 0 : struct io_wqe *wqe = wq->wqes[numa_node_id()];
972 :
973 0 : io_wqe_enqueue(wqe, work);
974 0 : }
975 :
976 : /*
977 : * Work items that hash to the same value will not be done in parallel.
978 : * Used to limit concurrent writes, generally hashed by inode.
979 : */
980 0 : void io_wq_hash_work(struct io_wq_work *work, void *val)
981 : {
982 : unsigned int bit;
983 :
984 0 : bit = hash_ptr(val, IO_WQ_HASH_ORDER);
985 0 : work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
986 0 : }
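/*
 * A minimal usage sketch, not part of io-wq itself: to serialize writes
 * against the same file, a submitter could hash the work by inode before
 * queueing it ('work', 'file' and 'wq' are assumed to be set up by the
 * caller):
 *
 *	io_wq_hash_work(&work, file_inode(file));
 *	io_wq_enqueue(wq, &work);
 *
 * Items hashed to the same value never run in parallel, but they still
 * run concurrently with differently-hashed or unhashed work.
 */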
987 :
988 0 : static bool __io_wq_worker_cancel(struct io_worker *worker,
989 : struct io_cb_cancel_data *match,
990 : struct io_wq_work *work)
991 : {
992 0 : if (work && match->fn(work, match->data)) {
993 0 : work->flags |= IO_WQ_WORK_CANCEL;
994 0 : set_notify_signal(worker->task);
995 : return true;
996 : }
997 :
998 : return false;
999 : }
1000 :
1001 0 : static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
1002 : {
1003 0 : struct io_cb_cancel_data *match = data;
1004 :
1005 : /*
1006 : * Hold the lock to avoid ->cur_work going out of scope, caller
1007 : * may dereference the passed in work.
1008 : */
1009 0 : raw_spin_lock(&worker->lock);
1010 0 : if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
1011 0 : __io_wq_worker_cancel(worker, match, worker->next_work))
1012 0 : match->nr_running++;
1013 0 : raw_spin_unlock(&worker->lock);
1014 :
1015 0 : return match->nr_running && !match->cancel_all;
1016 : }
1017 :
1018 0 : static inline void io_wqe_remove_pending(struct io_wqe *wqe,
1019 : struct io_wq_work *work,
1020 : struct io_wq_work_node *prev)
1021 : {
1022 0 : struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
1023 0 : unsigned int hash = io_get_work_hash(work);
1024 0 : struct io_wq_work *prev_work = NULL;
1025 :
1026 0 : if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
1027 0 : if (prev)
1028 0 : prev_work = container_of(prev, struct io_wq_work, list);
1029 0 : if (prev_work && io_get_work_hash(prev_work) == hash)
1030 0 : wqe->hash_tail[hash] = prev_work;
1031 : else
1032 0 : wqe->hash_tail[hash] = NULL;
1033 : }
1034 0 : wq_list_del(&acct->work_list, &work->list, prev);
1035 0 : }
1036 :
1037 0 : static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
1038 : struct io_wqe_acct *acct,
1039 : struct io_cb_cancel_data *match)
1040 : {
1041 : struct io_wq_work_node *node, *prev;
1042 : struct io_wq_work *work;
1043 :
1044 0 : raw_spin_lock(&acct->lock);
1045 0 : wq_list_for_each(node, prev, &acct->work_list) {
1046 0 : work = container_of(node, struct io_wq_work, list);
1047 0 : if (!match->fn(work, match->data))
1048 0 : continue;
1049 0 : io_wqe_remove_pending(wqe, work, prev);
1050 0 : raw_spin_unlock(&acct->lock);
1051 0 : io_run_cancel(work, wqe);
1052 0 : match->nr_pending++;
1053 : /* not safe to continue after unlock */
1054 : return true;
1055 : }
1056 0 : raw_spin_unlock(&acct->lock);
1057 :
1058 : return false;
1059 : }
1060 :
1061 0 : static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
1062 : struct io_cb_cancel_data *match)
1063 : {
1064 : int i;
1065 : retry:
1066 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1067 0 : struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
1068 :
1069 0 : if (io_acct_cancel_pending_work(wqe, acct, match)) {
1070 0 : if (match->cancel_all)
1071 : goto retry;
1072 : break;
1073 : }
1074 : }
1075 0 : }
1076 :
1077 : static void io_wqe_cancel_running_work(struct io_wqe *wqe,
1078 : struct io_cb_cancel_data *match)
1079 : {
1080 : rcu_read_lock();
1081 0 : io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
1082 : rcu_read_unlock();
1083 : }
1084 :
1085 0 : enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
1086 : void *data, bool cancel_all)
1087 : {
1088 0 : struct io_cb_cancel_data match = {
1089 : .fn = cancel,
1090 : .data = data,
1091 : .cancel_all = cancel_all,
1092 : };
1093 : int node;
1094 :
1095 : /*
1096 : * First check pending list, if we're lucky we can just remove it
1097 : * from there. CANCEL_OK means that the work is returned as-new,
1098 : * no completion will be posted for it.
1099 : *
1100 : * Then check if a free (going busy) or busy worker has the work
1101 : * currently running. If we find it there, we'll return CANCEL_RUNNING
1102 : * as an indication that we attempt to signal cancellation. The
1103 : * completion will run normally in this case.
1104 : *
1105 : * Do both of these under the appropriate locks, to ensure that we'll
1106 : * find a work item regardless of state.
1107 : */
1108 0 : for_each_node(node) {
1109 0 : struct io_wqe *wqe = wq->wqes[node];
1110 :
1111 0 : io_wqe_cancel_pending_work(wqe, &match);
1112 0 : if (match.nr_pending && !match.cancel_all)
1113 : return IO_WQ_CANCEL_OK;
1114 :
1115 0 : raw_spin_lock(&wqe->lock);
1116 0 : io_wqe_cancel_running_work(wqe, &match);
1117 0 : raw_spin_unlock(&wqe->lock);
1118 0 : if (match.nr_running && !match.cancel_all)
1119 : return IO_WQ_CANCEL_RUNNING;
1120 : }
1121 :
1122 0 : if (match.nr_running)
1123 : return IO_WQ_CANCEL_RUNNING;
1124 0 : if (match.nr_pending)
1125 : return IO_WQ_CANCEL_OK;
1126 0 : return IO_WQ_CANCEL_NOTFOUND;
1127 : }
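/*
 * A minimal sketch of a cancellation matcher, assuming the caller embeds
 * struct io_wq_work in its own request type (the 'my_req' container and
 * 'match_req' helper are illustrative, not part of io-wq):
 *
 *	static bool match_req(struct io_wq_work *work, void *data)
 *	{
 *		struct my_req *req = container_of(work, struct my_req, work);
 *
 *		return req == data;
 *	}
 *
 *	enum io_wq_cancel ret = io_wq_cancel_cb(wq, match_req, req, false);
 *
 * IO_WQ_CANCEL_OK means the work was still pending and has been taken
 * off the list without running; IO_WQ_CANCEL_RUNNING means the worker
 * currently executing it has been signalled to cancel.
 */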
1128 :
1129 0 : static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
1130 : int sync, void *key)
1131 : {
1132 0 : struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
1133 : int i;
1134 :
1135 0 : list_del_init(&wait->entry);
1136 :
1137 : rcu_read_lock();
1138 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1139 0 : struct io_wqe_acct *acct = &wqe->acct[i];
1140 :
1141 0 : if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
1142 0 : io_wqe_activate_free_worker(wqe, acct);
1143 : }
1144 : rcu_read_unlock();
1145 0 : return 1;
1146 : }
1147 :
1148 0 : struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1149 : {
1150 : int ret, node, i;
1151 : struct io_wq *wq;
1152 :
1153 0 : if (WARN_ON_ONCE(!data->free_work || !data->do_work))
1154 : return ERR_PTR(-EINVAL);
1155 0 : if (WARN_ON_ONCE(!bounded))
1156 : return ERR_PTR(-EINVAL);
1157 :
1158 0 : wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
1159 0 : if (!wq)
1160 : return ERR_PTR(-ENOMEM);
1161 0 : ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1162 0 : if (ret)
1163 : goto err_wq;
1164 :
1165 0 : refcount_inc(&data->hash->refs);
1166 0 : wq->hash = data->hash;
1167 0 : wq->free_work = data->free_work;
1168 0 : wq->do_work = data->do_work;
1169 :
1170 0 : ret = -ENOMEM;
1171 0 : for_each_node(node) {
1172 : struct io_wqe *wqe;
1173 0 : int alloc_node = node;
1174 :
1175 0 : if (!node_online(alloc_node))
1176 : alloc_node = NUMA_NO_NODE;
1177 0 : wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
1178 0 : if (!wqe)
1179 : goto err;
1180 0 : if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
1181 : goto err;
1182 0 : cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
1183 0 : wq->wqes[node] = wqe;
1184 0 : wqe->node = alloc_node;
1185 0 : wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1186 0 : wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1187 0 : task_rlimit(current, RLIMIT_NPROC);
1188 0 : INIT_LIST_HEAD(&wqe->wait.entry);
1189 0 : wqe->wait.func = io_wqe_hash_wake;
1190 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1191 0 : struct io_wqe_acct *acct = &wqe->acct[i];
1192 :
1193 0 : acct->index = i;
1194 0 : atomic_set(&acct->nr_running, 0);
1195 0 : INIT_WQ_LIST(&acct->work_list);
1196 : raw_spin_lock_init(&acct->lock);
1197 : }
1198 0 : wqe->wq = wq;
1199 : raw_spin_lock_init(&wqe->lock);
1200 0 : INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1201 0 : INIT_LIST_HEAD(&wqe->all_list);
1202 : }
1203 :
1204 0 : wq->task = get_task_struct(data->task);
1205 0 : atomic_set(&wq->worker_refs, 1);
1206 0 : init_completion(&wq->worker_done);
1207 0 : return wq;
1208 : err:
1209 0 : io_wq_put_hash(data->hash);
1210 0 : cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1211 0 : for_each_node(node) {
1212 0 : if (!wq->wqes[node])
1213 0 : continue;
1214 0 : free_cpumask_var(wq->wqes[node]->cpu_mask);
1215 0 : kfree(wq->wqes[node]);
1216 : }
1217 : err_wq:
1218 0 : kfree(wq);
1219 0 : return ERR_PTR(ret);
1220 : }
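/*
 * A minimal setup sketch, assuming the caller already holds a refcounted
 * struct io_wq_hash and supplies its own work callbacks ('hash',
 * 'my_free_work', 'my_do_work' and 'concurrency' are illustrative):
 *
 *	struct io_wq_data data = {
 *		.hash		= hash,
 *		.task		= current,
 *		.free_work	= my_free_work,
 *		.do_work	= my_do_work,
 *	};
 *	struct io_wq *wq = io_wq_create(concurrency, &data);
 *
 *	if (IS_ERR(wq))
 *		return PTR_ERR(wq);
 *
 * The 'bounded' argument caps bounded workers per node; unbounded
 * workers default to a cap of RLIMIT_NPROC.
 */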
1221 :
1222 0 : static bool io_task_work_match(struct callback_head *cb, void *data)
1223 : {
1224 : struct io_worker *worker;
1225 :
1226 0 : if (cb->func != create_worker_cb && cb->func != create_worker_cont)
1227 : return false;
1228 0 : worker = container_of(cb, struct io_worker, create_work);
1229 0 : return worker->wqe->wq == data;
1230 : }
1231 :
1232 0 : void io_wq_exit_start(struct io_wq *wq)
1233 : {
1234 0 : set_bit(IO_WQ_BIT_EXIT, &wq->state);
1235 0 : }
1236 :
1237 0 : static void io_wq_cancel_tw_create(struct io_wq *wq)
1238 : {
1239 : struct callback_head *cb;
1240 :
1241 0 : while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1242 : struct io_worker *worker;
1243 :
1244 0 : worker = container_of(cb, struct io_worker, create_work);
1245 0 : io_worker_cancel_cb(worker);
1246 : }
1247 0 : }
1248 :
1249 0 : static void io_wq_exit_workers(struct io_wq *wq)
1250 : {
1251 : int node;
1252 :
1253 0 : if (!wq->task)
1254 : return;
1255 :
1256 0 : io_wq_cancel_tw_create(wq);
1257 :
1258 : rcu_read_lock();
1259 0 : for_each_node(node) {
1260 0 : struct io_wqe *wqe = wq->wqes[node];
1261 :
1262 0 : io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
1263 : }
1264 0 : rcu_read_unlock();
1265 0 : io_worker_ref_put(wq);
1266 0 : wait_for_completion(&wq->worker_done);
1267 :
1268 0 : for_each_node(node) {
1269 0 : spin_lock_irq(&wq->hash->wait.lock);
1270 0 : list_del_init(&wq->wqes[node]->wait.entry);
1271 0 : spin_unlock_irq(&wq->hash->wait.lock);
1272 : }
1273 0 : put_task_struct(wq->task);
1274 0 : wq->task = NULL;
1275 : }
1276 :
1277 0 : static void io_wq_destroy(struct io_wq *wq)
1278 : {
1279 : int node;
1280 :
1281 0 : cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1282 :
1283 0 : for_each_node(node) {
1284 0 : struct io_wqe *wqe = wq->wqes[node];
1285 0 : struct io_cb_cancel_data match = {
1286 : .fn = io_wq_work_match_all,
1287 : .cancel_all = true,
1288 : };
1289 0 : io_wqe_cancel_pending_work(wqe, &match);
1290 0 : free_cpumask_var(wqe->cpu_mask);
1291 0 : kfree(wqe);
1292 : }
1293 0 : io_wq_put_hash(wq->hash);
1294 0 : kfree(wq);
1295 0 : }
1296 :
1297 0 : void io_wq_put_and_exit(struct io_wq *wq)
1298 : {
1299 0 : WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
1300 :
1301 0 : io_wq_exit_workers(wq);
1302 0 : io_wq_destroy(wq);
1303 0 : }
1304 :
1305 : struct online_data {
1306 : unsigned int cpu;
1307 : bool online;
1308 : };
1309 :
1310 0 : static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1311 : {
1312 0 : struct online_data *od = data;
1313 :
1314 0 : if (od->online)
1315 0 : cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
1316 : else
1317 0 : cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
1318 0 : return false;
1319 : }
1320 :
1321 : static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1322 : {
1323 0 : struct online_data od = {
1324 : .cpu = cpu,
1325 : .online = online
1326 : };
1327 : int i;
1328 :
1329 : rcu_read_lock();
1330 0 : for_each_node(i)
1331 0 : io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
1332 : rcu_read_unlock();
1333 : return 0;
1334 : }
1335 :
1336 0 : static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
1337 : {
1338 0 : struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1339 :
1340 0 : return __io_wq_cpu_online(wq, cpu, true);
1341 : }
1342 :
1343 0 : static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
1344 : {
1345 0 : struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1346 :
1347 0 : return __io_wq_cpu_online(wq, cpu, false);
1348 : }
1349 :
1350 0 : int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
1351 : {
1352 : int i;
1353 :
1354 : rcu_read_lock();
1355 0 : for_each_node(i) {
1356 0 : struct io_wqe *wqe = wq->wqes[i];
1357 :
1358 0 : if (mask)
1359 0 : cpumask_copy(wqe->cpu_mask, mask);
1360 : else
1361 0 : cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
1362 : }
1363 : rcu_read_unlock();
1364 0 : return 0;
1365 : }
1366 :
1367 : /*
1368 : * Set the max number of bounded/unbounded workers, returning the old
1369 : * values. An entry of 0 in new_count leaves that limit unchanged.
1370 : */
1371 0 : int io_wq_max_workers(struct io_wq *wq, int *new_count)
1372 : {
1373 : int prev[IO_WQ_ACCT_NR];
1374 0 : bool first_node = true;
1375 : int i, node;
1376 :
1377 : BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
1378 : BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
1379 : BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);
1380 :
1381 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1382 0 : if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
1383 0 : new_count[i] = task_rlimit(current, RLIMIT_NPROC);
1384 : }
1385 :
1386 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++)
1387 0 : prev[i] = 0;
1388 :
1389 : rcu_read_lock();
1390 0 : for_each_node(node) {
1391 0 : struct io_wqe *wqe = wq->wqes[node];
1392 : struct io_wqe_acct *acct;
1393 :
1394 0 : raw_spin_lock(&wqe->lock);
1395 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1396 0 : acct = &wqe->acct[i];
1397 0 : if (first_node)
1398 0 : prev[i] = max_t(int, acct->max_workers, prev[i]);
1399 0 : if (new_count[i])
1400 0 : acct->max_workers = new_count[i];
1401 : }
1402 0 : raw_spin_unlock(&wqe->lock);
1403 0 : first_node = false;
1404 : }
1405 : rcu_read_unlock();
1406 :
1407 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++)
1408 0 : new_count[i] = prev[i];
1409 :
1410 0 : return 0;
1411 : }
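/*
 * A minimal sketch of adjusting the limits at runtime; this is what the
 * IORING_REGISTER_IOWQ_MAX_WORKERS io_uring_register() opcode ends up
 * using (the values below are illustrative):
 *
 *	int counts[IO_WQ_ACCT_NR] = { 8, 16 };	// bounded, unbounded
 *
 *	io_wq_max_workers(wq, counts);
 *	// counts[] now holds the previous per-class maximums
 *
 * A 0 entry leaves that class's limit unchanged while still reporting
 * the old value.
 */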
1412 :
1413 1 : static __init int io_wq_init(void)
1414 : {
1415 : int ret;
1416 :
1417 1 : ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
1418 : io_wq_cpu_online, io_wq_cpu_offline);
1419 1 : if (ret < 0)
1420 : return ret;
1421 1 : io_wq_online = ret;
1422 1 : return 0;
1423 : }
1424 : subsys_initcall(io_wq_init);
|