/*
 * Copyright (c) 2018 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include <machine/machine_cpu.h>
#include <kern/locks.h>
#include <kern/mpsc_queue.h>
#include <kern/queue.h>
#include <kern/thread.h>

#pragma mark Validation panics for queues in general

__abortlike
void
__queue_element_linkage_invalid(queue_entry_t elt)
{
	queue_entry_t prev = elt->prev;
	queue_entry_t next = elt->next;

	panic("Invalid queue linkage: elt:%p {prev:%p, next:%p, "
	    "prev->next:%p, next->prev:%p}",
	    elt, prev, next, prev->next, next->prev);
}

#pragma mark Single Consumer calls

__attribute__((noinline))
static mpsc_queue_chain_t
_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
{
	return hw_wait_while_equals_long(ptr, NULL);
}
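
/*
 * Why a consumer may have to wait: the enqueue fast path (inlined from
 * <kern/mpsc_queue.h>) publishes an element in two separate atomic steps,
 * roughly like this (a simplified sketch, not the exact header code):
 *
 *	// 1) swap the tail to the new element (release)
 *	prev = os_atomic_xchg(&q->mpqh_tail, elm, release);
 *	// 2) only then link the element from the previous tail
 *	os_atomic_store(&prev->mpqc_next, elm, relaxed);
 *
 * A consumer that observes the new tail between steps (1) and (2) sees a
 * NULL mpqc_next and spins in _mpsc_queue_wait_for_enqueuer() until the
 * store in step (2) becomes visible.
 */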

void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);

	os_atomic_store(&last->mpqc_next, head, relaxed);

	if (head == NULL &&
	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
		if (__improbable(head == NULL)) {
			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
		}
		os_atomic_store(&last->mpqc_next, head, relaxed);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
}
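
/*
 * Usage sketch (illustrative, names hypothetical): the single consumer may
 * push back the unprocessed remainder of a previously dequeued batch, so it
 * is redelivered ahead of anything enqueued since:
 *
 *	head = mpsc_queue_dequeue_batch(q, &tail, dep);
 *	// ... consume elements up to, but not including, `cur` ...
 *	mpsc_queue_restore_batch(q, cur, tail);
 */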

mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
    os_atomic_dependency_t dependency)
{
	mpsc_queue_chain_t head, tail;

	q = os_atomic_inject_dependency(q, dependency);

	tail = os_atomic_load(&q->mpqh_tail, relaxed);
	if (__improbable(tail == &q->mpqh_head)) {
		*tail_out = NULL;
		return NULL;
	}

	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
	if (__improbable(head == NULL)) {
		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
	}
	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
	/*
	 * 22708742: set tail to &q->mpqh_head with release, so that the NULL
	 * write to head above doesn't clobber the head set by a concurrent
	 * enqueuer.
	 *
	 * The other half of the seq_cst is required to pair with any enqueuer
	 * that contributed to an element in this list (pairs with the release
	 * fence in __mpsc_queue_append_update_tail()).
	 *
	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
	 * visibility transitive (when items hop from one queue to the next),
	 * which is expected by clients implicitly.
	 *
	 * Note that this is the same number of fences that a traditional lock
	 * would have, but as a once-per-batch cost.
	 */
	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);

	return head;
}

mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
{
	mpsc_queue_chain_t elm = NULL;
	if (cur == tail || cur == NULL) {
		return elm;
	}

	elm = os_atomic_load(&cur->mpqc_next, relaxed);
	if (__improbable(elm == NULL)) {
		elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
	}
	return elm;
}
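
/*
 * Usage sketch (illustrative): a typical single-consumer drain loop, assuming
 * a hypothetical element type `struct my_elem` that embeds its linkage in a
 * field named `me_link` (`dep` being an os_atomic_dependency_t token, e.g.
 * built with os_atomic_make_dependency() as _mpsc_daemon_queue_drain() below
 * does):
 *
 *	mpsc_queue_chain_t head, tail, cur;
 *
 *	head = mpsc_queue_dequeue_batch(q, &tail, dep);
 *	mpsc_queue_batch_foreach_safe(cur, head, tail) {
 *		struct my_elem *e = mpsc_queue_element(cur, struct my_elem, me_link);
 *		// the _safe iterator pre-fetches the next link, so `e` may be
 *		// freed or re-enqueued by its handler
 *		my_elem_process(e);
 *	}
 */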

#pragma mark "GCD"-like facilities

static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);

/* thread based queues */

static void
_mpsc_daemon_queue_init(mpsc_daemon_queue_t dq, mpsc_daemon_init_options_t flags)
{
	if (flags & MPSC_DAEMON_INIT_INACTIVE) {
		os_atomic_init(&dq->mpd_state, MPSC_QUEUE_STATE_INACTIVE);
	}
}

static void
_mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
{
	mpsc_daemon_queue_t dq = param;
	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
	thread_t self = dq->mpd_thread;

	__builtin_assume(self != THREAD_NULL);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options |= TH_OPT_SYSTEM_CRITICAL;
	}

	assert(dq->mpd_thread == current_thread());
	_mpsc_daemon_queue_drain(dq, self);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
	}

	thread_block_parameter(_mpsc_queue_thread_continue, dq);
}

static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
{
	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
}

static kern_return_t
_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_queue_kind_t kind, mpsc_daemon_init_options_t flags)
{
	kern_return_t kr;

	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = kind,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);

	kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
	    &dq->mpd_thread);
	if (kr == KERN_SUCCESS) {
		thread_set_thread_name(dq->mpd_thread, name);
		thread_start_in_assert_wait(dq->mpd_thread,
		    assert_wait_queue(dq), CAST_EVENT64_T(dq),
		    THREAD_UNINT);
		thread_deallocate(dq->mpd_thread);
	}
	return kr;
}

kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_init_options_t flags)
{
	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
	           MPSC_QUEUE_KIND_THREAD, flags);
}
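
/*
 * Usage sketch (illustrative, names hypothetical): a subsystem typically
 * declares a queue, an invoke callback that recovers its element type, and
 * initializes the queue once at boot:
 *
 *	static struct mpsc_daemon_queue my_dq;
 *
 *	static void
 *	my_invoke(mpsc_queue_chain_t elm, mpsc_daemon_queue_t dq __unused)
 *	{
 *		struct my_elem *e = mpsc_queue_element(elm, struct my_elem, me_link);
 *		// process `e` on the daemon thread
 *	}
 *
 *	// at init time
 *	if (mpsc_daemon_queue_init_with_thread(&my_dq, my_invoke,
 *	    MINPRI_KERNEL, "my.daemon", MPSC_DAEMON_INIT_NONE) != KERN_SUCCESS) {
 *		panic("my.daemon: thread creation failed");
 *	}
 */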

/* thread-call based queues */

static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,
    thread_call_param_t arg1 __unused)
{
	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
}

static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
{
	thread_call_enter(dq->mpd_call);
}

void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri,
    mpsc_daemon_init_options_t flags)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_THREAD_CALL,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);
	dq->mpd_call = thread_call_allocate_with_options(
		_mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
}
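
/*
 * Usage sketch (illustrative): the thread-call flavor is initialized the same
 * way, but drains from a thread call instead of a dedicated kernel thread
 * (THREAD_CALL_PRIORITY_KERNEL used here as an assumed, typical priority):
 *
 *	mpsc_daemon_queue_init_with_thread_call(&my_dq, my_invoke,
 *	    THREAD_CALL_PRIORITY_KERNEL, MPSC_DAEMON_INIT_NONE);
 */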

/* nested queues */

void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    __unused mpsc_daemon_queue_t tq)
{
	mpsc_daemon_queue_t dq;
	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
	_mpsc_daemon_queue_drain(dq, NULL);
}

static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
{
	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
}

void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target,
    mpsc_daemon_init_options_t flags)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_NESTED,
		.mpd_invoke = invoke,
		.mpd_target = target,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);
}
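
/*
 * Usage sketch (illustrative, names hypothetical): a nested queue drains in
 * the context of its target, whose members must be invoked through
 * mpsc_daemon_queue_nested_invoke():
 *
 *	static struct mpsc_daemon_queue parent_dq, child_dq;
 *
 *	mpsc_daemon_queue_init_with_thread(&parent_dq,
 *	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
 *	    "my.parent", MPSC_DAEMON_INIT_NONE);
 *	mpsc_daemon_queue_init_with_target(&child_dq, my_invoke,
 *	    &parent_dq, MPSC_DAEMON_INIT_NONE);
 */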

/* enqueue, drain & cancelation */

static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
{
	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
	mpsc_queue_chain_t head, cur, tail;
	mpsc_daemon_queue_state_t st;

again:
	/*
	 * Most of the time we're woken up because we're dirty.
	 * In that case, this atomic xor sets DRAINING and clears WAKEUP
	 * in a single atomic operation.
	 *
	 * However, if we're woken up for cancelation, the state may be reduced to
	 * the CANCELED bit set only, and then the xor will actually set WAKEUP.
	 * We need to correct this and clear it back to avoid looping below.
	 * This is safe to do as no one is allowed to enqueue more work after
	 * cancelation has happened.
	 *
	 * We use `st` as a dependency token to pair with the release fence in
	 * _mpsc_daemon_queue_enqueue(), which gives us the guarantee that the
	 * update to the tail of the MPSC queue that made it non-empty is
	 * visible to us.
	 */
	st = os_atomic_xor(&dq->mpd_state,
	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
	assert(st & MPSC_QUEUE_STATE_DRAINING);
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		assert(st & MPSC_QUEUE_STATE_CANCELED);
		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
	}

	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
	if ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
		do {
			mpsc_queue_batch_foreach_safe(cur, head, tail) {
				os_atomic_store(&cur->mpqc_next,
				    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
				invoke(cur, dq);
			}
		} while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep)));

		if (dq->mpd_options & MPSC_QUEUE_OPTION_BATCH) {
			invoke(MPSC_QUEUE_BATCH_END, dq);
		}
	}

	if (self) {
		assert_wait((event_t)dq, THREAD_UNINT);
	}

	/*
	 * Unlike GCD, no fence is necessary here: there is no concept similar
	 * to "dispatch_sync()" that would require changes this thread made to be
	 * visible to other threads as part of the mpsc_daemon_queue machinery.
	 *
	 * Making updates that happened on the daemon queue visible to other
	 * threads is the responsibility of the client.
	 */
	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);

	/*
	 * If a wakeup happened while we were draining, then the queue did an
	 * [ empty -> non-empty ] transition during our drain.
	 *
	 * Chances are we already observed and drained everything,
	 * but we need to be absolutely sure, so start a drain again
	 * as the enqueuer observed the DRAINING bit and has skipped calling
	 * _mpsc_daemon_queue_wakeup().
	 */
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
		}
		goto again;
	}

	/* dereferencing `dq` past this point is unsafe */

	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		thread_wakeup(&dq->mpd_state);
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
			thread_terminate_self();
			__builtin_unreachable();
		}
	}
}

static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
{
	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		_mpsc_daemon_queue_nested_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		_mpsc_queue_thread_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		_mpsc_queue_thread_call_wakeup(dq);
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
}

static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
{
	mpsc_daemon_queue_state_t st;

	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
		/*
		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
		 */
		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
			panic("mpsc_queue[%p]: use after cancelation", dq);
		}

		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP |
		    MPSC_QUEUE_STATE_INACTIVE)) == 0) {
			_mpsc_daemon_queue_wakeup(dq);
		}
	}
}

void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
    mpsc_queue_options_t options)
{
	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		disable_preemption();
	}

	_mpsc_daemon_queue_enqueue(dq, elm);

	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		enable_preemption();
	}
}
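
/*
 * Usage sketch (illustrative, `e` hypothetical): passing
 * MPSC_QUEUE_DISABLE_PREEMPTION wraps the append, and any wakeup it triggers,
 * in disable_preemption()/enable_preemption():
 *
 *	mpsc_daemon_enqueue(&my_dq, &e->me_link, MPSC_QUEUE_DISABLE_PREEMPTION);
 */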

void
mpsc_daemon_queue_activate(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	st = os_atomic_andnot_orig(&dq->mpd_state,
	    MPSC_QUEUE_STATE_INACTIVE, relaxed);
	if ((st & MPSC_QUEUE_STATE_WAKEUP) && (st & MPSC_QUEUE_STATE_INACTIVE)) {
		_mpsc_daemon_queue_wakeup(dq);
	}
}
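
/*
 * Usage sketch (illustrative): a queue created with MPSC_DAEMON_INIT_INACTIVE
 * remembers enqueues but defers the wakeup until it is activated:
 *
 *	mpsc_daemon_queue_init_with_thread(&my_dq, my_invoke, MINPRI_KERNEL,
 *	    "my.daemon", MPSC_DAEMON_INIT_INACTIVE);
 *	// ... enqueues are accepted but do not wake the daemon yet ...
 *	mpsc_daemon_queue_activate(&my_dq);	// kicks off any pending drain
 */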

void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);

	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
	}
	if (__improbable(st & MPSC_QUEUE_STATE_INACTIVE)) {
		panic("mpsc_queue[%p]: queue is inactive (%x)", dq, st);
	}

	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
		clear_wait(current_thread(), THREAD_AWAKENED);
	} else {
		disable_preemption();
		_mpsc_daemon_queue_wakeup(dq);
		enable_preemption();
		thread_block(THREAD_CONTINUE_NULL);
	}

	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		dq->mpd_target = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		dq->mpd_thread = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		thread_call_cancel_wait(dq->mpd_call);
		thread_call_free(dq->mpd_call);
		dq->mpd_call = NULL;
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
}
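
/*
 * Usage sketch (illustrative): teardown is only legal once no enqueuer can
 * race with it; enqueueing after cancelation panics:
 *
 *	// quiesce all producers first, then:
 *	mpsc_daemon_queue_cancel_and_wait(&my_dq);
 */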

#pragma mark deferred deallocation daemon

static struct mpsc_daemon_queue thread_deferred_deallocation_queue;

void
thread_deallocate_daemon_init(void)
{
	kern_return_t kr;

	kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
	    "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL,
	    MPSC_DAEMON_INIT_NONE);
	if (kr != KERN_SUCCESS) {
		panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
	}
}

void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke)
{
	mpsc_daemon_queue_init_with_target(dq, invoke,
	    &thread_deferred_deallocation_queue, MPSC_DAEMON_INIT_NONE);
}
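
/*
 * Usage sketch (illustrative, names hypothetical): a subsystem can piggy-back
 * its deferred reclamation on the thread deallocation daemon by registering a
 * nested queue and feeding it like any other daemon queue:
 *
 *	static struct mpsc_daemon_queue my_reap_queue;
 *
 *	thread_deallocate_daemon_register_queue(&my_reap_queue, my_reap_invoke);
 *	// later, from the freeing path:
 *	mpsc_daemon_enqueue(&my_reap_queue, &obj->reap_link,
 *	    MPSC_QUEUE_DISABLE_PREEMPTION);
 */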