1/*
2 * Copyright (c) 2000-2017 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
29
30#include <sys/cdefs.h>
31
32// <rdar://problem/26158937> panic() should be marked noreturn
33extern void panic(const char *string, ...) __printflike(1,2) __dead2;
34
35#include <kern/assert.h>
36#include <kern/ast.h>
37#include <kern/clock.h>
38#include <kern/cpu_data.h>
39#include <kern/kern_types.h>
40#include <kern/policy_internal.h>
41#include <kern/processor.h>
42#include <kern/sched_prim.h> /* for thread_exception_return */
43#include <kern/task.h>
44#include <kern/thread.h>
45#include <kern/zalloc.h>
46#include <mach/kern_return.h>
47#include <mach/mach_param.h>
48#include <mach/mach_port.h>
49#include <mach/mach_types.h>
50#include <mach/mach_vm.h>
51#include <mach/sync_policy.h>
52#include <mach/task.h>
53#include <mach/thread_act.h> /* for thread_resume */
54#include <mach/thread_policy.h>
55#include <mach/thread_status.h>
56#include <mach/vm_prot.h>
57#include <mach/vm_statistics.h>
58#include <machine/atomic.h>
59#include <machine/machine_routines.h>
60#include <vm/vm_map.h>
61#include <vm/vm_protos.h>
62
63#include <sys/eventvar.h>
64#include <sys/kdebug.h>
65#include <sys/kernel.h>
66#include <sys/lock.h>
67#include <sys/param.h>
68#include <sys/proc_info.h> /* for fill_procworkqueue */
69#include <sys/proc_internal.h>
70#include <sys/pthread_shims.h>
71#include <sys/resourcevar.h>
72#include <sys/signalvar.h>
73#include <sys/sysctl.h>
74#include <sys/sysproto.h>
75#include <sys/systm.h>
76#include <sys/ulock.h> /* for ulock_owner_value_to_port_name */
77
78#include <pthread/bsdthread_private.h>
79#include <pthread/workqueue_syscalls.h>
80#include <pthread/workqueue_internal.h>
81#include <pthread/workqueue_trace.h>
82
83#include <os/log.h>
84
85extern thread_t port_name_to_thread(mach_port_name_t port_name); /* osfmk/kern/ipc_tt.h */
86
87static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
88static void workq_schedule_creator(proc_t p, struct workqueue *wq, int flags);
89
90static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
91 workq_threadreq_t req);
92
93static uint32_t workq_constrained_allowance(struct workqueue *wq,
94 thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);
95
96static bool workq_thread_is_busy(uint64_t cur_ts,
97 _Atomic uint64_t *lastblocked_tsp);
98
99static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
100
101#pragma mark globals
102
103struct workq_usec_var {
104 uint32_t usecs;
105 uint64_t abstime;
106};
107
108#define WORKQ_SYSCTL_USECS(var, init) \
109 static struct workq_usec_var var = { .usecs = init }; \
110 SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
111 CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
112 workq_sysctl_handle_usecs, "I", "")
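
/*
 * Illustrative expansion (a sketch, not compiled here): the
 * WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS) use below
 * produces roughly:
 *
 *   static struct workq_usec_var wq_stalled_window = {
 *       .usecs = WQ_STALLED_WINDOW_USECS,
 *   };
 *   SYSCTL_OID(_kern, OID_AUTO, wq_stalled_window_usecs,
 *       CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &wq_stalled_window, 0,
 *       workq_sysctl_handle_usecs, "I", "");
 *
 * so each tunable is exposed as a kern.*_usecs sysctl whose handler also
 * refreshes the cached absolute-time interval.
 */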
113
114static lck_grp_t *workq_lck_grp;
115static lck_attr_t *workq_lck_attr;
116static lck_grp_attr_t *workq_lck_grp_attr;
117os_refgrp_decl(static, workq_refgrp, "workq", NULL);
118
119static zone_t workq_zone_workqueue;
120static zone_t workq_zone_threadreq;
121
122WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
123WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
124WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
125static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
126static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
127static uint32_t wq_init_constrained_limit = 1;
128static uint16_t wq_death_max_load;
129static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];
130
131#pragma mark sysctls
132
133static int
134workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
135{
136#pragma unused(arg2)
137 struct workq_usec_var *v = arg1;
138 int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
139 if (error || !req->newptr)
140 return error;
141 clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
142 &v->abstime);
143 return 0;
144}
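
/*
 * Usage note (assuming the standard sysctl(8) interface): these appear to
 * userspace as plain integer sysctls, e.g. reading or setting
 * kern.wq_stalled_window_usecs; on a write, the handler above converts the
 * new microsecond value into mach absolute time for the kernel-side checks.
 */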
145
146SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
147 &wq_max_threads, 0, "");
148
149SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
150 &wq_max_constrained_threads, 0, "");
151
152#pragma mark p_wqptr
153
154#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)
155
156static struct workqueue *
157proc_get_wqptr_fast(struct proc *p)
158{
159 return os_atomic_load(&p->p_wqptr, relaxed);
160}
161
162static struct workqueue *
163proc_get_wqptr(struct proc *p)
164{
165 struct workqueue *wq = proc_get_wqptr_fast(p);
166 return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
167}
168
169static void
170proc_set_wqptr(struct proc *p, struct workqueue *wq)
171{
172 wq = os_atomic_xchg(&p->p_wqptr, wq, release);
173 if (wq == WQPTR_IS_INITING_VALUE) {
174 proc_lock(p);
175 thread_wakeup(&p->p_wqptr);
176 proc_unlock(p);
177 }
178}
179
180static bool
181proc_init_wqptr_or_wait(struct proc *p)
182{
183 struct workqueue *wq;
184
185 proc_lock(p);
186 wq = p->p_wqptr;
187
188 if (wq == NULL) {
189 p->p_wqptr = WQPTR_IS_INITING_VALUE;
190 proc_unlock(p);
191 return true;
192 }
193
194 if (wq == WQPTR_IS_INITING_VALUE) {
195 assert_wait(&p->p_wqptr, THREAD_UNINT);
196 proc_unlock(p);
197 thread_block(THREAD_CONTINUE_NULL);
198 } else {
199 proc_unlock(p);
200 }
201 return false;
202}
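
/*
 * Minimal usage sketch of the init-or-wait protocol (see workq_open() below
 * for the real call site):
 *
 *   if (proc_get_wqptr(p) == NULL) {
 *       if (proc_init_wqptr_or_wait(p)) {
 *           // we won the race: allocate/initialize the workqueue, then
 *           // publish it (and wake any waiters) via proc_set_wqptr()
 *       } else {
 *           // another thread initialized it (or is about to); reload it
 *           // with proc_get_wqptr()
 *       }
 *   }
 */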
203
204static inline event_t
205workq_parked_wait_event(struct uthread *uth)
206{
207 return (event_t)&uth->uu_workq_stackaddr;
208}
209
210static inline void
211workq_thread_wakeup(struct uthread *uth)
212{
213 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
214 thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread);
215 }
216}
217
218#pragma mark wq_thactive
219
220#if defined(__LP64__)
221// Layout is:
222// 127 - 115 : 13 bits of zeroes
223// 114 - 112 : best QoS among all pending constrained requests
224// 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
225#define WQ_THACTIVE_BUCKET_WIDTH 16
226#define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH)
227#else
228// Layout is:
229// 63 - 61 : best QoS among all pending constrained requests
230// 60 : Manager bucket (0 or 1)
231// 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
232#define WQ_THACTIVE_BUCKET_WIDTH 10
233#define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
234#endif
235#define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
236#define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))
237
238static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
239 "Make sure we have space to encode a QoS");
240
241static inline wq_thactive_t
242_wq_thactive(struct workqueue *wq)
243{
244 return os_atomic_load(&wq->wq_thactive, relaxed);
245}
246
247static inline int
248_wq_bucket(thread_qos_t qos)
249{
250 // Map both BG and MT to the same bucket by over-shifting down and
251 // clamping MT and BG together.
252 switch (qos) {
253 case THREAD_QOS_MAINTENANCE:
254 return 0;
255 default:
256 return qos - 2;
257 }
258}
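
/*
 * Worked mapping (assuming the usual thread QoS numbering MT=1, BG=2, UT=3,
 * DF=4, IN=5, UI=6):
 *
 *   _wq_bucket(THREAD_QOS_MAINTENANCE)      == 0   // shares bucket 0 with BG
 *   _wq_bucket(THREAD_QOS_BACKGROUND)       == 0
 *   _wq_bucket(THREAD_QOS_UTILITY)          == 1
 *   _wq_bucket(THREAD_QOS_LEGACY)           == 2
 *   _wq_bucket(THREAD_QOS_USER_INITIATED)   == 3
 *   _wq_bucket(THREAD_QOS_USER_INTERACTIVE) == 4
 */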
259
260#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
261 ((tha) >> WQ_THACTIVE_QOS_SHIFT)
262
263static inline thread_qos_t
264_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
265{
266 // Avoid expensive atomic operations: the three bits we're loading are in
267 // a single byte, and always updated under the workqueue lock
268 wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
269 return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
270}
271
272static void
273_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
274{
275 thread_qos_t old_qos, new_qos;
276 workq_threadreq_t req;
277
278 req = priority_queue_max(&wq->wq_constrained_queue,
279 struct workq_threadreq_s, tr_entry);
280 new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
281 old_qos = _wq_thactive_best_constrained_req_qos(wq);
282 if (old_qos != new_qos) {
283 long delta = (long)new_qos - (long)old_qos;
284 wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
285 /*
286 * We can do an atomic add relative to the initial load because updates
287 * to this qos are always serialized under the workqueue lock.
288 */
289 v = os_atomic_add(&wq->wq_thactive, v, relaxed);
290#ifdef __LP64__
291 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
292 (uint64_t)(v >> 64), 0, 0);
293#else
294 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0);
295#endif
296 }
297}
298
299static inline wq_thactive_t
300_wq_thactive_offset_for_qos(thread_qos_t qos)
301{
302 return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
303}
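
/*
 * Example (LP64, WQ_THACTIVE_BUCKET_WIDTH == 16): bumping the UT bucket
 * (bucket 1) adds 1 << 16 to wq_thactive, so every bucket acts as an
 * independent 16-bit counter packed into one atomic word, with the best
 * constrained-request QoS sitting above them in bits 112-114.
 */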
304
305static inline wq_thactive_t
306_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
307{
308 wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
309 return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
310}
311
312static inline wq_thactive_t
313_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
314{
315 wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
316 return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
317}
318
319static inline void
320_wq_thactive_move(struct workqueue *wq,
321 thread_qos_t old_qos, thread_qos_t new_qos)
322{
323 wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
324 _wq_thactive_offset_for_qos(old_qos);
325 os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
326 wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
327 wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
328}
329
330static inline uint32_t
331_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
332 thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
333{
334 uint32_t count = 0, active;
335 uint64_t curtime;
336
337 assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);
338
339 if (busycount) {
340 curtime = mach_absolute_time();
341 *busycount = 0;
342 }
343 if (max_busycount) {
344 *max_busycount = THREAD_QOS_LAST - qos;
345 }
346
347 int i = _wq_bucket(qos);
348 v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
349 for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
350 active = v & WQ_THACTIVE_BUCKET_MASK;
351 count += active;
352
353 if (busycount && wq->wq_thscheduled_count[i] > active) {
354 if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
355 /*
356 * We only consider the last blocked thread for a given bucket
357 * as busy because we don't want to take the list lock in each
358 * sched callback. However this is an approximation that could
359 * contribute to thread creation storms.
360 */
361 (*busycount)++;
362 }
363 }
364 }
365
366 return count;
367}
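
/*
 * Example: aggregating down to THREAD_QOS_USER_INITIATED counts the active
 * threads in the IN bucket and every higher QoS bucket, while *busycount is
 * incremented once per such bucket that has more threads scheduled than
 * active and whose last-blocked timestamp is within wq_stalled_window of
 * now (see workq_thread_is_busy() below).
 */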
368
369#pragma mark wq_flags
370
371static inline uint32_t
372_wq_flags(struct workqueue *wq)
373{
374 return os_atomic_load(&wq->wq_flags, relaxed);
375}
376
377static inline bool
378_wq_exiting(struct workqueue *wq)
379{
380 return _wq_flags(wq) & WQ_EXITING;
381}
382
383bool
384workq_is_exiting(struct proc *p)
385{
386 struct workqueue *wq = proc_get_wqptr(p);
387 return !wq || _wq_exiting(wq);
388}
389
390struct turnstile *
391workq_turnstile(struct proc *p)
392{
393 struct workqueue *wq = proc_get_wqptr(p);
394 return wq ? wq->wq_turnstile : TURNSTILE_NULL;
395}
396
397#pragma mark workqueue lock
398
399static bool
400workq_lock_spin_is_acquired_kdp(struct workqueue *wq)
401{
402 return kdp_lck_spin_is_acquired(&wq->wq_lock);
403}
404
405static inline void
406workq_lock_spin(struct workqueue *wq)
407{
408 lck_spin_lock(&wq->wq_lock);
409}
410
411static inline void
412workq_lock_held(__assert_only struct workqueue *wq)
413{
414 LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED);
415}
416
417static inline bool
418workq_lock_try(struct workqueue *wq)
419{
420 return lck_spin_try_lock(&wq->wq_lock);
421}
422
423static inline void
424workq_unlock(struct workqueue *wq)
425{
426 lck_spin_unlock(&wq->wq_lock);
427}
428
429#pragma mark idle thread lists
430
431#define WORKQ_POLICY_INIT(qos) \
432 (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }
433
434static inline thread_qos_t
435workq_pri_bucket(struct uu_workq_policy req)
436{
437 return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
438}
439
440static inline thread_qos_t
441workq_pri_override(struct uu_workq_policy req)
442{
443 return MAX(workq_pri_bucket(req), req.qos_bucket);
444}
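
/*
 * Example: with qos_req == UT, qos_max == IN (from the bound kqueue) and
 * qos_override == UI, workq_pri_bucket() evaluates to UI; and
 * workq_pri_override() additionally never drops below qos_bucket, the
 * bucket the thread is currently accounted against.
 */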
445
446static inline bool
447workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
448{
449 workq_threadreq_param_t cur_trp, req_trp = { };
450
451 cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
452 if (req->tr_flags & TR_FLAG_WL_PARAMS) {
453 req_trp = kqueue_threadreq_workloop_param(req);
454 }
455
456 /*
457 * CPU percent flags are handled separately to policy changes, so ignore
458 * them for all of these checks.
459 */
460 uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
461 uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);
462
463 if (!req_flags && !cur_flags) {
464 return false;
465 }
466
467 if (req_flags != cur_flags) {
468 return true;
469 }
470
471 if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
472 return true;
473 }
474
	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
476 return true;
477 }
478
479 return false;
480}
481
482static inline bool
483workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
484{
485 if (workq_thread_needs_params_change(req, uth)) {
486 return true;
487 }
488
489 return req->tr_qos != workq_pri_override(uth->uu_workq_pri);
490}
491
492static void
493workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
494 struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
495 bool force_run)
496{
497 thread_qos_t old_bucket = old_pri.qos_bucket;
498 thread_qos_t new_bucket = workq_pri_bucket(new_pri);
499
500 if (old_bucket != new_bucket) {
501 _wq_thactive_move(wq, old_bucket, new_bucket);
502 }
503
504 new_pri.qos_bucket = new_bucket;
505 uth->uu_workq_pri = new_pri;
506
507 if (workq_pri_override(old_pri) != new_bucket) {
508 thread_set_workq_override(uth->uu_thread, new_bucket);
509 }
510
511 if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
512 int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
513 if (old_bucket > new_bucket) {
514 /*
515 * When lowering our bucket, we may unblock a thread request,
516 * but we can't drop our priority before we have evaluated
517 * whether this is the case, and if we ever drop the workqueue lock
518 * that would cause a priority inversion.
519 *
520 * We hence have to disallow thread creation in that case.
521 */
522 flags = 0;
523 }
524 workq_schedule_creator(p, wq, flags);
525 }
526}
527
/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
533static void
534workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
535{
536 assert(uth == current_uthread());
537 workq_threadreq_param_t trp = { };
538
539 if (req && (req->tr_flags & TR_FLAG_WL_PARAMS)) {
540 trp = kqueue_threadreq_workloop_param(req);
541 }
542
543 if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
544 /*
545 * Going through disable when we have an existing CPU percent limit
546 * set will force the ledger to refill the token bucket of the current
547 * thread. Removing any penalty applied by previous thread use.
548 */
549 thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
550 uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
551 }
552
553 if (trp.trp_flags & TRP_CPUPERCENT) {
554 thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
555 (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
556 uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
557 }
558}
559
560static void
561workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
562 workq_threadreq_t req)
563{
564 thread_t th = uth->uu_thread;
565 thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
566 workq_threadreq_param_t trp = { };
567 int priority = 31;
568 int policy = POLICY_TIMESHARE;
569
570 if (req && (req->tr_flags & TR_FLAG_WL_PARAMS)) {
571 trp = kqueue_threadreq_workloop_param(req);
572 }
573
574 uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
575 uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;
576 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
577
578 // qos sent out to userspace (may differ from uu_workq_pri on param threads)
579 uth->uu_save.uus_workq_park_data.qos = qos;
580
581 if (qos == WORKQ_THREAD_QOS_MANAGER) {
582 uint32_t mgr_pri = wq->wq_event_manager_priority;
583 assert(trp.trp_value == 0); // manager qos and thread policy don't mix
584
585 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
586 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
587 thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
588 POLICY_TIMESHARE);
589 return;
590 }
591
592 qos = _pthread_priority_thread_qos(mgr_pri);
593 } else {
594 if (trp.trp_flags & TRP_PRIORITY) {
595 qos = THREAD_QOS_UNSPECIFIED;
596 priority = trp.trp_pri;
597 uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
598 }
599
600 if (trp.trp_flags & TRP_POLICY) {
601 policy = trp.trp_pol;
602 }
603 }
604
605 thread_set_workq_pri(th, qos, priority, policy);
606}
607
608/*
609 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
610 * every time a servicer is being told about a new max QoS.
611 */
612void
613workq_thread_set_max_qos(struct proc *p, struct kqrequest *kqr)
614{
615 struct uu_workq_policy old_pri, new_pri;
616 struct uthread *uth = get_bsdthread_info(kqr->kqr_thread);
617 struct workqueue *wq = proc_get_wqptr_fast(p);
618 thread_qos_t qos = kqr->kqr_qos_index;
619
620 if (uth->uu_workq_pri.qos_max == qos)
621 return;
622
623 workq_lock_spin(wq);
624 old_pri = new_pri = uth->uu_workq_pri;
625 new_pri.qos_max = qos;
626 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
627 workq_unlock(wq);
628}
629
630#pragma mark idle threads accounting and handling
631
632static inline struct uthread *
633workq_oldest_killable_idle_thread(struct workqueue *wq)
634{
635 struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);
636
637 if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
638 uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
639 if (uth) {
640 assert(uth->uu_save.uus_workq_park_data.has_stack);
641 }
642 }
643 return uth;
644}
645
646static inline uint64_t
647workq_kill_delay_for_idle_thread(struct workqueue *wq)
648{
649 uint64_t delay = wq_reduce_pool_window.abstime;
650 uint16_t idle = wq->wq_thidlecount;
651
	/*
	 * If we have fewer than wq_death_max_load idle threads, use a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly from
	 * 5s down to 50ms.
	 */
658 if (idle <= wq_death_max_load) {
659 return delay;
660 }
661
662 if (wq_max_constrained_threads > idle - wq_death_max_load) {
663 delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
664 }
665 return delay / wq_max_constrained_threads;
666}
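
/*
 * Worked example with illustrative values (wq_reduce_pool_window = 5s,
 * wq_death_max_load = 5, wq_max_constrained_threads = 64):
 *
 *   idle <= 5    -> 5s
 *   idle == 6    -> 5s * 63/64  (~4.9s)
 *   idle == 37   -> 5s * 32/64  (~2.5s)
 *   idle >= 68   -> 5s *  1/64  (~78ms)
 *
 * i.e. the reap delay decays linearly as the idle pool grows past the death
 * load threshold, so oversized pools are trimmed much more aggressively.
 */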
667
668static inline bool
669workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
670 uint64_t now)
671{
672 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
673 return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
674}
675
676static void
677workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
678{
679 uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);
680
681 if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
682 return;
683 }
684 os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
685
686 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
687
688 /*
689 * <rdar://problem/13139182> Due to how long term timers work, the leeway
690 * can't be too short, so use 500ms which is long enough that we will not
691 * wake up the CPU for killing threads, but short enough that it doesn't
692 * fall into long-term timer list shenanigans.
693 */
694 thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
695 wq_reduce_pool_window.abstime / 10,
696 THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
697}
698
699/*
700 * `decrement` is set to the number of threads that are no longer dying:
701 * - because they have been resuscitated just in time (workq_pop_idle_thread)
702 * - or have been killed (workq_thread_terminate).
703 */
704static void
705workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
706{
707 struct uthread *uth;
708
709 assert(wq->wq_thdying_count >= decrement);
710 if ((wq->wq_thdying_count -= decrement) > 0)
711 return;
712
713 if (wq->wq_thidlecount <= 1)
714 return;
715
716 if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL)
717 return;
718
719 uint64_t now = mach_absolute_time();
720 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
721
722 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
723 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
724 wq, wq->wq_thidlecount, 0, 0, 0);
725 wq->wq_thdying_count++;
726 uth->uu_workq_flags |= UT_WORKQ_DYING;
727 workq_thread_wakeup(uth);
728 return;
729 }
730
731 workq_death_call_schedule(wq,
732 uth->uu_save.uus_workq_park_data.idle_stamp + delay);
733}
734
735void
736workq_thread_terminate(struct proc *p, struct uthread *uth)
737{
738 struct workqueue *wq = proc_get_wqptr_fast(p);
739
740 workq_lock_spin(wq);
741 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
742 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
743 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
744 wq, wq->wq_thidlecount, 0, 0, 0);
745 workq_death_policy_evaluate(wq, 1);
746 }
747 if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got under the thread limit again, which may have prevented
		 * thread creation from happening; redrive if there are pending
		 * requests.
		 */
752 if (wq->wq_reqcount) {
753 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
754 }
755 }
756 workq_unlock(wq);
757
758 thread_deallocate(uth->uu_thread);
759}
760
761static void
762workq_kill_old_threads_call(void *param0, void *param1 __unused)
763{
764 struct workqueue *wq = param0;
765
766 workq_lock_spin(wq);
767 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0);
768 os_atomic_and(&wq->wq_flags, ~WQ_DEATH_CALL_SCHEDULED, relaxed);
769 workq_death_policy_evaluate(wq, 0);
770 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0);
771 workq_unlock(wq);
772}
773
774static struct uthread *
775workq_pop_idle_thread(struct workqueue *wq)
776{
777 struct uthread *uth;
778
779 if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
780 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
781 } else {
782 uth = TAILQ_FIRST(&wq->wq_thnewlist);
783 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
784 }
785 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
786
787 assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
788 uth->uu_workq_flags |= UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT;
789 wq->wq_threads_scheduled++;
790 wq->wq_thidlecount--;
791
792 if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
793 uth->uu_workq_flags ^= UT_WORKQ_DYING;
794 workq_death_policy_evaluate(wq, 1);
795 }
796 return uth;
797}
798
799/*
800 * Called by thread_create_workq_waiting() during thread initialization, before
801 * assert_wait, before the thread has been started.
802 */
803event_t
804workq_thread_init_and_wq_lock(task_t task, thread_t th)
805{
806 struct uthread *uth = get_bsdthread_info(th);
807
808 uth->uu_workq_flags = UT_WORKQ_NEW;
809 uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
810 uth->uu_workq_thport = MACH_PORT_NULL;
811 uth->uu_workq_stackaddr = 0;
812
813 thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
814 thread_reset_workq_qos(th, THREAD_QOS_LEGACY);
815
816 workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
817 return workq_parked_wait_event(uth);
818}
819
820/**
821 * Try to add a new workqueue thread.
822 *
823 * - called with workq lock held
824 * - dropped and retaken around thread creation
825 * - return with workq lock held
826 */
827static bool
828workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
829{
830 mach_vm_offset_t th_stackaddr;
831 kern_return_t kret;
832 thread_t th;
833
834 wq->wq_nthreads++;
835
836 workq_unlock(wq);
837
838 vm_map_t vmap = get_task_map(p->task);
839
840 kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
841 if (kret != KERN_SUCCESS) {
842 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
843 kret, 1, 0, 0);
844 goto out;
845 }
846
847 kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
848 if (kret != KERN_SUCCESS) {
849 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
850 kret, 0, 0, 0);
851 pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
852 goto out;
853 }
854
855 // thread_create_workq_waiting() will return with the wq lock held
856 // on success, because it calls workq_thread_init_and_wq_lock() above
857
858 struct uthread *uth = get_bsdthread_info(th);
859
860 wq->wq_creations++;
861 wq->wq_thidlecount++;
862 uth->uu_workq_stackaddr = th_stackaddr;
863 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
864
865 WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
866 return true;
867
868out:
869 workq_lock_spin(wq);
870 /*
871 * Do not redrive here if we went under wq_max_threads again,
872 * it is the responsibility of the callers of this function
873 * to do so when it fails.
874 */
875 wq->wq_nthreads--;
876 return false;
877}
878
879#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
880
881__attribute__((noreturn, noinline))
882static void
883workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
884 struct uthread *uth, uint32_t death_flags)
885{
886 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
887 bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;
888
889 if (qos > WORKQ_THREAD_QOS_CLEANUP) {
890 workq_thread_reset_pri(wq, uth, NULL);
891 qos = WORKQ_THREAD_QOS_CLEANUP;
892 }
893
894 workq_thread_reset_cpupercent(NULL, uth);
895
896 if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
897 wq->wq_thidlecount--;
898 if (first_use) {
899 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
900 } else {
901 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
902 }
903 }
904 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
905
906 workq_unlock(wq);
907
908 uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
909 uint32_t setup_flags = WQ_SETUP_EXIT_THREAD;
910 thread_t th = uth->uu_thread;
911 vm_map_t vmap = get_task_map(p->task);
912
913 if (!first_use) flags |= WQ_FLAG_THREAD_REUSE;
914
915 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
916 uth->uu_workq_thport, 0, setup_flags, flags);
917 __builtin_unreachable();
918}
919
920bool
921workq_is_current_thread_updating_turnstile(struct workqueue *wq)
922{
923 return wq->wq_turnstile_updater == current_thread();
924}
925
926__attribute__((always_inline))
927static inline void
928workq_perform_turnstile_operation_locked(struct workqueue *wq,
929 void (^operation)(void))
930{
931 workq_lock_held(wq);
932 wq->wq_turnstile_updater = current_thread();
933 operation();
934 wq->wq_turnstile_updater = THREAD_NULL;
935}
936
937static void
938workq_turnstile_update_inheritor(struct workqueue *wq,
939 turnstile_inheritor_t inheritor,
940 turnstile_update_flags_t flags)
941{
942 workq_perform_turnstile_operation_locked(wq, ^{
943 turnstile_update_inheritor(wq->wq_turnstile, inheritor,
944 flags | TURNSTILE_IMMEDIATE_UPDATE);
945 turnstile_update_inheritor_complete(wq->wq_turnstile,
946 TURNSTILE_INTERLOCK_HELD);
947 });
948}
949
950static void
951workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth)
952{
953 uint64_t now = mach_absolute_time();
954
955 uth->uu_workq_flags &= ~UT_WORKQ_RUNNING;
956 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
957 wq->wq_constrained_threads_scheduled--;
958 }
959 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
960 wq->wq_threads_scheduled--;
961
962 if (wq->wq_creator == uth) {
963 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
964 uth->uu_save.uus_workq_park_data.yields, 0);
965 wq->wq_creator = NULL;
966 if (wq->wq_reqcount) {
967 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
968 } else {
969 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
970 }
971 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
972 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
973 wq->wq_thidlecount++;
974 return;
975 }
976 } else {
977 _wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
978 wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
979 assert(!(uth->uu_workq_flags & UT_WORKQ_NEW));
980 uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
981 }
982
983 uth->uu_save.uus_workq_park_data.idle_stamp = now;
984
985 struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
986 uint16_t cur_idle = wq->wq_thidlecount;
987
988 if (cur_idle >= wq_max_constrained_threads ||
989 (wq->wq_thdying_count == 0 && oldest &&
990 workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly, and it's better to kill
		 * the one that's currently on core than to context switch.
		 */
999 if (oldest) {
1000 oldest->uu_save.uus_workq_park_data.idle_stamp = now;
1001 TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
1002 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
1003 }
1004
1005 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
1006 wq, cur_idle, 0, 0, 0);
1007 wq->wq_thdying_count++;
1008 uth->uu_workq_flags |= UT_WORKQ_DYING;
1009 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
1010 workq_unpark_for_death_and_unlock(p, wq, uth, 0);
1011 __builtin_unreachable();
1012 }
1013
1014 struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);
1015
1016 cur_idle += 1;
1017 wq->wq_thidlecount = cur_idle;
1018
1019 if (cur_idle >= wq_death_max_load && tail &&
1020 tail->uu_save.uus_workq_park_data.has_stack) {
1021 uth->uu_save.uus_workq_park_data.has_stack = false;
1022 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
1023 } else {
1024 uth->uu_save.uus_workq_park_data.has_stack = true;
1025 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
1026 }
1027
1028 if (!tail) {
1029 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
1030 workq_death_call_schedule(wq, now + delay);
1031 }
1032}
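
/*
 * Shape of the idle list this maintains (as read from the code above, not a
 * documented invariant): threads normally park at the head with has_stack
 * set; once the pool exceeds wq_death_max_load and the current tail still
 * has its stack, the parking thread is instead marked !has_stack and parks
 * at the tail. workq_oldest_killable_idle_thread() relies on this by
 * looking at the tail and stepping back at most one entry.
 */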
1033
1034#pragma mark thread requests
1035
1036static inline int
1037workq_priority_for_req(workq_threadreq_t req)
1038{
1039 thread_qos_t qos = req->tr_qos;
1040
1041 if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
1042 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
1043 assert(trp.trp_flags & TRP_PRIORITY);
1044 return trp.trp_pri;
1045 }
1046 return thread_workq_pri_for_qos(qos);
1047}
1048
1049static inline struct priority_queue *
1050workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
1051{
1052 if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
1053 return &wq->wq_special_queue;
1054 } else if (req->tr_flags & TR_FLAG_OVERCOMMIT) {
1055 return &wq->wq_overcommit_queue;
1056 } else {
1057 return &wq->wq_constrained_queue;
1058 }
1059}
1060
/*
 * Returns true if the enqueued request is the highest priority item
 * in its priority queue.
 */
1065static bool
1066workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
1067{
1068 assert(req->tr_state == TR_STATE_NEW);
1069
1070 req->tr_state = TR_STATE_QUEUED;
1071 wq->wq_reqcount += req->tr_count;
1072
1073 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1074 assert(wq->wq_event_manager_threadreq == NULL);
1075 assert(req->tr_flags & TR_FLAG_KEVENT);
1076 assert(req->tr_count == 1);
1077 wq->wq_event_manager_threadreq = req;
1078 return true;
1079 }
1080 if (priority_queue_insert(workq_priority_queue_for_req(wq, req),
1081 &req->tr_entry, workq_priority_for_req(req),
1082 PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
1083 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
1084 _wq_thactive_refresh_best_constrained_req_qos(wq);
1085 }
1086 return true;
1087 }
1088 return false;
1089}
1090
/*
 * Returns true if the dequeued request was the highest priority item
 * in its priority queue.
 */
1095static bool
1096workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req)
1097{
1098 wq->wq_reqcount--;
1099
1100 if (--req->tr_count == 0) {
1101 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1102 assert(wq->wq_event_manager_threadreq == req);
1103 assert(req->tr_count == 0);
1104 wq->wq_event_manager_threadreq = NULL;
1105 return true;
1106 }
1107 if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
1108 &req->tr_entry, PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
1109 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
1110 _wq_thactive_refresh_best_constrained_req_qos(wq);
1111 }
1112 return true;
1113 }
1114 }
1115 return false;
1116}
1117
1118static void
1119workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
1120{
1121 req->tr_state = TR_STATE_IDLE;
1122 if (req->tr_flags & (TR_FLAG_WORKLOOP | TR_FLAG_KEVENT)) {
1123 kqueue_threadreq_cancel(p, req);
1124 } else {
1125 zfree(workq_zone_threadreq, req);
1126 }
1127}
1128
1129/*
1130 * Mark a thread request as complete. At this point, it is treated as owned by
1131 * the submitting subsystem and you should assume it could be freed.
1132 *
1133 * Called with the workqueue lock held.
1134 */
1135static void
1136workq_threadreq_bind_and_unlock(proc_t p, struct workqueue *wq,
1137 workq_threadreq_t req, struct uthread *uth)
1138{
1139 uint8_t tr_flags = req->tr_flags;
1140 bool needs_commit = false;
1141 int creator_flags = 0;
1142
1143 wq->wq_fulfilled++;
1144
1145 if (req->tr_state == TR_STATE_QUEUED) {
1146 workq_threadreq_dequeue(wq, req);
1147 creator_flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
1148 }
1149
1150 if (wq->wq_creator == uth) {
1151 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
1152 uth->uu_save.uus_workq_park_data.yields, 0);
1153 creator_flags = WORKQ_THREADREQ_CAN_CREATE_THREADS |
1154 WORKQ_THREADREQ_CREATOR_TRANSFER;
1155 wq->wq_creator = NULL;
1156 _wq_thactive_inc(wq, req->tr_qos);
1157 wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
1158 } else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
1159 _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
1160 }
1161 workq_thread_reset_pri(wq, uth, req);
1162
1163 if (tr_flags & TR_FLAG_OVERCOMMIT) {
1164 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
1165 uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT;
1166 wq->wq_constrained_threads_scheduled--;
1167 }
1168 } else {
1169 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) {
1170 uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
1171 wq->wq_constrained_threads_scheduled++;
1172 }
1173 }
1174
1175 if (tr_flags & (TR_FLAG_KEVENT | TR_FLAG_WORKLOOP)) {
1176 if (req->tr_state == TR_STATE_NEW) {
1177 /*
1178 * We're called from workq_kern_threadreq_initiate()
1179 * due to an unbind, with the kq req held.
1180 */
1181 assert(!creator_flags);
1182 req->tr_state = TR_STATE_IDLE;
1183 kqueue_threadreq_bind(p, req, uth->uu_thread, 0);
1184 } else {
1185 assert(req->tr_count == 0);
1186 workq_perform_turnstile_operation_locked(wq, ^{
1187 kqueue_threadreq_bind_prepost(p, req, uth->uu_thread);
1188 });
1189 needs_commit = true;
1190 }
1191 req = NULL;
1192 } else if (req->tr_count > 0) {
1193 req = NULL;
1194 }
1195
1196 if (creator_flags) {
1197 /* This can drop the workqueue lock, and take it again */
1198 workq_schedule_creator(p, wq, creator_flags);
1199 }
1200
1201 workq_unlock(wq);
1202
1203 if (req) {
1204 zfree(workq_zone_threadreq, req);
1205 }
1206 if (needs_commit) {
1207 kqueue_threadreq_bind_commit(p, uth->uu_thread);
1208 }
1209
1210 /*
1211 * Run Thread, Run!
1212 */
1213 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
1214 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
1215 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
1216 } else if (tr_flags & TR_FLAG_OVERCOMMIT) {
1217 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
1218 }
1219 if (tr_flags & TR_FLAG_KEVENT) {
1220 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
1221 }
1222 if (tr_flags & TR_FLAG_WORKLOOP) {
1223 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
1224 }
1225 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
1226}
1227
1228#pragma mark workqueue thread creation thread calls
1229
1230static inline bool
1231workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
1232 uint32_t fail_mask)
1233{
1234 uint32_t old_flags, new_flags;
1235
1236 os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
1237 if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
1238 os_atomic_rmw_loop_give_up(return false);
1239 }
1240 if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
1241 new_flags = old_flags | pend;
1242 } else {
1243 new_flags = old_flags | sched;
1244 }
1245 });
1246
1247 return (old_flags & WQ_PROC_SUSPENDED) == 0;
1248}
1249
1250#define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1251
1252static bool
1253workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
1254{
1255 assert(!preemption_enabled());
1256
1257 if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
1258 WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
1259 WQ_IMMEDIATE_CALL_SCHEDULED)) {
1260 return false;
1261 }
1262
1263 uint64_t now = mach_absolute_time();
1264
1265 if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
1266 /* do not change the window */
1267 } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
1268 wq->wq_timer_interval *= 2;
1269 if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
1270 wq->wq_timer_interval = wq_max_timer_interval.abstime;
1271 }
1272 } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
1273 wq->wq_timer_interval /= 2;
1274 if (wq->wq_timer_interval < wq_stalled_window.abstime) {
1275 wq->wq_timer_interval = wq_stalled_window.abstime;
1276 }
1277 }
1278
1279 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1280 _wq_flags(wq), wq->wq_timer_interval, 0);
1281
1282 thread_call_t call = wq->wq_delayed_call;
1283 uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
1284 uint64_t deadline = now + wq->wq_timer_interval;
1285 if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
1286 panic("delayed_call was already enqueued");
1287 }
1288 return true;
1289}
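
/*
 * Example of the adaptive window: back-to-back redrives (the thread call
 * last ran within the current interval) double wq_timer_interval up to
 * wq_max_timer_interval, while a redrive that comes more than two intervals
 * after the last run halves it, back down to wq_stalled_window. A restart
 * after process resume (the RESTART flag) leaves the window untouched.
 */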
1290
1291static void
1292workq_schedule_immediate_thread_creation(struct workqueue *wq)
1293{
1294 assert(!preemption_enabled());
1295
1296 if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
1297 WQ_IMMEDIATE_CALL_PENDED, 0)) {
1298 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1299 _wq_flags(wq), 0, 0);
1300
1301 uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
1302 if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
1303 panic("immediate_call was already enqueued");
1304 }
1305 }
1306}
1307
1308void
1309workq_proc_suspended(struct proc *p)
1310{
1311 struct workqueue *wq = proc_get_wqptr(p);
1312
1313 if (wq) os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1314}
1315
1316void
1317workq_proc_resumed(struct proc *p)
1318{
1319 struct workqueue *wq = proc_get_wqptr(p);
1320 uint32_t wq_flags;
1321
1322 if (!wq) return;
1323
1324 wq_flags = os_atomic_and_orig(&wq->wq_flags, ~(WQ_PROC_SUSPENDED |
1325 WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED), relaxed);
1326 if ((wq_flags & WQ_EXITING) == 0) {
1327 disable_preemption();
1328 if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
1329 workq_schedule_immediate_thread_creation(wq);
1330 } else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
1331 workq_schedule_delayed_thread_creation(wq,
1332 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
1333 }
1334 enable_preemption();
1335 }
1336}
1337
1338/**
1339 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1340 */
1341static bool
1342workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1343{
1344 uint64_t lastblocked_ts = os_atomic_load(lastblocked_tsp, relaxed);
1345 if (now <= lastblocked_ts) {
1346 /*
1347 * Because the update of the timestamp when a thread blocks
1348 * isn't serialized against us looking at it (i.e. we don't hold
1349 * the workq lock), it's possible to have a timestamp that matches
1350 * the current time or that even looks to be in the future relative
1351 * to when we grabbed the current time...
1352 *
1353 * Just treat this as a busy thread since it must have just blocked.
1354 */
1355 return true;
1356 }
1357 return (now - lastblocked_ts) < wq_stalled_window.abstime;
1358}
1359
1360static void
1361workq_add_new_threads_call(void *_p, void *flags)
1362{
1363 proc_t p = _p;
1364 struct workqueue *wq = proc_get_wqptr(p);
1365 uint32_t my_flag = (uint32_t)(uintptr_t)flags;
1366
1367 /*
1368 * workq_exit() will set the workqueue to NULL before
1369 * it cancels thread calls.
1370 */
1371 if (!wq) return;
1372
1373 assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
1374 (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));
1375
1376 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
1377 wq->wq_nthreads, wq->wq_thidlecount, 0);
1378
1379 workq_lock_spin(wq);
1380
1381 wq->wq_thread_call_last_run = mach_absolute_time();
1382 os_atomic_and(&wq->wq_flags, ~my_flag, release);
1383
1384 /* This can drop the workqueue lock, and take it again */
1385 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
1386
1387 workq_unlock(wq);
1388
1389 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
1390 wq->wq_nthreads, wq->wq_thidlecount, 0);
1391}
1392
1393#pragma mark thread state tracking
1394
1395static void
1396workq_sched_callback(int type, thread_t thread)
1397{
1398 struct uthread *uth = get_bsdthread_info(thread);
1399 proc_t proc = get_bsdtask_info(get_threadtask(thread));
1400 struct workqueue *wq = proc_get_wqptr(proc);
1401 thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
1402 wq_thactive_t old_thactive;
1403 bool start_timer = false;
1404
1405 if (qos == WORKQ_THREAD_QOS_MANAGER) {
1406 return;
1407 }
1408
1409 switch (type) {
1410 case SCHED_CALL_BLOCK:
1411 old_thactive = _wq_thactive_dec(wq, qos);
1412 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1413
		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket; it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem. Either timestamp is adequate, so no need to retry.
		 */
1424 os_atomic_store(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
1425 thread_last_run_time(thread), relaxed);
1426
1427 if (req_qos == THREAD_QOS_UNSPECIFIED) {
1428 /*
1429 * No pending request at the moment we could unblock, move on.
1430 */
1431 } else if (qos < req_qos) {
1432 /*
1433 * The blocking thread is at a lower QoS than the highest currently
1434 * pending constrained request, nothing has to be redriven
1435 */
1436 } else {
1437 uint32_t max_busycount, old_req_count;
1438 old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
1439 req_qos, NULL, &max_busycount);
1440 /*
1441 * If it is possible that may_start_constrained_thread had refused
1442 * admission due to being over the max concurrency, we may need to
1443 * spin up a new thread.
1444 *
1445 * We take into account the maximum number of busy threads
1446 * that can affect may_start_constrained_thread as looking at the
1447 * actual number may_start_constrained_thread will see is racy.
1448 *
1449 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
1450 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
1451 */
1452 uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
1453 if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
1454 start_timer = workq_schedule_delayed_thread_creation(wq, 0);
1455 }
1456 }
1457 if (__improbable(kdebug_enable)) {
1458 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1459 old_thactive, qos, NULL, NULL);
1460 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
1461 old - 1, qos | (req_qos << 8),
1462 wq->wq_reqcount << 1 | start_timer, 0);
1463 }
1464 break;
1465
1466 case SCHED_CALL_UNBLOCK:
		/*
		 * We cannot take the workqueue lock here: an UNBLOCK can occur from
		 * a timer event that runs in interrupt context, and if this
		 * processor already holds the workqueue lock we would deadlock.
		 * The thread lock for the thread being UNBLOCKED is also held.
		 */
1475 old_thactive = _wq_thactive_inc(wq, qos);
1476 if (__improbable(kdebug_enable)) {
1477 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1478 old_thactive, qos, NULL, NULL);
1479 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1480 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
1481 old + 1, qos | (req_qos << 8),
1482 wq->wq_threads_scheduled, 0);
1483 }
1484 break;
1485 }
1486}
1487
1488#pragma mark workq lifecycle
1489
1490void
1491workq_reference(struct workqueue *wq)
1492{
1493 os_ref_retain(&wq->wq_refcnt);
1494}
1495
1496void
1497workq_destroy(struct workqueue *wq)
1498{
1499 struct turnstile *ts;
1500
1501 turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts);
1502 assert(ts);
1503 turnstile_cleanup();
1504 turnstile_deallocate(ts);
1505
1506 lck_spin_destroy(&wq->wq_lock, workq_lck_grp);
1507 zfree(workq_zone_workqueue, wq);
1508}
1509
1510static void
1511workq_deallocate(struct workqueue *wq)
1512{
1513 if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
1514 workq_destroy(wq);
1515 }
1516}
1517
1518void
1519workq_deallocate_safe(struct workqueue *wq)
1520{
1521 if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
1522 workq_deallocate_enqueue(wq);
1523 }
1524}
1525
1526/**
1527 * Setup per-process state for the workqueue.
1528 */
1529int
1530workq_open(struct proc *p, __unused struct workq_open_args *uap,
1531 __unused int32_t *retval)
1532{
1533 struct workqueue *wq;
1534 int error = 0;
1535
1536 if ((p->p_lflag & P_LREGISTER) == 0) {
1537 return EINVAL;
1538 }
1539
1540 if (wq_init_constrained_limit) {
1541 uint32_t limit, num_cpus = ml_get_max_cpus();
1542
		/*
		 * Set up the limit for the constrained pool. This is a virtual
		 * pool in that we don't maintain it on a separate idle and run
		 * list.
		 */
1548 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;
1549
1550 if (limit > wq_max_constrained_threads)
1551 wq_max_constrained_threads = limit;
1552
1553 if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
1554 wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
1555 }
1556 if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
1557 wq_max_threads = CONFIG_THREAD_MAX - 20;
1558 }
1559
1560 wq_death_max_load = (uint16_t)fls(num_cpus) + 1;
1561
1562 for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
1563 wq_max_parallelism[_wq_bucket(qos)] =
1564 qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
1565 }
1566
1567 wq_init_constrained_limit = 0;
1568 }
1569
1570 if (proc_get_wqptr(p) == NULL) {
1571 if (proc_init_wqptr_or_wait(p) == FALSE) {
1572 assert(proc_get_wqptr(p) != NULL);
1573 goto out;
1574 }
1575
1576 wq = (struct workqueue *)zalloc(workq_zone_workqueue);
1577 bzero(wq, sizeof(struct workqueue));
1578
1579 os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);
1580
1581 // Start the event manager at the priority hinted at by the policy engine
1582 thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
1583 pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
1584 wq->wq_event_manager_priority = (uint32_t)pp;
1585 wq->wq_timer_interval = wq_stalled_window.abstime;
1586 wq->wq_proc = p;
1587 turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
1588 TURNSTILE_WORKQS);
1589
1590 TAILQ_INIT(&wq->wq_thrunlist);
1591 TAILQ_INIT(&wq->wq_thnewlist);
1592 TAILQ_INIT(&wq->wq_thidlelist);
1593 priority_queue_init(&wq->wq_overcommit_queue,
1594 PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
1595 priority_queue_init(&wq->wq_constrained_queue,
1596 PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
1597 priority_queue_init(&wq->wq_special_queue,
1598 PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
1599
1600 wq->wq_delayed_call = thread_call_allocate_with_options(
1601 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
1602 THREAD_CALL_OPTIONS_ONCE);
1603 wq->wq_immediate_call = thread_call_allocate_with_options(
1604 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
1605 THREAD_CALL_OPTIONS_ONCE);
1606 wq->wq_death_call = thread_call_allocate_with_options(
1607 workq_kill_old_threads_call, wq,
1608 THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);
1609
1610 lck_spin_init(&wq->wq_lock, workq_lck_grp, workq_lck_attr);
1611
1612 WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
1613 VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
1614 proc_set_wqptr(p, wq);
1615 }
1616out:
1617
1618 return error;
1619}
1620
1621/*
1622 * Routine: workq_mark_exiting
1623 *
1624 * Function: Mark the work queue such that new threads will not be added to the
1625 * work queue after we return.
1626 *
1627 * Conditions: Called against the current process.
1628 */
1629void
1630workq_mark_exiting(struct proc *p)
1631{
1632 struct workqueue *wq = proc_get_wqptr(p);
1633 uint32_t wq_flags;
1634 workq_threadreq_t mgr_req;
1635
1636 if (!wq) return;
1637
1638 WQ_TRACE_WQ(TRACE_wq_pthread_exit|DBG_FUNC_START, wq, 0, 0, 0, 0);
1639
1640 workq_lock_spin(wq);
1641
1642 wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
1643 if (__improbable(wq_flags & WQ_EXITING)) {
1644 panic("workq_mark_exiting called twice");
1645 }
1646
1647 /*
1648 * Opportunistically try to cancel thread calls that are likely in flight.
1649 * workq_exit() will do the proper cleanup.
1650 */
1651 if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
1652 thread_call_cancel(wq->wq_immediate_call);
1653 }
1654 if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
1655 thread_call_cancel(wq->wq_delayed_call);
1656 }
1657 if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
1658 thread_call_cancel(wq->wq_death_call);
1659 }
1660
1661 mgr_req = wq->wq_event_manager_threadreq;
1662 wq->wq_event_manager_threadreq = NULL;
1663 wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
1664 workq_turnstile_update_inheritor(wq, NULL, 0);
1665
1666 workq_unlock(wq);
1667
1668 if (mgr_req) {
1669 kqueue_threadreq_cancel(p, mgr_req);
1670 }
1671 /*
1672 * No one touches the priority queues once WQ_EXITING is set.
1673 * It is hence safe to do the tear down without holding any lock.
1674 */
1675 priority_queue_destroy(&wq->wq_overcommit_queue,
1676 struct workq_threadreq_s, tr_entry, ^(void *e){
1677 workq_threadreq_destroy(p, e);
1678 });
1679 priority_queue_destroy(&wq->wq_constrained_queue,
1680 struct workq_threadreq_s, tr_entry, ^(void *e){
1681 workq_threadreq_destroy(p, e);
1682 });
1683 priority_queue_destroy(&wq->wq_special_queue,
1684 struct workq_threadreq_s, tr_entry, ^(void *e){
1685 workq_threadreq_destroy(p, e);
1686 });
1687
1688 WQ_TRACE(TRACE_wq_pthread_exit|DBG_FUNC_END, 0, 0, 0, 0, 0);
1689}
1690
1691/*
1692 * Routine: workq_exit
1693 *
1694 * Function: clean up the work queue structure(s) now that there are no threads
1695 * left running inside the work queue (except possibly current_thread).
1696 *
1697 * Conditions: Called by the last thread in the process.
1698 * Called against current process.
1699 */
1700void
1701workq_exit(struct proc *p)
1702{
1703 struct workqueue *wq;
1704 struct uthread *uth, *tmp;
1705
1706 wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
1707 if (wq != NULL) {
1708 thread_t th = current_thread();
1709
1710 WQ_TRACE_WQ(TRACE_wq_workqueue_exit|DBG_FUNC_START, wq, 0, 0, 0, 0);
1711
1712 if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
1713 /*
1714 * <rdar://problem/40111515> Make sure we will no longer call the
1715 * sched call, if we ever block this thread, which the cancel_wait
1716 * below can do.
1717 */
1718 thread_sched_call(th, NULL);
1719 }
1720
1721 /*
1722 * Thread calls are always scheduled by the proc itself or under the
1723 * workqueue spinlock if WQ_EXITING is not yet set.
1724 *
1725 * Either way, when this runs, the proc has no threads left beside
1726 * the one running this very code, so we know no thread call can be
1727 * dispatched anymore.
1728 */
1729 thread_call_cancel_wait(wq->wq_delayed_call);
1730 thread_call_cancel_wait(wq->wq_immediate_call);
1731 thread_call_cancel_wait(wq->wq_death_call);
1732 thread_call_free(wq->wq_delayed_call);
1733 thread_call_free(wq->wq_immediate_call);
1734 thread_call_free(wq->wq_death_call);
1735
1736 /*
1737 * Clean up workqueue data structures for threads that exited and
1738 * didn't get a chance to clean up after themselves.
1739 *
1740 * idle/new threads should have been interrupted and died on their own
1741 */
1742 TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
1743 thread_sched_call(uth->uu_thread, NULL);
1744 thread_deallocate(uth->uu_thread);
1745 }
1746 assert(TAILQ_EMPTY(&wq->wq_thnewlist));
1747 assert(TAILQ_EMPTY(&wq->wq_thidlelist));
1748
1749 WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
1750 VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
1751
1752 workq_deallocate(wq);
1753
1754 WQ_TRACE(TRACE_wq_workqueue_exit|DBG_FUNC_END, 0, 0, 0, 0, 0);
1755 }
1756}
1757
1758
1759#pragma mark bsd thread control
1760
1761static bool
1762_pthread_priority_to_policy(pthread_priority_t priority,
1763 thread_qos_policy_data_t *data)
1764{
1765 data->qos_tier = _pthread_priority_thread_qos(priority);
1766 data->tier_importance = _pthread_priority_relpri(priority);
1767 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
1768 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
1769 return false;
1770 }
1771 return true;
1772}
1773
1774static int
1775bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
1776 mach_port_name_t voucher, enum workq_set_self_flags flags)
1777{
1778 struct uthread *uth = get_bsdthread_info(th);
1779 struct workqueue *wq = proc_get_wqptr(p);
1780
1781 kern_return_t kr;
1782 int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
1783 bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);
1784
1785 if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
1786 if (!is_wq_thread) {
1787 unbind_rv = EINVAL;
1788 goto qos;
1789 }
1790
1791 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
1792 unbind_rv = EINVAL;
1793 goto qos;
1794 }
1795
1796 struct kqrequest *kqr = uth->uu_kqr_bound;
1797 if (kqr == NULL) {
1798 unbind_rv = EALREADY;
1799 goto qos;
1800 }
1801
1802 if (kqr->kqr_state & KQR_WORKLOOP) {
1803 unbind_rv = EINVAL;
1804 goto qos;
1805 }
1806
1807 kqueue_threadreq_unbind(p, uth->uu_kqr_bound);
1808 }
1809
1810qos:
1811 if (flags & WORKQ_SET_SELF_QOS_FLAG) {
1812 thread_qos_policy_data_t new_policy;
1813
1814 if (!_pthread_priority_to_policy(priority, &new_policy)) {
1815 qos_rv = EINVAL;
1816 goto voucher;
1817 }
1818
1819 if (!is_wq_thread) {
1820 /*
1821 * Threads opted out of QoS can't change QoS
1822 */
1823 if (!thread_has_qos_policy(th)) {
1824 qos_rv = EPERM;
1825 goto voucher;
1826 }
1827 } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
1828 /*
1829 * Workqueue manager threads can't change QoS
1830 */
1831 qos_rv = EINVAL;
1832 goto voucher;
1833 } else {
1834 /*
1835 * For workqueue threads, possibly adjust buckets and redrive thread
1836 * requests.
1837 */
1838 bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT;
1839 bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
1840 struct uu_workq_policy old_pri, new_pri;
1841 bool force_run = false;
1842
1843 workq_lock_spin(wq);
1844
1845 if (old_overcommit != new_overcommit) {
1846 uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT;
1847 if (old_overcommit) {
1848 wq->wq_constrained_threads_scheduled++;
1849 } else if (wq->wq_constrained_threads_scheduled-- ==
1850 wq_max_constrained_threads) {
1851 force_run = true;
1852 }
1853 }
1854
1855 old_pri = new_pri = uth->uu_workq_pri;
1856 new_pri.qos_req = new_policy.qos_tier;
1857 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
1858 workq_unlock(wq);
1859 }
1860
1861 kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
1862 (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
1863 if (kr != KERN_SUCCESS) {
1864 qos_rv = EINVAL;
1865 }
1866 }
1867
1868voucher:
1869 if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
1870 kr = thread_set_voucher_name(voucher);
1871 if (kr != KERN_SUCCESS) {
1872 voucher_rv = ENOENT;
1873 goto fixedpri;
1874 }
1875 }
1876
1877fixedpri:
1878 if (qos_rv) goto done;
1879 if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
1880 thread_extended_policy_data_t extpol = {.timeshare = 0};
1881
1882 if (is_wq_thread) {
1883 /* Not allowed on workqueue threads */
1884 fixedpri_rv = ENOTSUP;
1885 goto done;
1886 }
1887
1888 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
1889 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
1890 if (kr != KERN_SUCCESS) {
1891 fixedpri_rv = EINVAL;
1892 goto done;
1893 }
1894 } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
1895 thread_extended_policy_data_t extpol = {.timeshare = 1};
1896
1897 if (is_wq_thread) {
1898 /* Not allowed on workqueue threads */
1899 fixedpri_rv = ENOTSUP;
1900 goto done;
1901 }
1902
1903 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
1904 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
1905 if (kr != KERN_SUCCESS) {
1906 fixedpri_rv = EINVAL;
1907 goto done;
1908 }
1909 }
1910
1911done:
1912 if (qos_rv && voucher_rv) {
1913 /* Both failed, give that a unique error. */
1914 return EBADMSG;
1915 }
1916
1917 if (unbind_rv) {
1918 return unbind_rv;
1919 }
1920
1921 if (qos_rv) {
1922 return qos_rv;
1923 }
1924
1925 if (voucher_rv) {
1926 return voucher_rv;
1927 }
1928
1929 if (fixedpri_rv) {
1930 return fixedpri_rv;
1931 }
1932
1933 return 0;
1934}
1935
1936static int
1937bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
1938 pthread_priority_t pp, user_addr_t resource)
1939{
1940 thread_qos_t qos = _pthread_priority_thread_qos(pp);
1941 if (qos == THREAD_QOS_UNSPECIFIED) {
1942 return EINVAL;
1943 }
1944
1945 thread_t th = port_name_to_thread(kport);
1946 if (th == THREAD_NULL) {
1947 return ESRCH;
1948 }
1949
1950 int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
1951 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
1952
1953 thread_deallocate(th);
1954 return rv;
1955}
1956
1957static int
1958bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
1959 user_addr_t resource)
1960{
1961 thread_t th = port_name_to_thread(kport);
1962 if (th == THREAD_NULL) {
1963 return ESRCH;
1964 }
1965
1966 int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
1967 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
1968
1969 thread_deallocate(th);
1970 return rv;
1971}
1972
1973static int
1974workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
1975 pthread_priority_t pp, user_addr_t ulock_addr)
1976{
1977 struct uu_workq_policy old_pri, new_pri;
1978 struct workqueue *wq = proc_get_wqptr(p);
1979
1980 thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
1981 if (qos_override == THREAD_QOS_UNSPECIFIED) {
1982 return EINVAL;
1983 }
1984
1985 thread_t thread = port_name_to_thread(kport);
1986 if (thread == THREAD_NULL) {
1987 return ESRCH;
1988 }
1989
1990 struct uthread *uth = get_bsdthread_info(thread);
1991 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
1992 thread_deallocate(thread);
1993 return EPERM;
1994 }
1995
1996 WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
1997 wq, thread_tid(thread), 1, pp, 0);
1998
1999 thread_mtx_lock(thread);
2000
2001 if (ulock_addr) {
2002 uint64_t val;
2003 int rc;
2004 /*
2005		 * Work around the lack of explicit support for 'no-fault copyin'
2006		 * <rdar://problem/24999882>: disabling preemption prevents paging in.
2007 */
2008 disable_preemption();
2009 rc = copyin_word(ulock_addr, &val, sizeof(kport));
2010 enable_preemption();
2011 if (rc == 0 && ulock_owner_value_to_port_name((uint32_t)val) != kport) {
2012 goto out;
2013 }
2014 }
2015
2016 workq_lock_spin(wq);
2017
2018 old_pri = uth->uu_workq_pri;
2019 if (old_pri.qos_override >= qos_override) {
2020 /* Nothing to do */
2021 } else if (thread == current_thread()) {
2022 new_pri = old_pri;
2023 new_pri.qos_override = qos_override;
2024 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2025 } else {
2026 uth->uu_workq_pri.qos_override = qos_override;
2027 if (qos_override > workq_pri_override(old_pri)) {
2028 thread_set_workq_override(thread, qos_override);
2029 }
2030 }
2031
2032 workq_unlock(wq);
2033
2034out:
2035 thread_mtx_unlock(thread);
2036 thread_deallocate(thread);
2037 return 0;
2038}
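
/*
 * Note on the ulock check in workq_thread_add_dispatch_override(): the copyin
 * runs with preemption disabled so it cannot page memory in; it either reads
 * a resident value or fails. If the read succeeds but the lock word no longer
 * names `kport` as the owner, the owner presumably already dropped or handed
 * off the lock and the override is skipped; if the read fails, the override
 * is applied anyway, erring on the side of boosting.
 */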
2039
2040static int
2041workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
2042{
2043 struct uu_workq_policy old_pri, new_pri;
2044 struct workqueue *wq = proc_get_wqptr(p);
2045 struct uthread *uth = get_bsdthread_info(thread);
2046
2047 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2048 return EPERM;
2049 }
2050
2051 WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
2052
2053 workq_lock_spin(wq);
2054 old_pri = new_pri = uth->uu_workq_pri;
2055 new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
2056 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2057 workq_unlock(wq);
2058 return 0;
2059}
2060
2061static int
2062bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2063 int *retval)
2064{
2065 static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2066 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2067 static_assert(QOS_PARALLELISM_REALTIME ==
2068 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2069
2070 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
2071 return EINVAL;
2072 }
2073
2074 if (flags & QOS_PARALLELISM_REALTIME) {
2075 if (qos) {
2076 return EINVAL;
2077 }
2078 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2079 return EINVAL;
2080 }
2081
2082 *retval = qos_max_parallelism(qos, flags);
2083 return 0;
2084}
2085
2086#define ENSURE_UNUSED(arg) \
2087 ({ if ((arg) != 0) { return EINVAL; } })
2088
2089int
2090bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
2091{
2092 switch (uap->cmd) {
2093 case BSDTHREAD_CTL_QOS_OVERRIDE_START:
2094 return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
2095 (pthread_priority_t)uap->arg2, uap->arg3);
2096 case BSDTHREAD_CTL_QOS_OVERRIDE_END:
2097 ENSURE_UNUSED(uap->arg3);
2098 return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
2099 (user_addr_t)uap->arg2);
2100
2101 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
2102 return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
2103 (pthread_priority_t)uap->arg2, uap->arg3);
2104 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
2105 return workq_thread_reset_dispatch_override(p, current_thread());
2106
2107 case BSDTHREAD_CTL_SET_SELF:
2108 return bsdthread_set_self(p, current_thread(),
2109 (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
2110 (enum workq_set_self_flags)uap->arg3);
2111
2112 case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
2113 ENSURE_UNUSED(uap->arg3);
2114 return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
2115 (unsigned long)uap->arg2, retval);
2116
2117 case BSDTHREAD_CTL_SET_QOS:
2118 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
2119 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
2120 /* no longer supported */
2121 return ENOTSUP;
2122
2123 default:
2124 return EINVAL;
2125 }
2126}
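
/*
 * A minimal userspace-side sketch of one of the commands above, assuming a
 * wrapper that issues the bsdthread_ctl trap with (cmd, arg1, arg2, arg3);
 * the wrapper name and exact prototype are not part of this file:
 *
 *	// Query how many threads are worth creating at a given QoS;
 *	// arg3 must be 0 for this command (see ENSURE_UNUSED above).
 *	int width = bsdthread_ctl(BSDTHREAD_CTL_QOS_MAX_PARALLELISM,
 *	    THREAD_QOS_USER_INITIATED, _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, 0);
 *
 * On success, the parallelism count computed by
 * bsdthread_get_max_parallelism() is returned through *retval.
 */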
2127
2128#pragma mark workqueue thread manipulation
2129
2130static void __dead2
2131workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2132 struct uthread *uth);
2133
2134static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2135
2136#if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2137static inline uint64_t
2138workq_trace_req_id(workq_threadreq_t req)
2139{
2140 struct kqworkloop *kqwl;
2141 if (req->tr_flags & TR_FLAG_WORKLOOP) {
2142 kqwl = __container_of(req, struct kqworkloop, kqwl_request.kqr_req);
2143 return kqwl->kqwl_dynamicid;
2144 }
2145
2146 return VM_KERNEL_ADDRHIDE(req);
2147}
2148#endif
2149
2150/**
2151 * Entry point for libdispatch to ask for threads
2152 */
2153static int
2154workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
2155{
2156 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2157 struct workqueue *wq = proc_get_wqptr(p);
2158 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
2159
2160 if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
2161 qos == THREAD_QOS_UNSPECIFIED) {
2162 return EINVAL;
2163 }
2164
2165 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
2166 wq, reqcount, pp, 0, 0);
2167
2168 workq_threadreq_t req = zalloc(workq_zone_threadreq);
2169 priority_queue_entry_init(&req->tr_entry);
2170 req->tr_state = TR_STATE_NEW;
2171 req->tr_flags = 0;
2172 req->tr_qos = qos;
2173
2174 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
2175 req->tr_flags |= TR_FLAG_OVERCOMMIT;
2176 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2177 }
2178
2179 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
2180 wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);
2181
2182 workq_lock_spin(wq);
2183 do {
2184 if (_wq_exiting(wq)) {
2185 goto exiting;
2186 }
2187
2188 /*
2189		 * When userspace is asking for parallelism, wake up to (reqcount - 1)
2190 * threads without pacing, to inform the scheduler of that workload.
2191 *
2192		 * The last requests, or the ones that failed the admission checks, are
2193 * enqueued and go through the regular creator codepath.
2194 *
2195 * If there aren't enough threads, add one, but re-evaluate everything
2196 * as conditions may now have changed.
2197 */
2198 if (reqcount > 1 && (req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2199 unpaced = workq_constrained_allowance(wq, qos, NULL, false);
2200 if (unpaced >= reqcount - 1) {
2201 unpaced = reqcount - 1;
2202 }
2203 } else {
2204 unpaced = reqcount - 1;
2205 }
2206
2207 /*
2208 * This path does not currently handle custom workloop parameters
2209 * when creating threads for parallelism.
2210 */
2211 assert(!(req->tr_flags & TR_FLAG_WL_PARAMS));
2212
2213 /*
2214 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
2215 */
2216 while (unpaced > 0 && wq->wq_thidlecount) {
2217 struct uthread *uth = workq_pop_idle_thread(wq);
2218
2219 _wq_thactive_inc(wq, qos);
2220 wq->wq_thscheduled_count[_wq_bucket(qos)]++;
2221 workq_thread_reset_pri(wq, uth, req);
2222 wq->wq_fulfilled++;
2223
2224 uth->uu_workq_flags |= UT_WORKQ_EARLY_BOUND;
2225 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2226 uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
2227 wq->wq_constrained_threads_scheduled++;
2228 }
2229 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
2230 uth->uu_save.uus_workq_park_data.thread_request = req;
2231 workq_thread_wakeup(uth);
2232 unpaced--;
2233 reqcount--;
2234 }
2235 } while (unpaced && wq->wq_nthreads < wq_max_threads &&
2236 workq_add_new_idle_thread(p, wq));
2237
2238 if (_wq_exiting(wq)) {
2239 goto exiting;
2240 }
2241
2242 req->tr_count = reqcount;
2243 if (workq_threadreq_enqueue(wq, req)) {
2244 /* This can drop the workqueue lock, and take it again */
2245 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
2246 }
2247 workq_unlock(wq);
2248 return 0;
2249
2250exiting:
2251 workq_unlock(wq);
2252 zfree(workq_zone_threadreq, req);
2253 return ECANCELED;
2254}
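
/*
 * A worked example of the pacing above (numbers are illustrative only): for a
 * constrained request with reqcount == 8, if workq_constrained_allowance()
 * returns 5 and at least 5 idle threads exist, then 5 threads are woken
 * immediately without pacing, and the remaining work is enqueued as a single
 * request with tr_count == 3, to be serviced through the creator codepath.
 */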
2255
2256bool
2257workq_kern_threadreq_initiate(struct proc *p, struct kqrequest *kqr,
2258 struct turnstile *workloop_ts, thread_qos_t qos, int flags)
2259{
2260 struct workqueue *wq = proc_get_wqptr_fast(p);
2261 workq_threadreq_t req = &kqr->kqr_req;
2262 struct uthread *uth = NULL;
2263 uint8_t tr_flags = 0;
2264
2265 if (kqr->kqr_state & KQR_WORKLOOP) {
2266 tr_flags = TR_FLAG_WORKLOOP;
2267
2268 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
2269 if (trp.trp_flags & TRP_PRIORITY) {
2270 tr_flags |= TR_FLAG_WL_OUTSIDE_QOS;
2271 qos = thread_workq_qos_for_pri(trp.trp_pri);
2272 if (qos == THREAD_QOS_UNSPECIFIED) {
2273 qos = WORKQ_THREAD_QOS_ABOVEUI;
2274 }
2275 }
2276 if (trp.trp_flags) {
2277 tr_flags |= TR_FLAG_WL_PARAMS;
2278 }
2279 } else {
2280 tr_flags = TR_FLAG_KEVENT;
2281 }
2282 if (qos != WORKQ_THREAD_QOS_MANAGER &&
2283 (kqr->kqr_state & KQR_THOVERCOMMIT)) {
2284 tr_flags |= TR_FLAG_OVERCOMMIT;
2285 }
2286
2287 assert(req->tr_state == TR_STATE_IDLE);
2288 priority_queue_entry_init(&req->tr_entry);
2289 req->tr_count = 1;
2290 req->tr_state = TR_STATE_NEW;
2291 req->tr_flags = tr_flags;
2292 req->tr_qos = qos;
2293
2294 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
2295 workq_trace_req_id(req), qos, 1, 0);
2296
2297 if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
2298 /*
2299		 * We're called back synchronously from the context of
2300		 * kqueue_threadreq_unbind(), from within workq_thread_return(),
2301		 * so we can try to match this thread up with this request.
2302 */
2303 uth = current_uthread();
2304 assert(uth->uu_kqr_bound == NULL);
2305 }
2306
2307 workq_lock_spin(wq);
2308 if (_wq_exiting(wq)) {
2309 workq_unlock(wq);
2310 return false;
2311 }
2312
2313 if (uth && workq_threadreq_admissible(wq, uth, req)) {
2314 assert(uth != wq->wq_creator);
2315 workq_threadreq_bind_and_unlock(p, wq, req, uth);
2316 } else {
2317 if (workloop_ts) {
2318 workq_perform_turnstile_operation_locked(wq, ^{
2319 turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
2320 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
2321 turnstile_update_inheritor_complete(workloop_ts,
2322 TURNSTILE_INTERLOCK_HELD);
2323 });
2324 }
2325 if (workq_threadreq_enqueue(wq, req)) {
2326 workq_schedule_creator(p, wq, flags);
2327 }
2328 workq_unlock(wq);
2329 }
2330
2331 return true;
2332}
2333
2334void
2335workq_kern_threadreq_modify(struct proc *p, struct kqrequest *kqr,
2336 thread_qos_t qos, int flags)
2337{
2338 struct workqueue *wq = proc_get_wqptr_fast(p);
2339 workq_threadreq_t req = &kqr->kqr_req;
2340 bool change_overcommit = false;
2341
2342 if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
2343 /* Requests outside-of-QoS shouldn't accept modify operations */
2344 return;
2345 }
2346
2347 workq_lock_spin(wq);
2348
2349 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
2350 assert(req->tr_flags & (TR_FLAG_KEVENT | TR_FLAG_WORKLOOP));
2351
2352 if (req->tr_state == TR_STATE_BINDING) {
2353 kqueue_threadreq_bind(p, req, req->tr_binding_thread, 0);
2354 workq_unlock(wq);
2355 return;
2356 }
2357
2358 change_overcommit = (bool)(kqr->kqr_state & KQR_THOVERCOMMIT) !=
2359 (bool)(req->tr_flags & TR_FLAG_OVERCOMMIT);
2360
2361 if (_wq_exiting(wq) || (req->tr_qos == qos && !change_overcommit)) {
2362 workq_unlock(wq);
2363 return;
2364 }
2365
2366 assert(req->tr_count == 1);
2367 if (req->tr_state != TR_STATE_QUEUED) {
2368 panic("Invalid thread request (%p) state %d", req, req->tr_state);
2369 }
2370
2371 WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
2372 workq_trace_req_id(req), qos, 0, 0);
2373
2374 struct priority_queue *pq = workq_priority_queue_for_req(wq, req);
2375 workq_threadreq_t req_max;
2376
2377 /*
2378 * Stage 1: Dequeue the request from its priority queue.
2379 *
2380 * If we dequeue the root item of the constrained priority queue,
2381 * maintain the best constrained request qos invariant.
2382 */
2383 if (priority_queue_remove(pq, &req->tr_entry,
2384 PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
2385 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2386 _wq_thactive_refresh_best_constrained_req_qos(wq);
2387 }
2388 }
2389
2390 /*
2391 * Stage 2: Apply changes to the thread request
2392 *
2393 * If the item will not become the root of the priority queue it belongs to,
2394	 * then we need to wait in line: just enqueue and return quickly.
2395 */
2396 if (__improbable(change_overcommit)) {
2397 req->tr_flags ^= TR_FLAG_OVERCOMMIT;
2398 pq = workq_priority_queue_for_req(wq, req);
2399 }
2400 req->tr_qos = qos;
2401
2402 req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
2403 if (req_max && req_max->tr_qos >= qos) {
2404 priority_queue_insert(pq, &req->tr_entry, workq_priority_for_req(req),
2405 PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE);
2406 workq_unlock(wq);
2407 return;
2408 }
2409
2410 /*
2411 * Stage 3: Reevaluate whether we should run the thread request.
2412 *
2413 * Pretend the thread request is new again:
2414 * - adjust wq_reqcount to not count it anymore.
2415 * - make its state TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
2416 * properly attempts a synchronous bind)
2417 */
2418 wq->wq_reqcount--;
2419 req->tr_state = TR_STATE_NEW;
2420 if (workq_threadreq_enqueue(wq, req)) {
2421 workq_schedule_creator(p, wq, flags);
2422 }
2423 workq_unlock(wq);
2424}
2425
2426void
2427workq_kern_threadreq_lock(struct proc *p)
2428{
2429 workq_lock_spin(proc_get_wqptr_fast(p));
2430}
2431
2432void
2433workq_kern_threadreq_unlock(struct proc *p)
2434{
2435 workq_unlock(proc_get_wqptr_fast(p));
2436}
2437
2438void
2439workq_kern_threadreq_update_inheritor(struct proc *p, struct kqrequest *kqr,
2440 thread_t owner, struct turnstile *wl_ts,
2441 turnstile_update_flags_t flags)
2442{
2443 struct workqueue *wq = proc_get_wqptr_fast(p);
2444 workq_threadreq_t req = &kqr->kqr_req;
2445 turnstile_inheritor_t inheritor;
2446
2447 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
2448 assert(req->tr_flags & TR_FLAG_WORKLOOP);
2449 workq_lock_held(wq);
2450
2451 if (req->tr_state == TR_STATE_BINDING) {
2452 kqueue_threadreq_bind(p, req, req->tr_binding_thread,
2453 KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
2454 return;
2455 }
2456
2457 if (_wq_exiting(wq)) {
2458 inheritor = TURNSTILE_INHERITOR_NULL;
2459 } else {
2460 if (req->tr_state != TR_STATE_QUEUED) {
2461 panic("Invalid thread request (%p) state %d", req, req->tr_state);
2462 }
2463
2464 if (owner) {
2465 inheritor = owner;
2466 flags |= TURNSTILE_INHERITOR_THREAD;
2467 } else {
2468 inheritor = wq->wq_turnstile;
2469 flags |= TURNSTILE_INHERITOR_TURNSTILE;
2470 }
2471 }
2472
2473 workq_perform_turnstile_operation_locked(wq, ^{
2474 turnstile_update_inheritor(wl_ts, inheritor, flags);
2475 });
2476}
2477
2478void
2479workq_kern_threadreq_redrive(struct proc *p, int flags)
2480{
2481 struct workqueue *wq = proc_get_wqptr_fast(p);
2482
2483 workq_lock_spin(wq);
2484 workq_schedule_creator(p, wq, flags);
2485 workq_unlock(wq);
2486}
2487
2488void
2489workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
2490{
2491 if (!locked) workq_lock_spin(wq);
2492 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_CREATOR_SYNC_UPDATE);
2493 if (!locked) workq_unlock(wq);
2494}
2495
2496static int
2497workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
2498 struct workqueue *wq)
2499{
2500 thread_t th = current_thread();
2501 struct uthread *uth = get_bsdthread_info(th);
2502 struct kqrequest *kqr = uth->uu_kqr_bound;
2503 workq_threadreq_param_t trp = { };
2504 int nevents = uap->affinity, error;
2505 user_addr_t eventlist = uap->item;
2506
2507 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
2508 (uth->uu_workq_flags & UT_WORKQ_DYING)) {
2509 return EINVAL;
2510 }
2511
2512 if (eventlist && nevents && kqr == NULL) {
2513 return EINVAL;
2514 }
2515
2516 /* reset signal mask on the workqueue thread to default state */
2517 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
2518 proc_lock(p);
2519 uth->uu_sigmask = ~workq_threadmask;
2520 proc_unlock(p);
2521 }
2522
2523 if (kqr && kqr->kqr_req.tr_flags & TR_FLAG_WL_PARAMS) {
2524 /*
2525 * Ensure we store the threadreq param before unbinding
2526 * the kqr from this thread.
2527 */
2528 trp = kqueue_threadreq_workloop_param(&kqr->kqr_req);
2529 }
2530
2531 if (kqr) {
2532 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
2533 if (kqr->kqr_state & KQR_WORKLOOP) {
2534 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
2535 } else {
2536 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
2537 }
2538 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
2539 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
2540 } else {
2541 if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
2542 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2543 }
2544 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
2545 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
2546 } else {
2547 upcall_flags |= uth->uu_workq_pri.qos_req |
2548 WQ_FLAG_THREAD_PRIO_QOS;
2549 }
2550 }
2551
2552 error = pthread_functions->workq_handle_stack_events(p, th,
2553 get_task_map(p->task), uth->uu_workq_stackaddr,
2554 uth->uu_workq_thport, eventlist, nevents, upcall_flags);
2555 if (error) return error;
2556
2557 // pthread is supposed to pass KEVENT_FLAG_PARKING here
2558 // which should cause the above call to either:
2559 // - not return
2560 // - return an error
2561 // - return 0 and have unbound properly
2562 assert(uth->uu_kqr_bound == NULL);
2563 }
2564
2565 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);
2566
2567 thread_sched_call(th, NULL);
2568 thread_will_park_or_terminate(th);
2569#if CONFIG_WORKLOOP_DEBUG
2570 UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
2571#endif
2572
2573 workq_lock_spin(wq);
2574 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
2575 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
2576 workq_select_threadreq_or_park_and_unlock(p, wq, uth);
2577 __builtin_unreachable();
2578}
2579
2580/**
2581 * Multiplexed call to interact with the workqueue mechanism
2582 */
2583int
2584workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
2585{
2586 int options = uap->options;
2587 int arg2 = uap->affinity;
2588 int arg3 = uap->prio;
2589 struct workqueue *wq = proc_get_wqptr(p);
2590 int error = 0;
2591
2592 if ((p->p_lflag & P_LREGISTER) == 0) {
2593 return EINVAL;
2594 }
2595
2596 switch (options) {
2597 case WQOPS_QUEUE_NEWSPISUPP: {
2598 /*
2599 * arg2 = offset of serialno into dispatch queue
2600 * arg3 = kevent support
2601 */
2602 int offset = arg2;
2603 if (arg3 & 0x01){
2604 // If we get here, then userspace has indicated support for kevent delivery.
2605 }
2606
2607 p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
2608 break;
2609 }
2610 case WQOPS_QUEUE_REQTHREADS: {
2611 /*
2612 * arg2 = number of threads to start
2613 * arg3 = priority
2614 */
2615 error = workq_reqthreads(p, arg2, arg3);
2616 break;
2617 }
2618 case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
2619 /*
2620 * arg2 = priority for the manager thread
2621 *
2622 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
2623		 * the low bits of the value contain a scheduling priority
2624 * instead of a QOS value
2625 */
2626 pthread_priority_t pri = arg2;
2627
2628 if (wq == NULL) {
2629 error = EINVAL;
2630 break;
2631 }
2632
2633 /*
2634 * Normalize the incoming priority so that it is ordered numerically.
2635 */
2636 if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
2637 pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
2638 _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
2639 } else {
2640 thread_qos_t qos = _pthread_priority_thread_qos(pri);
2641 int relpri = _pthread_priority_relpri(pri);
2642 if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
2643 qos == THREAD_QOS_UNSPECIFIED) {
2644 error = EINVAL;
2645 break;
2646 }
2647 pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
2648 }
2649
2650 /*
2651 * If userspace passes a scheduling priority, that wins over any QoS.
2652		 * Userspace should take care not to lower the priority this way.
2653 */
2654 workq_lock_spin(wq);
2655 if (wq->wq_event_manager_priority < (uint32_t)pri) {
2656 wq->wq_event_manager_priority = (uint32_t)pri;
2657 }
2658 workq_unlock(wq);
2659 break;
2660 }
2661 case WQOPS_THREAD_KEVENT_RETURN:
2662 case WQOPS_THREAD_WORKLOOP_RETURN:
2663 case WQOPS_THREAD_RETURN: {
2664 error = workq_thread_return(p, uap, wq);
2665 break;
2666 }
2667
2668 case WQOPS_SHOULD_NARROW: {
2669 /*
2670 * arg2 = priority to test
2671 * arg3 = unused
2672 */
2673 thread_t th = current_thread();
2674 struct uthread *uth = get_bsdthread_info(th);
2675 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
2676 (uth->uu_workq_flags & (UT_WORKQ_DYING|UT_WORKQ_OVERCOMMIT))) {
2677 error = EINVAL;
2678 break;
2679 }
2680
2681 thread_qos_t qos = _pthread_priority_thread_qos(arg2);
2682 if (qos == THREAD_QOS_UNSPECIFIED) {
2683 error = EINVAL;
2684 break;
2685 }
2686 workq_lock_spin(wq);
2687 bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
2688 workq_unlock(wq);
2689
2690 *retval = should_narrow;
2691 break;
2692 }
2693 default:
2694 error = EINVAL;
2695 break;
2696 }
2697
2698 return (error);
2699}
2700
2701/*
2702 * We have no work to do, park ourselves on the idle list.
2703 *
2704 * Consumes the workqueue lock and does not return.
2705 */
2706__attribute__((noreturn, noinline))
2707static void
2708workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth)
2709{
2710 assert(uth == current_uthread());
2711 assert(uth->uu_kqr_bound == NULL);
2712 workq_push_idle_thread(p, wq, uth); // may not return
2713
2714 workq_thread_reset_cpupercent(NULL, uth);
2715
2716 if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
2717 workq_unlock(wq);
2718
2719 /*
2720 * workq_push_idle_thread() will unset `has_stack`
2721 * if it wants us to free the stack before parking.
2722 */
2723 if (!uth->uu_save.uus_workq_park_data.has_stack) {
2724 pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
2725 get_task_map(p->task), uth->uu_workq_stackaddr);
2726 }
2727
2728 /*
2729 * When we remove the voucher from the thread, we may lose our importance
2730 * causing us to get preempted, so we do this after putting the thread on
2731 * the idle list. Then, when we get our importance back we'll be able to
2732 * use this thread from e.g. the kevent call out to deliver a boosting
2733 * message.
2734 */
2735 __assert_only kern_return_t kr;
2736 kr = thread_set_voucher_name(MACH_PORT_NULL);
2737 assert(kr == KERN_SUCCESS);
2738
2739 workq_lock_spin(wq);
2740 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
2741 }
2742
2743 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
2744 /*
2745		 * While we had dropped the lock to unset our voucher, someone came
2746		 * around and made us runnable. But because we weren't waiting on the
2747		 * event, their thread_wakeup() was ineffectual. To correct for that,
2748 * we just run the continuation ourselves.
2749 */
2750 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
2751 workq_select_threadreq_or_park_and_unlock(p, wq, uth);
2752 __builtin_unreachable();
2753 }
2754
2755 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
2756 workq_unpark_for_death_and_unlock(p, wq, uth,
2757 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
2758 __builtin_unreachable();
2759 }
2760
2761 thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
2762 assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
2763 workq_unlock(wq);
2764 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
2765 thread_block(workq_unpark_continue);
2766 __builtin_unreachable();
2767}
2768
2769static inline bool
2770workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
2771{
2772 /*
2773 * There's an event manager request and either:
2774 * - no event manager currently running
2775 * - we are re-using the event manager
2776 */
2777 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
2778 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
2779}
2780
2781static uint32_t
2782workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
2783 struct uthread *uth, bool may_start_timer)
2784{
2785 assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
2786 uint32_t count = 0;
2787
2788 uint32_t max_count = wq->wq_constrained_threads_scheduled;
2789 if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
2790 /*
2791 * don't count the current thread as scheduled
2792 */
2793 assert(max_count > 0);
2794 max_count--;
2795 }
2796 if (max_count >= wq_max_constrained_threads) {
2797 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
2798 wq->wq_constrained_threads_scheduled,
2799 wq_max_constrained_threads, 0);
2800 /*
2801 * we need 1 or more constrained threads to return to the kernel before
2802 * we can dispatch additional work
2803 */
2804 return 0;
2805 }
2806	max_count = wq_max_constrained_threads - max_count;
2807
2808 /*
2809	 * Compute a metric for how many threads are active. We find the
2810 * highest priority request outstanding and then add up the number of
2811 * active threads in that and all higher-priority buckets. We'll also add
2812 * any "busy" threads which are not active but blocked recently enough that
2813 * we can't be sure they've gone idle yet. We'll then compare this metric
2814 * to our max concurrency to decide whether to add a new thread.
2815 */
2816
2817 uint32_t busycount, thactive_count;
2818
2819 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
2820 at_qos, &busycount, NULL);
2821
2822 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
2823 at_qos <= uth->uu_workq_pri.qos_bucket) {
2824 /*
2825 * Don't count this thread as currently active, but only if it's not
2826 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
2827 * managers.
2828 */
2829 assert(thactive_count > 0);
2830 thactive_count--;
2831 }
2832
2833 count = wq_max_parallelism[_wq_bucket(at_qos)];
2834 if (count > thactive_count + busycount) {
2835 count -= thactive_count + busycount;
2836 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
2837 thactive_count, busycount, 0);
2838 return MIN(count, max_count);
2839 } else {
2840 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
2841 thactive_count, busycount, 0);
2842 }
2843
2844 if (busycount && may_start_timer) {
2845 /*
2846 * If this is called from the add timer, we won't have another timer
2847 * fire when the thread exits the "busy" state, so rearm the timer.
2848 */
2849 workq_schedule_delayed_thread_creation(wq, 0);
2850 }
2851
2852 return 0;
2853}
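
/*
 * A worked example of the admission math above (illustrative numbers): with
 * wq_max_constrained_threads == 64, 10 constrained threads scheduled (not
 * counting the caller), 6 threads active at or above at_qos, 1 recently busy
 * thread, and wq_max_parallelism[_wq_bucket(at_qos)] == 8, the allowance is
 * MIN(8 - (6 + 1), 64 - 10) == 1: one more constrained thread may be admitted
 * at this QoS.
 */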
2854
2855static bool
2856workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
2857 workq_threadreq_t req)
2858{
2859 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
2860 return workq_may_start_event_mgr_thread(wq, uth);
2861 }
2862 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2863 return workq_constrained_allowance(wq, req->tr_qos, uth, true);
2864 }
2865 return true;
2866}
2867
2868static workq_threadreq_t
2869workq_threadreq_select_for_creator(struct workqueue *wq)
2870{
2871 workq_threadreq_t req_qos, req_pri, req_tmp;
2872 thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
2873 uint8_t pri = 0;
2874
2875 req_tmp = wq->wq_event_manager_threadreq;
2876 if (req_tmp && workq_may_start_event_mgr_thread(wq, NULL)) {
2877 return req_tmp;
2878 }
2879
2880 /*
2881 * Compute the best priority request, and ignore the turnstile for now
2882 */
2883
2884 req_pri = priority_queue_max(&wq->wq_special_queue,
2885 struct workq_threadreq_s, tr_entry);
2886 if (req_pri) {
2887 pri = priority_queue_entry_key(&wq->wq_special_queue, &req_pri->tr_entry);
2888 }
2889
2890 /*
2891 * Compute the best QoS Request, and check whether it beats the "pri" one
2892 */
2893
2894 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
2895 struct workq_threadreq_s, tr_entry);
2896 if (req_qos) {
2897 qos = req_qos->tr_qos;
2898 }
2899
2900 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
2901 struct workq_threadreq_s, tr_entry);
2902
2903 if (req_tmp && qos < req_tmp->tr_qos) {
2904 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
2905 return req_pri;
2906 }
2907
2908 if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
2909 /*
2910 * If the constrained thread request is the best one and passes
2911 * the admission check, pick it.
2912 */
2913 return req_tmp;
2914 }
2915 }
2916
2917 if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
2918 return req_pri;
2919 }
2920
2921 if (req_qos) {
2922 return req_qos;
2923 }
2924
2925 /*
2926 * If we had no eligible request but we have a turnstile push,
2927 * it must be a non overcommit thread request that failed
2928 * the admission check.
2929 *
2930	 * Just fake a BG thread request so that, if the push stops, the creator
2931 * priority just drops to 4.
2932 */
2933 if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
2934 static struct workq_threadreq_s workq_sync_push_fake_req = {
2935 .tr_qos = THREAD_QOS_BACKGROUND,
2936 };
2937
2938 return &workq_sync_push_fake_req;
2939 }
2940
2941 return NULL;
2942}
2943
2944static workq_threadreq_t
2945workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
2946{
2947 workq_threadreq_t req_qos, req_pri, req_tmp;
2948 uintptr_t proprietor;
2949 thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
2950 uint8_t pri = 0;
2951
2952 if (uth == wq->wq_creator) uth = NULL;
2953
2954 req_tmp = wq->wq_event_manager_threadreq;
2955 if (req_tmp && workq_may_start_event_mgr_thread(wq, uth)) {
2956 return req_tmp;
2957 }
2958
2959 /*
2960 * Compute the best priority request (special or turnstile)
2961 */
2962
2963 pri = turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
2964 &proprietor);
2965 if (pri) {
2966 struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
2967 req_pri = &kqwl->kqwl_request.kqr_req;
2968 if (req_pri->tr_state != TR_STATE_QUEUED) {
2969 panic("Invalid thread request (%p) state %d",
2970 req_pri, req_pri->tr_state);
2971 }
2972 } else {
2973 req_pri = NULL;
2974 }
2975
2976 req_tmp = priority_queue_max(&wq->wq_special_queue,
2977 struct workq_threadreq_s, tr_entry);
2978 if (req_tmp && pri < priority_queue_entry_key(&wq->wq_special_queue,
2979 &req_tmp->tr_entry)) {
2980 req_pri = req_tmp;
2981 pri = priority_queue_entry_key(&wq->wq_special_queue, &req_tmp->tr_entry);
2982 }
2983
2984 /*
2985 * Compute the best QoS Request, and check whether it beats the "pri" one
2986 */
2987
2988 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
2989 struct workq_threadreq_s, tr_entry);
2990 if (req_qos) {
2991 qos = req_qos->tr_qos;
2992 }
2993
2994 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
2995 struct workq_threadreq_s, tr_entry);
2996
2997 if (req_tmp && qos < req_tmp->tr_qos) {
2998 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
2999 return req_pri;
3000 }
3001
3002 if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
3003 /*
3004 * If the constrained thread request is the best one and passes
3005 * the admission check, pick it.
3006 */
3007 return req_tmp;
3008 }
3009 }
3010
3011 if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
3012 return req_pri;
3013 }
3014
3015 return req_qos;
3016}
3017
3018/*
3019 * The creator is an anonymous thread that is counted as scheduled but
3020 * otherwise has no scheduler callback set and is not tracked as active;
3021 * it is used to make other threads.
3022 *
3023 * When more requests are added or an existing one is hurried along, a
3024 * creator is elected and set up, or the existing one is overridden accordingly.
3025 *
3026 * While this creator is in flight, because no request has been dequeued,
3027 * already running threads have a chance at stealing thread requests, avoiding
3028 * useless context switches, and the creator, once scheduled, may not find any
3029 * work to do and will then just park again.
3030 *
3031 * The creator serves the dual purpose of informing the scheduler of work that
3032 * hasn't been materialized as threads yet, and of acting as a natural pacing
3033 * mechanism for thread creation.
3034 *
3035 * Because it is anonymous (and not bound to anything), thread requests can be
3036 * stolen from this creator by threads already on core, yielding more
3037 * efficient scheduling and reduced context switches.
3038 */
3039static void
3040workq_schedule_creator(proc_t p, struct workqueue *wq, int flags)
3041{
3042 workq_threadreq_t req;
3043 struct uthread *uth;
3044
3045 workq_lock_held(wq);
3046 assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);
3047
3048again:
3049 uth = wq->wq_creator;
3050
3051 if (!wq->wq_reqcount) {
3052 if (uth == NULL) {
3053 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
3054 }
3055 return;
3056 }
3057
3058 req = workq_threadreq_select_for_creator(wq);
3059 if (req == NULL) {
3060 if (flags & WORKQ_THREADREQ_CREATOR_SYNC_UPDATE) {
3061 assert((flags & WORKQ_THREADREQ_CREATOR_TRANSFER) == 0);
3062 /*
3063			 * The turnstile propagation code is reaching out to us, and we
3064			 * still don't want to do anything; do not recurse.
3065 */
3066 } else {
3067 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
3068 }
3069 return;
3070 }
3071
3072 if (uth) {
3073 /*
3074		 * We may need to override the creator we already have
3075 */
3076 if (workq_thread_needs_priority_change(req, uth)) {
3077 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
3078 wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
3079 workq_thread_reset_pri(wq, uth, req);
3080 }
3081 } else if (wq->wq_thidlecount) {
3082 /*
3083 * We need to unpark a creator thread
3084 */
3085 wq->wq_creator = uth = workq_pop_idle_thread(wq);
3086 if (workq_thread_needs_priority_change(req, uth)) {
3087 workq_thread_reset_pri(wq, uth, req);
3088 }
3089 workq_turnstile_update_inheritor(wq, uth->uu_thread,
3090 TURNSTILE_INHERITOR_THREAD);
3091 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
3092 wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
3093 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
3094 uth->uu_save.uus_workq_park_data.yields = 0;
3095 workq_thread_wakeup(uth);
3096 } else {
3097 /*
3098 * We need to allocate a thread...
3099 */
3100 if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
3101 /* out of threads, just go away */
3102 } else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
3103 act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
3104 } else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
3105 /* This can drop the workqueue lock, and take it again */
3106 workq_schedule_immediate_thread_creation(wq);
3107 } else if (workq_add_new_idle_thread(p, wq)) {
3108 goto again;
3109 } else {
3110 workq_schedule_delayed_thread_creation(wq, 0);
3111 }
3112
3113 if (flags & WORKQ_THREADREQ_CREATOR_TRANSFER) {
3114 /*
3115 * workq_schedule_creator() failed at creating a thread,
3116 * and the responsibility of redriving is now with a thread-call.
3117 *
3118 * We still need to tell the turnstile the previous creator is gone.
3119 */
3120 workq_turnstile_update_inheritor(wq, NULL, 0);
3121 }
3122 }
3123}
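
/*
 * To summarize the cases above: when a runnable request exists, the creator
 * is either (a) the existing creator, with its priority reset to match the
 * selected request, (b) an idle thread popped from the idle list and woken as
 * the new creator, or (c) not materialized yet, in which case thread creation
 * is driven through an AST, an immediate thread call, a direct
 * workq_add_new_idle_thread(), or a delayed thread call, depending on the
 * flags and the thread limit.
 */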
3124
3125/**
3126 * Runs a thread request on a thread
3127 *
3128 * - if thread is THREAD_NULL, will find a thread and run the request there.
3129 * Otherwise, the thread must be the current thread.
3130 *
3131 * - if req is NULL, will find the highest priority request and run that. If
3132 * it is not NULL, it must be a threadreq object in state NEW. If it can not
3133 * be run immediately, it will be enqueued and moved to state QUEUED.
3134 *
3135 * Either way, the thread request object serviced will be moved to state
3136 * BINDING and attached to the uthread.
3137 *
3138 * Should be called with the workqueue lock held. Will drop it.
3139 */
3140__attribute__((noreturn, noinline))
3141static void
3142workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
3143 struct uthread *uth)
3144{
3145 uint32_t setup_flags = 0;
3146 workq_threadreq_t req;
3147
3148 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
3149 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
3150 setup_flags |= WQ_SETUP_FIRST_USE;
3151 }
3152 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
3153 /*
3154		 * This pointer may already have been freed; it is only used for tracing.
3155 */
3156 req = uth->uu_save.uus_workq_park_data.thread_request;
3157 workq_unlock(wq);
3158 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
3159 VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
3160 goto run;
3161 } else if (_wq_exiting(wq)) {
3162 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
3163 } else if (wq->wq_reqcount == 0) {
3164 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
3165 } else if ((req = workq_threadreq_select(wq, uth)) == NULL) {
3166 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
3167 } else {
3168 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
3169 workq_trace_req_id(req), 0, 0, 0);
3170 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
3171 uth->uu_workq_flags ^= UT_WORKQ_NEW;
3172 setup_flags |= WQ_SETUP_FIRST_USE;
3173 }
3174 workq_thread_reset_cpupercent(req, uth);
3175 workq_threadreq_bind_and_unlock(p, wq, req, uth);
3176run:
3177 workq_setup_and_run(p, uth, setup_flags);
3178 __builtin_unreachable();
3179 }
3180
3181 workq_park_and_unlock(p, wq, uth);
3182 __builtin_unreachable();
3183}
3184
3185static bool
3186workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
3187{
3188 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
3189
3190 if (qos >= THREAD_QOS_USER_INTERACTIVE) {
3191 return false;
3192 }
3193
3194 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
3195 if (wq->wq_fulfilled == snapshot) {
3196 return false;
3197 }
3198
3199 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
3200 if (wq->wq_fulfilled - snapshot > conc) {
3201 /* we fulfilled more than NCPU requests since being dispatched */
3202 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
3203 wq->wq_fulfilled, snapshot, 0);
3204 return true;
3205 }
3206
3207 for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
3208 cnt += wq->wq_thscheduled_count[i];
3209 }
3210 if (conc <= cnt) {
3211 /* We fulfilled requests and have more than NCPU scheduled threads */
3212 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
3213 wq->wq_fulfilled, snapshot, 0);
3214 return true;
3215 }
3216
3217 return false;
3218}
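
/*
 * Illustrative numbers for the heuristic above: with a creator override QoS
 * falling in a bucket whose wq_max_parallelism[] entry is 8, the creator
 * yields if it has fulfilled more than 8 requests since it was dispatched,
 * or if at least one request was fulfilled and 8 or more threads are already
 * scheduled in that bucket and the buckets above it.
 */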
3219
3220/**
3221 * parked thread wakes up
3222 */
3223__attribute__((noreturn, noinline))
3224static void
3225workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
3226{
3227 struct uthread *uth = current_uthread();
3228 proc_t p = current_proc();
3229 struct workqueue *wq = proc_get_wqptr_fast(p);
3230
3231 workq_lock_spin(wq);
3232
3233 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
3234 /*
3235		 * If the threads we have out are able to keep up with the
3236 * demand, then we should avoid sending this creator thread to
3237 * userspace.
3238 */
3239 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
3240 uth->uu_save.uus_workq_park_data.yields++;
3241 workq_unlock(wq);
3242 thread_yield_with_continuation(workq_unpark_continue, NULL);
3243 __builtin_unreachable();
3244 }
3245
3246 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
3247 workq_select_threadreq_or_park_and_unlock(p, wq, uth);
3248 __builtin_unreachable();
3249 }
3250
3251 if (__probable(wr == THREAD_AWAKENED)) {
3252 /*
3253 * We were set running, but for the purposes of dying.
3254 */
3255 assert(uth->uu_workq_flags & UT_WORKQ_DYING);
3256 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
3257 } else {
3258 /*
3259			 * Workaround for <rdar://problem/38647347>:
3260			 * in case we do hit userspace, make sure calling
3261			 * workq_thread_terminate() does the right thing here, and that,
3262			 * if we never call it, workq_exit() will too, because it sees
3263			 * this thread on the runlist.
3264 */
3265 assert(wr == THREAD_INTERRUPTED);
3266 wq->wq_thdying_count++;
3267 uth->uu_workq_flags |= UT_WORKQ_DYING;
3268 }
3269
3270 workq_unpark_for_death_and_unlock(p, wq, uth,
3271 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
3272 __builtin_unreachable();
3273}
3274
3275__attribute__((noreturn, noinline))
3276static void
3277workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
3278{
3279 thread_t th = uth->uu_thread;
3280 vm_map_t vmap = get_task_map(p->task);
3281
3282 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
3283 /*
3284 * For preemption reasons, we want to reset the voucher as late as
3285 * possible, so we do it in two places:
3286 * - Just before parking (i.e. in workq_park_and_unlock())
3287 * - Prior to doing the setup for the next workitem (i.e. here)
3288 *
3289 * Those two places are sufficient to ensure we always reset it before
3290 * it goes back out to user space, but be careful to not break that
3291 * guarantee.
3292 */
3293 __assert_only kern_return_t kr;
3294 kr = thread_set_voucher_name(MACH_PORT_NULL);
3295 assert(kr == KERN_SUCCESS);
3296 }
3297
3298 uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
3299 if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
3300 upcall_flags |= WQ_FLAG_THREAD_REUSE;
3301 }
3302
3303 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
3304 /*
3305 * For threads that have an outside-of-QoS thread priority, indicate
3306 * to userspace that setting QoS should only affect the TSD and not
3307 * change QOS in the kernel.
3308 */
3309 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
3310 } else {
3311 /*
3312 * Put the QoS class value into the lower bits of the reuse_thread
3313		 * register; this is where the thread priority used to be stored
3314 * anyway.
3315 */
3316 upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
3317 WQ_FLAG_THREAD_PRIO_QOS;
3318 }
3319
3320 if (uth->uu_workq_thport == MACH_PORT_NULL) {
3321 /* convert_thread_to_port() consumes a reference */
3322 thread_reference(th);
3323 ipc_port_t port = convert_thread_to_port(th);
3324 uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task));
3325 }
3326
3327 /*
3328	 * Call out to pthread; this sets up the thread, pulls kevent structs
3329	 * onto the stack, sets up the thread state, and then returns to userspace.
3330 */
3331 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
3332 proc_get_wqptr_fast(p), 0, 0, 0, 0);
3333 thread_sched_call(th, workq_sched_callback);
3334 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
3335 uth->uu_workq_thport, 0, setup_flags, upcall_flags);
3336
3337 __builtin_unreachable();
3338}
3339
3340#pragma mark misc
3341
3342int
3343fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
3344{
3345 struct workqueue *wq = proc_get_wqptr(p);
3346 int error = 0;
3347 int activecount;
3348
3349 if (wq == NULL) {
3350 return EINVAL;
3351 }
3352
3353 /*
3354 * This is sometimes called from interrupt context by the kperf sampler.
3355 * In that case, it's not safe to spin trying to take the lock since we
3356 * might already hold it. So, we just try-lock it and error out if it's
3357 * already held. Since this is just a debugging aid, and all our callers
3358 * are able to handle an error, that's fine.
3359 */
3360 bool locked = workq_lock_try(wq);
3361 if (!locked) {
3362 return EBUSY;
3363 }
3364
3365 wq_thactive_t act = _wq_thactive(wq);
3366 activecount = _wq_thactive_aggregate_downto_qos(wq, act,
3367 WORKQ_THREAD_QOS_MIN, NULL, NULL);
3368 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
3369 activecount++;
3370 }
3371 pwqinfo->pwq_nthreads = wq->wq_nthreads;
3372 pwqinfo->pwq_runthreads = activecount;
3373 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
3374 pwqinfo->pwq_state = 0;
3375
3376 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
3377 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
3378 }
3379
3380 if (wq->wq_nthreads >= wq_max_threads) {
3381 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
3382 }
3383
3384 workq_unlock(wq);
3385 return error;
3386}
3387
3388boolean_t
3389workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
3390 boolean_t *exceeded_constrained)
3391{
3392 proc_t p = v;
3393 struct proc_workqueueinfo pwqinfo;
3394 int err;
3395
3396 assert(p != NULL);
3397 assert(exceeded_total != NULL);
3398 assert(exceeded_constrained != NULL);
3399
3400 err = fill_procworkqueue(p, &pwqinfo);
3401 if (err) {
3402 return FALSE;
3403 }
3404 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
3405 return FALSE;
3406 }
3407
3408 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
3409 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
3410
3411 return TRUE;
3412}
3413
3414uint32_t
3415workqueue_get_pwq_state_kdp(void * v)
3416{
3417 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
3418 kTaskWqExceededConstrainedThreadLimit);
3419 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
3420 kTaskWqExceededTotalThreadLimit);
3421 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
3422 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
3423 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);
3424
3425 if (v == NULL) {
3426 return 0;
3427 }
3428
3429 proc_t p = v;
3430 struct workqueue *wq = proc_get_wqptr(p);
3431
3432 if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
3433 return 0;
3434 }
3435
3436 uint32_t pwq_state = WQ_FLAGS_AVAILABLE;
3437
3438 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
3439 pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
3440 }
3441
3442 if (wq->wq_nthreads >= wq_max_threads) {
3443 pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
3444 }
3445
3446 return pwq_state;
3447}
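
/*
 * The static asserts at the top of workqueue_get_pwq_state_kdp() pin the WQ_*
 * state bits to the kTaskWq* stackshot flags: the consumer of this function
 * is expected to shift the returned value left by 17 bits when folding it
 * into the task snapshot flags (that consumer lives outside this file).
 */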
3448
3449void
3450workq_init(void)
3451{
3452 workq_lck_grp_attr = lck_grp_attr_alloc_init();
3453 workq_lck_attr = lck_attr_alloc_init();
3454 workq_lck_grp = lck_grp_alloc_init("workq", workq_lck_grp_attr);
3455
3456 workq_zone_workqueue = zinit(sizeof(struct workqueue),
3457 1024 * sizeof(struct workqueue), 8192, "workq.wq");
3458 workq_zone_threadreq = zinit(sizeof(struct workq_threadreq_s),
3459 1024 * sizeof(struct workq_threadreq_s), 8192, "workq.threadreq");
3460
3461 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
3462 NSEC_PER_USEC, &wq_stalled_window.abstime);
3463 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
3464 NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
3465 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
3466 NSEC_PER_USEC, &wq_max_timer_interval.abstime);
3467}
3468