/*
 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/*
 * @OSF_FREE_COPYRIGHT@
 */
/*
 * Mach Operating System
 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
 * All Rights Reserved.
 *
 * Permission to use, copy, modify and distribute this software and its
 * documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie Mellon
 * the rights to redistribute these changes.
 */
/*
 */
/*
 *	File:	ipc/ipc_mqueue.c
 *	Author:	Rich Draves
 *	Date:	1989
 *
 *	Functions to manipulate IPC message queues.
 */
/*
 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
 * support for mandatory and extensible security protections.  This notice
 * is included in support of clause 2.2 (b) of the Apple Public License,
 * Version 2.0.
 */


#include <mach/port.h>
#include <mach/message.h>
#include <mach/sync_policy.h>

#include <kern/assert.h>
#include <kern/counter.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <kern/ipc_mig.h>	/* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/waitq.h>

#include <ipc/port.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_right.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>

#if MACH_FLIPC
#include <ipc/flipc.h>
#endif

#ifdef __LP64__
#include <vm/vm_map.h>
#endif

#include <sys/event.h>

extern char *proc_name_address(void *p);

int ipc_mqueue_full;	/* address is event for queue space */
int ipc_mqueue_rcv;	/* address is event for message arrival */

/* forward declarations */
static void ipc_mqueue_receive_results(wait_result_t result);

#if MACH_FLIPC
static void ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	mach_msg_option64_t option,
	thread_t            thread);
#endif /* MACH_FLIPC */

/* Deliver message to message queue or waiting receiver */
static void ipc_mqueue_post(
	ipc_mqueue_t        mqueue,
	ipc_kmsg_t          kmsg,
	mach_msg_option_t   option);

/*
 *	Routine:	ipc_mqueue_init
 *	Purpose:
 *		Initialize a newly-allocated message queue.
 */
void
ipc_mqueue_init(
	ipc_mqueue_t mqueue)
{
	ipc_kmsg_queue_init(&mqueue->imq_messages);
	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
	klist_init(&mqueue->imq_klist);
}

/*
 *	Routine:	ipc_mqueue_add_locked
 *	Purpose:
 *		Associate the portset's mqueue with the port's mqueue.
 *		This has to be done so that posting the port will wakeup
 *		a portset waiter.  If there are waiters on the portset
 *		mqueue and messages on the port mqueue, try to match them
 *		up now.
 *	Conditions:
 *		Port and Pset both locked.
 */
kern_return_t
ipc_mqueue_add_locked(
	ipc_mqueue_t   port_mqueue,
	ipc_pset_t     pset,
	waitq_link_t   *linkp)
{
	ipc_port_t port = ip_from_mq(port_mqueue);
	struct waitq_set *wqset = &pset->ips_wqset;
	circle_queue_t kmsgq = &port_mqueue->imq_messages;
	kern_return_t kr = KERN_SUCCESS;
	ipc_kmsg_t kmsg;

	kr = waitq_link_locked(&port->ip_waitq, wqset, linkp);
	if (kr != KERN_SUCCESS) {
		return kr;
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Let's get them together.
	 *
	 * Only consider this set however, as the other ones have been
	 * posted to already.
	 */
	while ((kmsg = ipc_kmsg_queue_first(kmsgq)) != IKM_NULL) {
		thread_t th;
		mach_msg_size_t msize, asize;

		th = waitq_wakeup64_identify_locked(wqset, IPC_MQUEUE_RECEIVE,
		    THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* port and pset still locked, thread not runnable */

		if (th == THREAD_NULL) {
			/*
			 * Didn't find a thread to wake up but messages
			 * are enqueued, prepost the set instead,
			 * as calling waitq_wakeup64_identify_locked()
			 * on the set directly will not take care of it.
			 */
			waitq_link_prepost_locked(&port->ip_waitq, wqset);
			break;
		}

		/*
		 * Because we hold the thread off the runqueue at this point,
		 * it's safe to modify ith_ fields on the thread, as
		 * until it is resumed, it must be off core or in between
		 * the assert wait and returning from the continuation.
		 */

		/*
		 * If the receiver waited with a facility not directly
		 * related to Mach messaging, then it isn't prepared to get
		 * handed the message directly.  Just set it running, and
		 * go look for another thread that can.
		 */
		if (th->ith_state != MACH_RCV_IN_PROGRESS) {
#if MACH_FLIPC
			if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
				/*
				 * wakeup the peeking thread, but
				 * continue to loop over the threads
				 * waiting on the port's mqueue to see
				 * if there are any actual receivers
				 */
				ipc_mqueue_peek_on_thread_locked(port_mqueue,
				    th->ith_option, th);
			}
#endif /* MACH_FLIPC */

			waitq_resume_identified_thread(wqset, th,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			continue;
		}
		/*
		 * Found a receiver.  See if they can handle the message
		 * correctly (the message is not too large for them, or
		 * they didn't care to be informed that the message was
		 * too large).  If they can't handle it, take them off
		 * the list and let them go back and figure it out and
		 * just move onto the next.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, th->map);
		asize = ipc_kmsg_aux_data_size(kmsg);

		if (ipc_kmsg_too_large(msize, asize, th->ith_option,
		    th->ith_max_msize, th->ith_max_asize, th)) {
			th->ith_state = MACH_RCV_TOO_LARGE;
			th->ith_msize = msize;
			th->ith_asize = asize;
			if (th->ith_option & MACH_RCV_LARGE) {
				/*
				 * let it go without the message
				 */
				th->ith_receiver_name = port_mqueue->imq_receiver_name;
				th->ith_kmsg = IKM_NULL;
				th->ith_seqno = 0;

				waitq_resume_identified_thread(wqset, th,
				    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

				continue; /* find another thread */
			}
		} else {
			th->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * This thread is going to take this message,
		 * so give it the message.
		 */
		ipc_kmsg_rmqueue(kmsgq, kmsg);

#if MACH_FLIPC
		mach_node_t node = kmsg->ikm_node;
#endif

		ipc_mqueue_release_msgcount(port_mqueue);

		th->ith_kmsg = kmsg;
		th->ith_seqno = port_mqueue->imq_seqno++;

		waitq_resume_identified_thread(wqset, th,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

#if MACH_FLIPC
		if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
			flipc_msg_ack(node, port_mqueue, TRUE);
		}
#endif
	}

	return KERN_SUCCESS;
}


/*
 *	Routine:	ipc_port_has_klist
 *	Purpose:
 *		Returns whether the given port imq_klist field can be used as a klist.
 */
bool
ipc_port_has_klist(ipc_port_t port)
{
	return !port->ip_specialreply &&
	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
}

static inline struct klist *
ipc_object_klist(ipc_object_t object)
{
	if (io_otype(object) == IOT_PORT) {
		ipc_port_t port = ip_object_to_port(object);

		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
	}
	return &ips_object_to_pset(object)->ips_klist;
}

/*
 *	Routine:	ipc_mqueue_changed
 *	Purpose:
 *		Wake up receivers waiting in a message queue.
 *	Conditions:
 *		The object containing the message queue is locked.
 */
void
ipc_mqueue_changed(
	ipc_space_t   space,
	struct waitq  *waitq)
{
	ipc_object_t object = io_from_waitq(waitq);
	struct klist *klist = ipc_object_klist(object);

	if (klist && SLIST_FIRST(klist)) {
		/*
		 * Indicate that this message queue is vanishing
		 *
		 * When this is called, the associated receive right may be in flight
		 * between two tasks: the one it used to live in, and the one that armed
		 * a port destroyed notification for it.
		 *
		 * The new process may want to register the port it gets back with an
		 * EVFILT_MACHPORT filter again, and may have sync IPC pending on this
		 * port already, in which case we want the imq_klist field to be
		 * reusable for nefarious purposes.
		 *
		 * Fortunately, we really don't need this linkage anymore after this
		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
		 *
		 * Note: we don't have the space lock here, however, this covers the
		 *       case of when a task is terminating the space, triggering
		 *       several knote_vanish() calls.
		 *
		 *       We don't need the lock to observe that the space is inactive as
		 *       we just deactivated it on the same thread.
		 *
		 *       We still need to call knote_vanish() so that the knote is
		 *       marked with EV_VANISHED or EV_EOF so that the detach step
		 *       in filt_machportdetach is skipped correctly.
		 */
		assert(space);
		knote_vanish(klist, is_active(space));
	}

	if (io_otype(object) == IOT_PORT) {
		ipc_port_t port = ip_object_to_port(object);
		if (!port->ip_specialreply) {
			ipc_port_adjust_sync_link_state_locked(port,
			    PORT_SYNC_LINK_ANY, NULL);
		}
	} else {
		klist_init(klist);
	}

	/*
	 * do not pass WAITQ_UPDATE_INHERITOR, ipc_port_destroy()
	 * needs to handle this manually, and the port lock
	 * is the waitq lock, so there's really no inefficiency there.
	 */
	waitq_wakeup64_all_locked(waitq, IPC_MQUEUE_RECEIVE,
	    THREAD_RESTART, WAITQ_KEEP_LOCKED);
}

/*
 *	Routine:	ipc_mqueue_send
 *	Purpose:
 *		Send a message to a message queue.  The message holds a reference
 *		for the destination port for this message queue in the
 *		msgh_remote_port field.
 *
 *		If unsuccessful, the caller still has possession of
 *		the message and must do something with it.  If successful,
 *		the message is queued, given to a receiver, or destroyed.
 *	Conditions:
 *		port is locked.
 *	Returns:
 *		MACH_MSG_SUCCESS	The message was accepted.
 *		MACH_SEND_TIMED_OUT	Caller still has message.
 *		MACH_SEND_INTERRUPTED	Caller still has message.
 */
mach_msg_return_t
ipc_mqueue_send_locked(
	ipc_mqueue_t        mqueue,
	ipc_kmsg_t          kmsg,
	mach_msg_option_t   option,
	mach_msg_timeout_t  send_timeout)
{
	ipc_port_t port = ip_from_mq(mqueue);
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	    ((option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(ikm_header(kmsg)->msgh_bits) ==
	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		ip_mq_unlock(port);
	} else {
		thread_t cur_thread = current_thread();
		struct turnstile *send_turnstile = TURNSTILE_NULL;
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			ip_mq_unlock(port);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			ip_mq_unlock(port);
			return MACH_SEND_NO_BUFFER;
		}
		port->ip_fullwaiters = true;

		if (option & MACH_SEND_TIMEOUT) {
			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
		} else {
			deadline = 0;
		}

		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

		send_turnstile = turnstile_prepare((uintptr_t)port,
		    port_send_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_send_update_inheritor(port, send_turnstile,
		    TURNSTILE_DELAYED_UPDATE);

		wresult = waitq_assert_wait64_leeway(
			&send_turnstile->ts_waitq,
			IPC_MQUEUE_FULL,
			THREAD_ABORTSAFE,
			TIMEOUT_URGENCY_USER_NORMAL,
			deadline,
			TIMEOUT_NO_LEEWAY);

		ip_mq_unlock(port);
		turnstile_update_inheritor_complete(send_turnstile,
		    TURNSTILE_INTERLOCK_NOT_HELD);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
		}

		/* Call turnstile complete with interlock held */
		ip_mq_lock(port);
		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
		ip_mq_unlock(port);

		/* Call cleanup after dropping the interlock */
		turnstile_cleanup();

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * we can proceed - inherited msgcount from waker
			 * or the message queue has been destroyed and the msgcount
			 * has been reset to zero (will detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}
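
/*
 * Example (illustrative sketch, not part of the original file): a minimal
 * sender path built on ipc_mqueue_send_locked().  The surrounding control
 * flow and the timeout value below are assumptions for illustration only.
 *
 *	ip_mq_lock(port);
 *	mr = ipc_mqueue_send_locked(&port->ip_messages, kmsg,
 *	    MACH_SEND_TIMEOUT, timeout_ms);
 *	// the port is unlocked on return, whatever the outcome
 *	if (mr != MACH_MSG_SUCCESS) {
 *		// MACH_SEND_TIMED_OUT / MACH_SEND_INTERRUPTED: the caller
 *		// still owns kmsg and must dispose of it itself
 *	}
 */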

/*
 *	Routine:	ipc_mqueue_override_send_locked
 *	Purpose:
 *		Set an override qos on the first message in the queue
 *		(if the queue is full).  This is a send-possible override
 *		that will go away as soon as we drain a message from the
 *		queue.
 *
 *	Conditions:
 *		The port corresponding to mqueue is locked.
 *		The caller holds a reference on the message queue.
 */
void
ipc_mqueue_override_send_locked(
	ipc_mqueue_t    mqueue,
	mach_msg_qos_t  qos_ovr)
{
	ipc_port_t port = ip_from_mq(mqueue);

	assert(waitq_is_valid(&port->ip_waitq));

	if (imq_full(mqueue)) {
		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
			if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				KNOTE(&port->ip_klist, 0);
			}
		}
	}
}

/*
 *	Routine:	ipc_mqueue_release_msgcount
 *	Purpose:
 *		Release a message queue reference in the case where we
 *		found a waiter.
 *
 *	Conditions:
 *		The port corresponding to the message queue is locked.
 *		The message corresponding to this reference is off the queue.
 *		There is no need to pass reserved preposts because this will
 *		never prepost to anyone.
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
{
	ipc_port_t port = ip_from_mq(port_mq);
	struct turnstile *send_turnstile = port_send_turnstile(port);

	ip_mq_lock_held(port);
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port->ip_fullwaiters &&
	    send_turnstile != TURNSTILE_NULL) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 *
		 * The wakeup happens on a turnstile waitq
		 * which will wake up the highest priority waiter.
		 * A potential downside of this would be starving low
		 * priority senders if there is a constant churn of
		 * high priority threads trying to send to this port.
		 */
		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_AWAKENED,
		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
			port->ip_fullwaiters = false;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		waitq_clear_prepost_locked(&port->ip_waitq);
	}
}
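
/*
 * Worked example (illustration, not from the original file): with
 * imq_qlimit == 5 and imq_msgcount == 5, the queue is full.  When a
 * receiver takes a message off the queue, ipc_mqueue_release_msgcount()
 * drops imq_msgcount to 4; if a sender is blocked on the send turnstile,
 * that sender is woken with WAITQ_PROMOTE_PRIORITY and imq_msgcount is
 * bumped back to 5, handing the freed slot directly to the woken sender
 * so that no other thread can steal it in the meantime.
 */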

/*
 *	Routine:	ipc_mqueue_post
 *	Purpose:
 *		Post a message to a waiting receiver or enqueue it.  If a
 *		receiver is waiting, we can release our reserved space in
 *		the message queue.
 *
 *	Conditions:
 *		port is unlocked
 *		If we need to queue, our space in the message queue is reserved.
 */
static void
ipc_mqueue_post(
	ipc_mqueue_t        mqueue,
	ipc_kmsg_t          kmsg,
	mach_msg_option_t   __unused option)
{
	ipc_port_t port = ip_from_mq(mqueue);
	struct waitq *waitq = &port->ip_waitq;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/*
	 * While the msg queue is locked, we have control of the
	 * kmsg, so the ref in it for the port is still good.
	 *
	 * Check for a receiver for the message.
	 */
	ip_mq_lock(port);

	/* we may have raced with port destruction! */
	if (!waitq_is_valid(&port->ip_waitq)) {
		destroy_msg = TRUE;
		goto out_unlock;
	}

	for (;;) {
		thread_t receiver;
		mach_msg_size_t msize, asize;

		receiver = waitq_wakeup64_identify_locked(waitq,
		    IPC_MQUEUE_RECEIVE, THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread not runnable */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved
			 * Reservations are cancelled when the port goes inactive.
			 * note that this will enqueue the message for any
			 * "peeking" receivers.
			 *
			 * Also, post the knote to wake up any threads waiting
			 * on that style of interface if this insertion is of
			 * note (first insertion, or adjusted override qos all
			 * the way to the head of the queue).
			 *
			 * This is just for ports; port-set knotes have
			 * already been posted to by the
			 * waitq_wakeup64_identify_locked() call above.
			 */
			if (mqueue->imq_msgcount == 0) {
				/*
				 * The message queue must belong
				 * to an inactive port, so just destroy
				 * the message and pretend it was posted.
				 */
				destroy_msg = TRUE;
			} else if (!ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
				/*
				 * queue was not empty and qos
				 * didn't change, nothing to do.
				 */
			} else if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				/*
				 * queue was empty or qos changed
				 * we need to tell kqueue, unless
				 * the space is getting torn down
				 */
				KNOTE(&port->ip_klist, 0);
			}
			break;
		}

#if MACH_FLIPC
		/*
		 * If a thread is attempting a "peek" into the message queue
		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
		 * thread running.  A successful peek is essentially the same as
		 * message delivery since the peeking thread takes responsibility
		 * for delivering the message and (eventually) removing it from
		 * the mqueue.  Only one thread can successfully use the peek
		 * facility on any given port, so we exit the waitq loop after
		 * encountering such a thread.
		 */
		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			break;  /* Message was posted, so break out of loop */
		}
#endif /* MACH_FLIPC */

		/*
		 * If the receiver waited with a facility not directly related
		 * to Mach messaging, then it isn't prepared to get handed the
		 * message directly.  Just set it running, and go look for
		 * another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			continue;
		}

		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		asize = ipc_kmsg_aux_data_size(kmsg);

		if (ipc_kmsg_too_large(msize, asize, receiver->ith_option,
		    receiver->ith_max_msize, receiver->ith_max_asize, receiver)) {
			receiver->ith_msize = msize;
			receiver->ith_asize = asize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for the special
		 * too-large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			mach_node_t node = kmsg->ikm_node;
#endif
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);

#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(node, mqueue, TRUE);
			}
#endif
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;

		waitq_resume_identified_thread(waitq, receiver,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
	}

out_unlock:
	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(waitq, current_thread());
	waitq_unlock(waitq);

	if (destroy_msg) {
		ipc_kmsg_destroy(kmsg, IPC_KMSG_DESTROY_ALL);
	}

	counter_inc(&current_task()->messages_sent);
	return;
}


static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t self = current_thread();
	mach_msg_option64_t option64 = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message.  If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			if (option64 & MACH_RCV_LARGE) {
				return;
			}
			return;
		case MACH_MSG_SUCCESS:
			return;
#if MACH_FLIPC
		case MACH_PEEK_READY:
			return;
#endif /* MACH_FLIPC */

		default:
			panic("ipc_mqueue_receive_results: strange ith_state %d", self->ith_state);
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result %d", saved_wait_result);
	}
}

void
ipc_mqueue_receive_continue(
	__unused void  *param,
	wait_result_t  wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}

/*
 *	Routine:	ipc_mqueue_receive
 *	Purpose:
 *		Receive a message from a message queue.
 *
 *	Conditions:
 *		Our caller must hold a reference for the port or port set
 *		to which this queue belongs, to keep the queue
 *		from being deallocated.
 *
 *		The kmsg is returned with clean header fields
 *		and with the circular bit turned off through the ith_kmsg
 *		field of the thread's receive continuation state.
 *	Returns:
 *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
 *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize,
 *					auxiliary data size returned in ith_asize.
 *		MACH_RCV_TIMED_OUT	No message obtained.
 *		MACH_RCV_INTERRUPTED	No message obtained.
 *		MACH_RCV_PORT_DIED	Port/set died; no message.
 *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
 */
void
ipc_mqueue_receive(
	struct waitq         *waitq,
	mach_msg_option64_t  option64,
	mach_msg_size_t      max_size,
	mach_msg_size_t      max_aux_size,  /* 0 if no aux buffer */
	mach_msg_timeout_t   rcv_timeout,
	int                  interruptible,
	bool                 has_continuation)
{
	wait_result_t wresult;
	thread_t self = current_thread();

	waitq_lock(waitq);

	wresult = ipc_mqueue_receive_on_thread_and_unlock(waitq, option64, max_size,
	    max_aux_size, rcv_timeout, interruptible, self);
	/* object unlocked */
	if (wresult == THREAD_NOT_WAITING) {
		return;
	}

	if (wresult == THREAD_WAITING) {
		if (has_continuation) {
			wresult = thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED */
		}
		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}

/*
 *	Routine:	ipc_mqueue_receive_on_thread_and_unlock
 *	Purpose:
 *		Receive a message from a message queue using a specified thread.
 *		If no message available, assert_wait on the appropriate waitq.
 *
 *	Conditions:
 *		Assumes thread is self.
 *		The port/port-set waitq is locked on entry, unlocked on return.
 *		May have assert-waited.  Caller must block in those cases.
 */
wait_result_t
ipc_mqueue_receive_on_thread_and_unlock(
	struct waitq         *waitq,
	mach_msg_option64_t  option64,
	mach_msg_size_t      max_msg_size,
	mach_msg_size_t      max_aux_size,
	mach_msg_timeout_t   rcv_timeout,
	int                  interruptible,
	thread_t             thread)
{
	ipc_object_t object = io_from_waitq(waitq);
	ipc_port_t port = IP_NULL;
	wait_result_t wresult;
	uint64_t deadline;
	struct turnstile *rcv_turnstile = TURNSTILE_NULL;

	if (waitq_type(waitq) == WQT_PORT_SET) {
		ipc_pset_t pset = ips_object_to_pset(object);
		wqs_prepost_flags_t wqs_flags = WQS_PREPOST_LOCK;
		struct waitq *port_wq;

		/*
		 * Put the message at the back of the prepost list
		 * if it's not a PEEK.
		 *
		 * Might drop the pset lock temporarily.
		 */
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			wqs_flags |= WQS_PREPOST_PEEK;
		}
#endif /* MACH_FLIPC */
		port_wq = waitq_set_first_prepost(&pset->ips_wqset, wqs_flags);

		/* Returns with port locked */

		if (port_wq != NULL) {
			/*
			 * We get here if there is at least one message
			 * waiting on port_wq.  We have instructed the prepost
			 * iteration logic to leave both the port_wq and the
			 * set waitq locked.
			 *
			 * Continue on to handling the message with just
			 * the port waitq locked.
			 */
			io_unlock(object);
			port = ip_from_waitq(port_wq);
		}
	} else if (waitq_type(waitq) == WQT_PORT) {
		port = ip_from_waitq(waitq);
		if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
			port = IP_NULL;
		}
	} else {
		panic("Unknown waitq type (%p/0x%x)", waitq, waitq_type(waitq));
	}

	if (port) {
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			ipc_mqueue_peek_on_thread_locked(&port->ip_messages,
			    option64, thread);
		} else
#endif /* MACH_FLIPC */
		{
			ipc_mqueue_select_on_thread_locked(&port->ip_messages,
			    option64, max_msg_size, max_aux_size, thread);
		}
		ip_mq_unlock(port);
		return THREAD_NOT_WAITING;
	}

	if (!waitq_is_valid(waitq)) {
		/* someone raced us to destroy this mqueue/port! */
		io_unlock(object);
		/*
		 * ipc_mqueue_receive_results updates the thread's ith_state
		 * TODO: differentiate between rights being moved and
		 * rights/ports being destroyed (21885327)
		 */
		return THREAD_RESTART;
	}

	/*
	 * Looks like we'll have to block.  The waitq we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if ((option64 & MACH_RCV_TIMEOUT) && rcv_timeout == 0) {
		io_unlock(object);
		thread->ith_state = MACH_RCV_TIMED_OUT;
		return THREAD_NOT_WAITING;
	}

	thread->ith_option = option64;
	thread->ith_max_msize = max_msg_size;
	thread->ith_msize = 0;

	thread->ith_max_asize = max_aux_size;
	thread->ith_asize = 0;

	thread->ith_state = MACH_RCV_IN_PROGRESS;
#if MACH_FLIPC
	if (option64 & MACH64_PEEK_MSG) {
		thread->ith_state = MACH_PEEK_IN_PROGRESS;
	}
#endif /* MACH_FLIPC */

	if (option64 & MACH_RCV_TIMEOUT) {
		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
	} else {
		deadline = 0;
	}

	/*
	 * Threads waiting on a reply port (not portset)
	 * will wait on its receive turnstile.
	 *
	 * Donate the waiting thread's turnstile and
	 * set up the inheritor for the special reply port.
	 * Based on the state of the special reply
	 * port, the inheritor would be either the send
	 * turnstile of the connection port on which
	 * the send of the sync ipc would happen, or the
	 * workloop's turnstile that would reply to
	 * the sync ipc message.
	 *
	 * Pass in the mqueue waitq to waitq_assert_wait to
	 * support port set wakeup.  The mqueue waitq of the port
	 * will be converted to a turnstile waitq
	 * in waitq_assert_wait instead of the global waitqs.
	 */
	if (waitq_type(waitq) == WQT_PORT) {
		port = ip_from_waitq(waitq);
		rcv_turnstile = turnstile_prepare((uintptr_t)port,
		    port_rcv_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_recv_update_inheritor(port, rcv_turnstile,
		    TURNSTILE_DELAYED_UPDATE);
	}

	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
	wresult = waitq_assert_wait64_locked(waitq,
	    IPC_MQUEUE_RECEIVE,
	    interruptible,
	    TIMEOUT_URGENCY_USER_NORMAL,
	    deadline,
	    TIMEOUT_NO_LEEWAY,
	    thread);
	if (wresult == THREAD_AWAKENED) {
		/*
		 * The first thing we did was to look for preposts
		 * (using waitq_set_first_prepost() for sets, or looking
		 * at the port's queue for ports).
		 *
		 * Since we found none, we kept the waitq locked.
		 *
		 * It ensures that waitq_assert_wait64_locked() can't
		 * find pre-posts either, won't drop the waitq lock
		 * either (even for a set), and can't return THREAD_AWAKENED.
		 */
		panic("ipc_mqueue_receive_on_thread: sleep walking");
	}

	io_unlock(object);

	/*
	 * After this point, a waiting thread could be found by the wakeup
	 * identify path, and the other side now owns the ith_ fields until
	 * this thread blocks and resumes in the continuation
	 */
	/* Check if it's a port mqueue and if it needs to call turnstile_update_inheritor_complete */
	if (rcv_turnstile != TURNSTILE_NULL) {
		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
	}
	/* It's the caller's responsibility to call turnstile_complete to get the turnstile back */

	return wresult;
}

#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_peek_on_thread_locked
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before he had to block.  Tell a thread about the message queue,
 *		but don't pick off any messages.
 *	Conditions:
 *		port_mq locked
 *		at least one message on port_mq's message queue
 *
 *	Returns: (on thread->ith_state)
 *		MACH_PEEK_READY		ith_peekq contains a message queue
 */
void
ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t                       port_mq,
	__assert_only mach_msg_option64_t  option64,
	thread_t                           thread)
{
	assert(option64 & MACH64_PEEK_MSG);
	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);

	/*
	 * Take a reference on the mqueue's associated port:
	 * the peeking thread will be responsible to release this reference
	 */
	ip_validate(ip_from_mq(port_mq));
	ip_reference(ip_from_mq(port_mq));
	thread->ith_peekq = port_mq;
	thread->ith_state = MACH_PEEK_READY;
}
#endif /* MACH_FLIPC */

/*
 *	Routine:	ipc_mqueue_select_on_thread_locked
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before he had to block.  Pick the message off the queue and
 *		"post" it to thread.
 *	Conditions:
 *		port locked.
 *		thread not locked.
 *		There is a message.
 *		No need to reserve prepost objects - it will never prepost.
 *
 *	Returns:
 *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
 *		MACH_RCV_TOO_LARGE	May or may not have pulled it, but it is large.
 */
void
ipc_mqueue_select_on_thread_locked(
	ipc_mqueue_t         port_mq,
	mach_msg_option64_t  option64,
	mach_msg_size_t      max_msg_size,
	mach_msg_size_t      max_aux_size,
	thread_t             thread)
{
	ipc_kmsg_t kmsg;
	mach_msg_size_t msize, asize;

	mach_msg_return_t mr = MACH_MSG_SUCCESS;

	/*
	 * Do some sanity checking of our ability to receive
	 * before pulling the message off the queue.
	 */
	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
	assert(kmsg != IKM_NULL);

	/*
	 * If we really can't receive it, but we had the
	 * MACH_RCV_LARGE option set, then don't take it off
	 * the queue, instead return the appropriate error
	 * (and size needed).
	 */
	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
	asize = ipc_kmsg_aux_data_size(kmsg);

	if (ipc_kmsg_too_large(msize, asize, option64,
	    max_msg_size, max_aux_size, thread)) {
		mr = MACH_RCV_TOO_LARGE;
		if (option64 & MACH_RCV_LARGE) {
			ipc_kmsg_validate_partial_sig(kmsg);
			thread->ith_receiver_name = port_mq->imq_receiver_name;
			thread->ith_kmsg = IKM_NULL;
			thread->ith_msize = msize;
			thread->ith_asize = asize;
			thread->ith_seqno = 0;
			thread->ith_state = mr;
			return;
		}
	}

	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
#if MACH_FLIPC
	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
	}
#endif
	ipc_mqueue_release_msgcount(port_mq);
	thread->ith_seqno = port_mq->imq_seqno++;
	thread->ith_kmsg = kmsg;
	thread->ith_state = mr;

	counter_inc(&current_task()->messages_received);
	return;
}

/*
 *	Routine:	ipc_mqueue_peek_locked
 *	Purpose:
 *		Peek at a (non-set) message queue to see if it has a message
 *		matching the sequence number provided (if zero, then the
 *		first message in the queue) and return vital info about the
 *		message.
 *
 *	Conditions:
 *		The io object corresponding to mq is locked by callers.
 *		Other locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t  mq,
    mach_port_seqno_t                *seqnop,
    mach_msg_size_t                  *msg_sizep,
    mach_msg_id_t                    *msg_idp,
    mach_msg_max_trailer_t           *msg_trailerp,
    ipc_kmsg_t                       *kmsgp)
{
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t kmsg;
	mach_port_seqno_t seqno, msgoff;
	unsigned res = 0;
	mach_msg_header_t *hdr;

	seqno = 0;
	if (seqnop != NULL) {
		seqno = *seqnop;
	}

	if (seqno == 0) {
		seqno = mq->imq_seqno;
		msgoff = 0;
	} else if (seqno >= mq->imq_seqno &&
	    seqno < mq->imq_seqno + mq->imq_msgcount) {
		msgoff = seqno - mq->imq_seqno;
	} else {
		goto out;
	}

	/* look for the message that would match that seqno */
	kmsgq = &mq->imq_messages;
	kmsg = ipc_kmsg_queue_first(kmsgq);
	while (msgoff-- && kmsg != IKM_NULL) {
		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
	}
	if (kmsg == IKM_NULL) {
		goto out;
	}

	/*
	 * Validate kmsg signature before doing anything with it.  Since we are
	 * holding the mqueue lock here, and only header + trailer will be peeked
	 * on, just do a partial validation to finish quickly.
	 *
	 * Partial kmsg signature is only supported on PAC devices.
	 */
	ipc_kmsg_validate_partial_sig(kmsg);

	hdr = ikm_header(kmsg);
	/* found one - return the requested info */
	if (seqnop != NULL) {
		*seqnop = seqno;
	}
	if (msg_sizep != NULL) {
		*msg_sizep = hdr->msgh_size;
	}
	if (msg_idp != NULL) {
		*msg_idp = hdr->msgh_id;
	}
	if (msg_trailerp != NULL) {
		memcpy(msg_trailerp, ipc_kmsg_get_trailer(kmsg, false), sizeof(mach_msg_max_trailer_t));
	}
	if (kmsgp != NULL) {
		*kmsgp = kmsg;
	}

	res = 1;

out:
	return res;
}
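
/*
 * Worked example (illustration, not from the original file): if
 * imq_seqno == 10 and imq_msgcount == 3, the peek window is [10, 13).
 * A *seqnop of 0 peeks at the head of the queue (seqno 10, msgoff 0);
 * *seqnop == 12 walks two links forward (msgoff 2), assuming all three
 * slots hold queued messages; *seqnop == 13 falls outside the window
 * and the routine returns 0.
 */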

/*
 *	Routine:	ipc_mqueue_peek
 *	Purpose:
 *		Peek at a (non-set) message queue to see if it has a message
 *		matching the sequence number provided (if zero, then the
 *		first message in the queue) and return vital info about the
 *		message.
 *
 *	Conditions:
 *		The ipc_mqueue_t is unlocked.
 *		Locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek(ipc_mqueue_t  mq,
    mach_port_seqno_t         *seqnop,
    mach_msg_size_t           *msg_sizep,
    mach_msg_id_t             *msg_idp,
    mach_msg_max_trailer_t    *msg_trailerp,
    ipc_kmsg_t                *kmsgp)
{
	ipc_port_t port = ip_from_mq(mq);
	unsigned res;

	ip_mq_lock(port);

	res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
	    msg_trailerp, kmsgp);

	ip_mq_unlock(port);
	return res;
}

#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_release_peek_ref
 *	Purpose:
 *		Release the reference on an mqueue's associated port which was
 *		granted to a thread in ipc_mqueue_peek_on_thread (on the
 *		MACH64_PEEK_MSG thread wakeup path).
 *
 *	Conditions:
 *		The ipc_mqueue_t should be locked on entry.
 *		The ipc_mqueue_t will be _unlocked_ on return
 *			(and potentially invalid!)
 */
void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
{
	ipc_port_t port = ip_from_mq(mqueue);

	ip_mq_lock_held(port);

	/*
	 * clear any preposts this mq may have generated
	 * (which would cause subsequent immediate wakeups)
	 */
	waitq_clear_prepost_locked(&port->ip_waitq);

	ip_mq_unlock(port);

	/*
	 * release the port reference: we need to do this outside the lock
	 * because we might be holding the last port reference!
	 */
	ip_release(port);
}
#endif /* MACH_FLIPC */

/*
 *	Routine:	ipc_mqueue_destroy_locked
 *	Purpose:
 *		Destroy a message queue.
 *		Set any blocked senders running.
 *		Destroy the kmsgs in the queue.
 *	Conditions:
 *		port locked
 *		Receivers were removed when the receive right was "changed"
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue, waitq_link_list_t *free_l)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t reap = FALSE;
	struct turnstile *send_turnstile = port_send_turnstile(port);

	/*
	 * rouse all blocked senders
	 * (don't boost anyone - we're tearing this queue down)
	 * (never preposts)
	 */
	port->ip_fullwaiters = false;

	if (send_turnstile != TURNSTILE_NULL) {
		waitq_wakeup64_all(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_RESTART, WAITQ_WAKEUP_DEFAULT);
	}

#if MACH_FLIPC
	ipc_kmsg_t kmsg;

	cqe_foreach_element_safe(kmsg, &mqueue->imq_messages, ikm_link) {
		if (MACH_NODE_VALID(kmsg->ikm_node) &&
		    FPORT_VALID(mqueue->imq_fport)) {
			flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
		}
	}
#endif

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);

	/*
	 * Wipe out message count, both for messages about to be
	 * reaped and for reserved space for (previously) woken senders.
	 * This is the indication to them that their reserved space is gone
	 * (the mqueue was destroyed).
	 */
	mqueue->imq_msgcount = 0;

	/*
	 * invalidate the waitq for subsequent mqueue operations,
	 * the port lock could be dropped after invalidating the mqueue.
	 */
	waitq_invalidate(&port->ip_waitq);

	waitq_unlink_all_locked(&port->ip_waitq, NULL, free_l);

	return reap;
}

/*
 *	Routine:	ipc_mqueue_set_qlimit_locked
 *	Purpose:
 *		Changes a message queue limit; the maximum number
 *		of messages which may be queued.
 *	Conditions:
 *		Port locked.
 */
void
ipc_mqueue_set_qlimit_locked(
	ipc_mqueue_t          mqueue,
	mach_port_msgcount_t  qlimit)
{
	ipc_port_t port = ip_from_mq(mqueue);

	assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	/* wake up senders allowed by the new qlimit */
	if (qlimit > mqueue->imq_qlimit) {
		mach_port_msgcount_t i, wakeup;
		struct turnstile *send_turnstile = port_send_turnstile(port);

		/* caution: wakeup, qlimit are unsigned */
		wakeup = qlimit - mqueue->imq_qlimit;

		for (i = 0; i < wakeup; i++) {
			/*
			 * boost the priority of the awoken thread
			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
			 * the message queue slot we've just reserved.
			 *
			 * NOTE: this will never prepost
			 */
			if (send_turnstile == TURNSTILE_NULL ||
			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
			    IPC_MQUEUE_FULL,
			    THREAD_AWAKENED,
			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
				port->ip_fullwaiters = false;
				break;
			}
			mqueue->imq_msgcount++;  /* give it to the awakened thread */
		}
	}
	mqueue->imq_qlimit = (uint16_t)qlimit;
}
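
/*
 * Worked example (illustration, not from the original file): raising the
 * qlimit from 3 to 5 makes wakeup == 2, so up to two blocked senders are
 * woken with WAITQ_PROMOTE_PRIORITY, and imq_msgcount is incremented once
 * per woken sender to reserve its slot.  If the turnstile runs out of
 * waiters early, ip_fullwaiters is cleared and the loop stops.
 */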

/*
 *	Routine:	ipc_mqueue_set_seqno_locked
 *	Purpose:
 *		Changes an mqueue's sequence number.
 *	Conditions:
 *		Caller holds a reference to the queue's containing object.
 */
void
ipc_mqueue_set_seqno_locked(
	ipc_mqueue_t       mqueue,
	mach_port_seqno_t  seqno)
{
	mqueue->imq_seqno = seqno;
}


/*
 *	Routine:	ipc_mqueue_copyin
 *	Purpose:
 *		Convert a name in a space to a message queue.
 *	Conditions:
 *		Nothing locked.  If successful, the caller gets a ref
 *		for the object.  This ref ensures the continued existence of
 *		the queue.
 *	Returns:
 *		MACH_MSG_SUCCESS	Found a message queue.
 *		MACH_RCV_INVALID_NAME	The space is dead.
 *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
 *		MACH_RCV_INVALID_NAME
 *			The denoted right is not receive or port set.
 *		MACH_RCV_IN_SET		Receive right is a member of a set.
 */
mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t       space,
	mach_port_name_t  name,
	ipc_object_t      *objectp)
{
	ipc_entry_bits_t bits;
	ipc_object_t object;
	kern_return_t kr;

	kr = ipc_right_lookup_read(space, name, &bits, &object);
	if (kr != KERN_SUCCESS) {
		return MACH_RCV_INVALID_NAME;
	}
	/* object is locked and active */

	if (bits & MACH_PORT_TYPE_RECEIVE) {
		__assert_only ipc_port_t port = ip_object_to_port(object);
		assert(ip_get_receiver_name(port) == name);
		assert(ip_in_space(port, space));
	}
	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
		io_reference(object);
		io_unlock(object);
	} else {
		io_unlock(object);
		/* guard exception if we never held the receive right in this entry */
		if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
			mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
		}
		return MACH_RCV_INVALID_NAME;
	}

	*objectp = object;
	return MACH_MSG_SUCCESS;
}
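
/*
 * Example (illustrative sketch, not part of the original file): a receive
 * path might translate a name and then wait on the resulting object.  The
 * io_waitq() accessor and the option/timeout values below are assumptions
 * for illustration only.
 *
 *	ipc_object_t object;
 *
 *	if (ipc_mqueue_copyin(space, name, &object) == MACH_MSG_SUCCESS) {
 *		// the ref from copyin keeps the queue alive while we wait
 *		ipc_mqueue_receive(io_waitq(object), MACH64_MSG_OPTION_NONE,
 *		    max_size, 0, MACH_MSG_TIMEOUT_NONE, THREAD_ABORTSAFE,
 *		    false);  // has_continuation
 *		io_release(object);
 *	}
 */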