/*
 * Copyright (c) 2018 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include <machine/machine_cpu.h>
#include <kern/locks.h>
#include <kern/mpsc_queue.h>
#include <kern/queue.h>
#include <kern/thread.h>

#pragma mark Validation panics for queues in general

__abortlike
void
__queue_element_linkage_invalid(queue_entry_t elt)
{
	queue_entry_t prev = elt->prev;
	queue_entry_t next = elt->next;

	panic("Invalid queue linkage: elt:%p {prev:%p, next:%p, "
	    "prev->next:%p, next->prev:%p}",
	    elt, prev, next, prev->next, next->prev);
}

#pragma mark Single Consumer calls

__attribute__((noinline))
static mpsc_queue_chain_t
_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
{
	return hw_wait_while_equals_long(ptr, NULL);
}

void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);

	os_atomic_store(&last->mpqc_next, head, relaxed);

	if (head == NULL &&
	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
		if (__improbable(head == NULL)) {
			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
		}
		os_atomic_store(&last->mpqc_next, head, relaxed);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
}

mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
    os_atomic_dependency_t dependency)
{
	mpsc_queue_chain_t head, tail;

	q = os_atomic_inject_dependency(q, dependency);

	tail = os_atomic_load(&q->mpqh_tail, relaxed);
	if (__improbable(tail == &q->mpqh_head)) {
		*tail_out = NULL;
		return NULL;
	}

	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
	if (__improbable(head == NULL)) {
		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
	}
	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
	/*
	 * 22708742: set tail to &q->mpqh_head with release, so that the NULL write
	 * to head above doesn't clobber the head set by a concurrent enqueuer.
	 *
	 * The other half of the seq_cst is required to pair with any enqueuer that
	 * contributed to an element in this list (pairs with the release fence in
	 * __mpsc_queue_append_update_tail()).
	 *
	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
	 * visibility transitive (when items hop from one queue to the next),
	 * which is expected implicitly by clients.
	 *
	 * Note that this is the same number of fences that a traditional lock
	 * would have, but paid as a once-per-batch cost.
	 */
	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);

	return head;
}

mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
{
	mpsc_queue_chain_t elm = NULL;
	if (cur == tail || cur == NULL) {
		return elm;
	}

	elm = os_atomic_load(&cur->mpqc_next, relaxed);
	if (__improbable(elm == NULL)) {
		elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
	}
	return elm;
}
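
/*
 * Illustrative sketch (not part of this file): how a single consumer might
 * drain a queue with the batch API above. The element type `struct my_elem`,
 * its embedded `my_link` field, the `my_queue` head, the `my_state` word and
 * `process_one()` are hypothetical names. The dependency token would normally
 * come from the atomic operation that observed the queue becoming non-empty,
 * as done with `st` in _mpsc_daemon_queue_drain() below; an acquire load is
 * used here as a simplified stand-in.
 *
 *	struct my_elem {
 *		struct mpsc_queue_chain my_link;
 *	};
 *	static struct mpsc_queue_head my_queue = MPSC_QUEUE_INITIALIZER(my_queue);
 *
 *	uint32_t st = os_atomic_load(&my_state, acquire);
 *	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
 *	mpsc_queue_chain_t head, tail, cur;
 *
 *	head = mpsc_queue_dequeue_batch(&my_queue, &tail, dep);
 *	while (head) {
 *		cur  = head;
 *		head = mpsc_queue_batch_next(cur, tail); // may wait for a racing enqueuer
 *		process_one(mpsc_queue_element(cur, struct my_elem, my_link));
 *	}
 *
 * The daemon drain below additionally resets each element's mpqc_next to
 * MPSC_QUEUE_NOTQUEUED_MARKER before invoking it, so that the element can be
 * re-enqueued from the callback.
 */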

#pragma mark "GCD"-like facilities

static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);

/* thread based queues */

static void
_mpsc_daemon_queue_init(mpsc_daemon_queue_t dq, mpsc_daemon_init_options_t flags)
{
	if (flags & MPSC_DAEMON_INIT_INACTIVE) {
		os_atomic_init(&dq->mpd_state, MPSC_QUEUE_STATE_INACTIVE);
	}
}

static void
_mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
{
	mpsc_daemon_queue_t dq = param;
	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
	thread_t self = dq->mpd_thread;

	__builtin_assume(self != THREAD_NULL);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options |= TH_OPT_SYSTEM_CRITICAL;
	}

	assert(dq->mpd_thread == current_thread());
	_mpsc_daemon_queue_drain(dq, self);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
	}

	thread_block_parameter(_mpsc_queue_thread_continue, dq);
}

static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
{
	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
}

static kern_return_t
_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_queue_kind_t kind, mpsc_daemon_init_options_t flags)
{
	kern_return_t kr;

	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = kind,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);

	kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
	    &dq->mpd_thread);
	if (kr == KERN_SUCCESS) {
		thread_set_thread_name(dq->mpd_thread, name);
		thread_start_in_assert_wait(dq->mpd_thread,
		    assert_wait_queue(dq), CAST_EVENT64_T(dq),
		    THREAD_UNINT);
		thread_deallocate(dq->mpd_thread);
	}
	return kr;
}

kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_init_options_t flags)
{
	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
	           MPSC_QUEUE_KIND_THREAD, flags);
}
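
/*
 * Illustrative sketch (not part of this file): a typical client of a
 * thread-backed daemon queue. The names `my_work`, `my_link`, `my_queue` and
 * `my_invoke` are hypothetical, and the MPSC_QUEUE_NONE "no options" value is
 * assumed (MPSC_QUEUE_DISABLE_PREEMPTION, used below, also works). The
 * pattern: embed a struct mpsc_queue_chain in the work item, recover the item
 * with mpsc_queue_element() from the invoke callback, and hand items to the
 * daemon with mpsc_daemon_enqueue().
 *
 *	struct my_work {
 *		struct mpsc_queue_chain my_link;
 *		int                     my_payload;
 *	};
 *
 *	static struct mpsc_daemon_queue my_queue;
 *
 *	static void
 *	my_invoke(mpsc_queue_chain_t elm, __unused mpsc_daemon_queue_t dq)
 *	{
 *		struct my_work *w = mpsc_queue_element(elm, struct my_work, my_link);
 *		// ... consume w->my_payload, then free or recycle w ...
 *	}
 *
 *	// at init time:
 *	//	mpsc_daemon_queue_init_with_thread(&my_queue, my_invoke,
 *	//	    MINPRI_KERNEL, "my.daemon", MPSC_DAEMON_INIT_NONE);
 *	// from any context that may enqueue concurrently:
 *	//	mpsc_daemon_enqueue(&my_queue, &w->my_link, MPSC_QUEUE_NONE);
 */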

/* thread-call based queues */

static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,
    thread_call_param_t arg1 __unused)
{
	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
}

static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
{
	thread_call_enter(dq->mpd_call);
}

void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri,
    mpsc_daemon_init_options_t flags)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_THREAD_CALL,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);
	dq->mpd_call = thread_call_allocate_with_options(
		_mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
}
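
/*
 * Illustrative sketch (not part of this file): the thread-call flavor takes
 * the same invoke callback shape as the thread-backed variant, but drains on
 * a thread call instead of a dedicated kernel thread. `my_queue`, `my_invoke`
 * and the THREAD_CALL_PRIORITY_KERNEL priority choice are hypothetical here.
 *
 *	mpsc_daemon_queue_init_with_thread_call(&my_queue, my_invoke,
 *	    THREAD_CALL_PRIORITY_KERNEL, MPSC_DAEMON_INIT_NONE);
 */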

/* nested queues */

void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    __unused mpsc_daemon_queue_t tq)
{
	mpsc_daemon_queue_t dq;
	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
	_mpsc_daemon_queue_drain(dq, NULL);
}

static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
{
	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
}

void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target,
    mpsc_daemon_init_options_t flags)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_NESTED,
		.mpd_invoke = invoke,
		.mpd_target = target,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);
}
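
/*
 * Illustrative sketch (not part of this file): a nested queue has no
 * execution context of its own; when it becomes non-empty it enqueues its
 * own mpd_chain onto its target, whose invoke function must be
 * mpsc_daemon_queue_nested_invoke so that the drain is forwarded.
 * `my_nested`, `my_parent` and `my_invoke` are hypothetical names.
 *
 *	static struct mpsc_daemon_queue my_nested;
 *
 *	// `my_parent` must have been initialized with
 *	// mpsc_daemon_queue_nested_invoke as its invoke function.
 *	mpsc_daemon_queue_init_with_target(&my_nested, my_invoke,
 *	    &my_parent, MPSC_DAEMON_INIT_NONE);
 */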

/* enqueue, drain & cancelation */

static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
{
	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
	mpsc_queue_chain_t head, cur, tail;
	mpsc_daemon_queue_state_t st;

again:
	/*
	 * Most of the time we're woken up because we're dirty; in that case the
	 * atomic xor below sets DRAINING and clears WAKEUP in a single atomic
	 * operation.
	 *
	 * However, if we're woken up for cancelation, the state may be reduced to
	 * the CANCELED bit set only, and then the xor will actually set WAKEUP.
	 * We need to clear it back to avoid looping below. This is safe to do as
	 * no one is allowed to enqueue more work after cancelation has happened.
	 *
	 * We use `st` as a dependency token to pair with the release fence in
	 * _mpsc_daemon_queue_enqueue(), which gives us the guarantee that the
	 * update to the tail of the MPSC queue that made it non-empty is visible
	 * to us.
	 */
	st = os_atomic_xor(&dq->mpd_state,
	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
	assert(st & MPSC_QUEUE_STATE_DRAINING);
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		assert(st & MPSC_QUEUE_STATE_CANCELED);
		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
	}

	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
	if ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
		do {
			mpsc_queue_batch_foreach_safe(cur, head, tail) {
				os_atomic_store(&cur->mpqc_next,
				    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
				invoke(cur, dq);
			}
		} while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep)));

		if (dq->mpd_options & MPSC_QUEUE_OPTION_BATCH) {
			invoke(MPSC_QUEUE_BATCH_END, dq);
		}
	}

	if (self) {
		assert_wait((event_t)dq, THREAD_UNINT);
	}
	/*
	 * Unlike GCD, no fence is necessary here: there is no concept similar
	 * to "dispatch_sync()" that would require changes this thread made to be
	 * visible to other threads as part of the mpsc_daemon_queue machinery.
	 *
	 * Making updates that happened on the daemon queue visible to other
	 * threads is the responsibility of the client.
	 */
	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);

	/*
	 * If a wakeup happened while we were draining, the queue did an
	 * [ empty -> non-empty ] transition during our drain.
	 *
	 * Chances are we already observed and drained everything, but we need to
	 * be absolutely sure, so start a drain again: the enqueuer observed the
	 * DRAINING bit and skipped calling _mpsc_daemon_queue_wakeup().
	 */
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
		}
		goto again;
	}

	/* dereferencing `dq` past this point is unsafe */

	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		thread_wakeup(&dq->mpd_state);
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
			thread_terminate_self();
			__builtin_unreachable();
		}
	}
}

static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
{
	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		_mpsc_daemon_queue_nested_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		_mpsc_queue_thread_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		_mpsc_queue_thread_call_wakeup(dq);
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
}

static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
{
	mpsc_daemon_queue_state_t st;

	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
		/*
		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
		 */
		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
			panic("mpsc_queue[%p]: use after cancelation", dq);
		}

		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP |
		    MPSC_QUEUE_STATE_INACTIVE)) == 0) {
			_mpsc_daemon_queue_wakeup(dq);
		}
	}
}

void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
    mpsc_queue_options_t options)
{
	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		disable_preemption();
	}

	_mpsc_daemon_queue_enqueue(dq, elm);

	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		enable_preemption();
	}
}

void
mpsc_daemon_queue_activate(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	st = os_atomic_andnot_orig(&dq->mpd_state,
	    MPSC_QUEUE_STATE_INACTIVE, relaxed);
	if ((st & MPSC_QUEUE_STATE_WAKEUP) && (st & MPSC_QUEUE_STATE_INACTIVE)) {
		_mpsc_daemon_queue_wakeup(dq);
	}
}
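
/*
 * Illustrative sketch (not part of this file): a queue created with
 * MPSC_DAEMON_INIT_INACTIVE accepts work immediately, but defers the wakeup
 * until it is activated; the pending WAKEUP bit is then delivered here.
 * `my_queue`, `my_invoke`, `w` and the MPSC_QUEUE_NONE "no options" value are
 * assumed names.
 *
 *	mpsc_daemon_queue_init_with_thread_call(&my_queue, my_invoke,
 *	    THREAD_CALL_PRIORITY_KERNEL, MPSC_DAEMON_INIT_INACTIVE);
 *
 *	mpsc_daemon_enqueue(&my_queue, &w->my_link, MPSC_QUEUE_NONE);
 *	// no drain happens yet: the INACTIVE bit suppresses the wakeup
 *
 *	mpsc_daemon_queue_activate(&my_queue);
 *	// the deferred wakeup fires and `w` is drained
 */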

void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);

	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
	}
	if (__improbable(st & MPSC_QUEUE_STATE_INACTIVE)) {
		panic("mpsc_queue[%p]: queue is inactive (%x)", dq, st);
	}

	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
		clear_wait(current_thread(), THREAD_AWAKENED);
	} else {
		disable_preemption();
		_mpsc_daemon_queue_wakeup(dq);
		enable_preemption();
		thread_block(THREAD_CONTINUE_NULL);
	}

	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		dq->mpd_target = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		dq->mpd_thread = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		thread_call_cancel_wait(dq->mpd_call);
		thread_call_free(dq->mpd_call);
		dq->mpd_call = NULL;
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
}
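
/*
 * Illustrative teardown sketch (not part of this file), continuing the
 * hypothetical `my_queue` above: cancelation must happen after the last
 * possible mpsc_daemon_enqueue() call, since enqueuing on a canceled queue
 * panics; once this returns, the queue's backing thread, thread call, or
 * target link has been torn down and the structure must not be used again
 * without re-initialization.
 *
 *	mpsc_daemon_queue_cancel_and_wait(&my_queue);
 */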

#pragma mark deferred deallocation daemon

static struct mpsc_daemon_queue thread_deferred_deallocation_queue;

void
thread_deallocate_daemon_init(void)
{
	kern_return_t kr;

	kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
	    "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL,
	    MPSC_DAEMON_INIT_NONE);
	if (kr != KERN_SUCCESS) {
		panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
	}
}

void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke)
{
	mpsc_daemon_queue_init_with_target(dq, invoke,
	    &thread_deferred_deallocation_queue, MPSC_DAEMON_INIT_NONE);
}
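
/*
 * Illustrative sketch (not part of this file): a subsystem that wants its
 * deferred cleanup to run on the shared deallocation daemon registers a
 * nested queue with it, then enqueues work items as usual.
 * `my_cleanup_queue`, `my_cleanup_invoke` and `elem->my_link` are
 * hypothetical names.
 *
 *	static struct mpsc_daemon_queue my_cleanup_queue;
 *
 *	// at init time:
 *	thread_deallocate_daemon_register_queue(&my_cleanup_queue,
 *	    my_cleanup_invoke);
 *	// later, from any context that may enqueue concurrently:
 *	mpsc_daemon_enqueue(&my_cleanup_queue, &elem->my_link,
 *	    MPSC_QUEUE_DISABLE_PREEMPTION);
 */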