VirtualBox

source: vbox/trunk/src/VBox/Runtime/common/misc/reqpool.cpp

Last change on this file was 103547, checked in by vboxsync, 3 months ago

Runtime/reqpool.cpp: Another place where we need to decrement the idle thread count, or exiting idle threads would still have one idle reference. Also spawn a new thread if the number of pending requests is greater than the number of idling threads (each one can only handle one request at a time, and depending on the processing time of each request, others might get delayed too much).
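
The second half of that change can be seen in rtReqPoolSubmit() further down: a submitter now leaves a queued request alone only when the idle workers already cover the whole pending queue. Distilled from the source (locking elided), the check is:

    /* With the pool critsect held: nothing more to do only if every pending
       request is covered by an idle worker, or the thread cap is reached.
       Otherwise the submitter pushes back and/or spawns a new worker. */
    if (   pPool->cIdleThreads >= pPool->cCurPendingRequests
        || pPool->cCurThreads  >= pPool->cMaxThreads)
        return;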

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 43.9 KB
1/* $Id: reqpool.cpp 103547 2024-02-23 15:23:03Z vboxsync $ */
2/** @file
3 * IPRT - Request Pool.
4 */
5
6/*
7 * Copyright (C) 2006-2023 Oracle and/or its affiliates.
8 *
9 * This file is part of VirtualBox base platform packages, as
10 * available from https://www.virtualbox.org.
11 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License
14 * as published by the Free Software Foundation, in version 3 of the
15 * License.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 * General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, see <https://www.gnu.org/licenses>.
24 *
25 * The contents of this file may alternatively be used under the terms
26 * of the Common Development and Distribution License Version 1.0
27 * (CDDL), a copy of it is provided in the "COPYING.CDDL" file included
28 * in the VirtualBox distribution, in which case the provisions of the
29 * CDDL are applicable instead of those of the GPL.
30 *
31 * You may elect to license modified versions of this file under the
32 * terms and conditions of either the GPL or the CDDL or both.
33 *
34 * SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0
35 */
36
37
38/*********************************************************************************************************************************
39* Header Files *
40*********************************************************************************************************************************/
41#include <iprt/req.h>
42#include "internal/iprt.h"
43
44#include <iprt/assert.h>
45#include <iprt/asm.h>
46#include <iprt/critsect.h>
47#include <iprt/err.h>
48#include <iprt/list.h>
49#include <iprt/log.h>
50#include <iprt/mem.h>
51#include <iprt/string.h>
52#include <iprt/time.h>
53#include <iprt/semaphore.h>
54#include <iprt/thread.h>
55
56#include "internal/req.h"
57#include "internal/magics.h"
58
59
60/*********************************************************************************************************************************
61* Defined Constants And Macros *
62*********************************************************************************************************************************/
63/** The max number of worker threads. */
64#define RTREQPOOL_MAX_THREADS UINT32_C(16384)
65/** The max number of milliseconds to push back. */
66#define RTREQPOOL_PUSH_BACK_MAX_MS RT_MS_1MIN
67/** The max number of free requests to keep around. */
68#define RTREQPOOL_MAX_FREE_REQUESTS (RTREQPOOL_MAX_THREADS * 2U)
69
70
71/*********************************************************************************************************************************
72* Structures and Typedefs *
73*********************************************************************************************************************************/
74typedef struct RTREQPOOLTHREAD
75{
76 /** Node in the RTREQPOOLINT::IdleThreads list. */
77 RTLISTNODE IdleNode;
78 /** Node in the RTREQPOOLINT::WorkerThreads list. */
79 RTLISTNODE ListNode;
80
81 /** The submit timestamp of the pending request. */
82 uint64_t uPendingNanoTs;
83 /** The timestamp when processing of the current request started. */
84 uint64_t uProcessingNanoTs;
85 /** When this thread last went idle. */
86 uint64_t uIdleNanoTs;
87 /** The number of requests processed by this thread. */
88 uint64_t cReqProcessed;
89 /** Total time the requests processed by this thread took to process. */
90 uint64_t cNsTotalReqProcessing;
91 /** Total time the requests processed by this thread had to wait in
92 * the queue before being scheduled. */
93 uint64_t cNsTotalReqQueued;
94 /** The CPU this was scheduled last time we checked. */
95 RTCPUID idLastCpu;
96
97 /** The submitter will put an incoming request here when scheduling an idle
98 * thread. */
99 PRTREQINT volatile pTodoReq;
100 /** The request the thread is currently processing. */
101 PRTREQINT volatile pPendingReq;
102
103 /** The thread handle. */
104 RTTHREAD hThread;
105 /** Nano seconds timestamp representing the birth time of the thread. */
106 uint64_t uBirthNanoTs;
107 /** Pointer to the request thread pool instance the thread is associated
108 * with. */
109 struct RTREQPOOLINT *pPool;
110} RTREQPOOLTHREAD;
111/** Pointer to a worker thread. */
112typedef RTREQPOOLTHREAD *PRTREQPOOLTHREAD;
113
114/**
115 * Request thread pool instance data.
116 */
117typedef struct RTREQPOOLINT
118{
119 /** Magic value (RTREQPOOL_MAGIC). */
120 uint32_t u32Magic;
121 /** The request pool name. */
122 char szName[12];
123
124 /** @name Config
125 * @{ */
126 /** The worker thread type. */
127 RTTHREADTYPE enmThreadType;
128 /** The work thread flags (RTTHREADFLAGS). */
129 uint32_t fThreadFlags;
130 /** The maximum number of worker threads. */
131 uint32_t cMaxThreads;
132 /** The minimum number of worker threads. */
133 uint32_t cMinThreads;
134 /** The number of milliseconds a thread needs to be idle before it is
135 * considered for retirement. */
136 uint32_t cMsMinIdle;
137 /** cMsMinIdle in nano seconds. */
138 uint64_t cNsMinIdle;
139 /** The idle thread sleep interval in milliseconds. */
140 RTMSINTERVAL cMsIdleSleep;
141 /** The number of threads which should be spawned before throttling kicks
142 * in. */
143 uint32_t cThreadsPushBackThreshold;
144 /** The max number of milliseconds to push back a submitter before creating
145 * a new worker thread once the threshold has been reached. */
146 uint32_t cMsMaxPushBack;
147 /** The minimum number of milliseconds to push back a submitter before
148 * creating a new worker thread once the threshold has been reached. */
149 uint32_t cMsMinPushBack;
150 /** The max number of free requests in the recycle LIFO. */
151 uint32_t cMaxFreeRequests;
152 /** @} */
153
154 /** Signaled by terminating worker threads. */
155 RTSEMEVENTMULTI hThreadTermEvt;
156
157 /** Destruction indicator. The worker threads check this in their loop. */
158 bool volatile fDestructing;
159
160 /** The current submitter push back in milliseconds.
161 * This is recalculated when worker threads come and go. */
162 uint32_t cMsCurPushBack;
163 /** The current number of worker threads. */
164 uint32_t cCurThreads;
165 /** Statistics: The total number of threads created. */
166 uint32_t cThreadsCreated;
167 /** Statistics: The timestamp when the last thread was created. */
168 uint64_t uLastThreadCreateNanoTs;
169 /** Linked list of worker threads. */
170 RTLISTANCHOR WorkerThreads;
171
172 /** The number of requests processed and counted in the time totals. */
173 uint64_t cReqProcessed;
174 /** Total time the requests processed by this thread took to process. */
175 uint64_t cNsTotalReqProcessing;
176 /** Total time the requests processed by this thread had to wait in
177 * the queue before being scheduled. */
178 uint64_t cNsTotalReqQueued;
179
180 /** Reference counter. */
181 uint32_t volatile cRefs;
182 /** The number of idle threads or threads in the process of becoming
183 * idle. This is increased before the to-be-idle thread tries to enter
184 * the critical section and add itself to the list. */
185 uint32_t volatile cIdleThreads;
186 /** Linked list of idle threads. */
187 RTLISTANCHOR IdleThreads;
188
189 /** Head of the request FIFO. */
190 PRTREQINT pPendingRequests;
191 /** Where to insert the next request. */
192 PRTREQINT *ppPendingRequests;
193 /** The number of requests currently pending. */
194 uint32_t cCurPendingRequests;
195 /** The number of requests currently being executed. */
196 uint32_t volatile cCurActiveRequests;
197 /** The number of requests submitted. */
198 uint64_t cReqSubmitted;
199 /** The number of requests cancelled. */
200 uint64_t cReqCancelled;
201
202 /** Head of the request recycling LIFO. */
203 PRTREQINT pFreeRequests;
204 /** The number of requests in the recycling LIFO. This is read without
205 * entering the critical section, thus volatile. */
206 uint32_t volatile cCurFreeRequests;
207
208 /** Critical section serializing access to members of this structure. */
209 RTCRITSECT CritSect;
210
211} RTREQPOOLINT;
212
213
214/**
215 * Used by exiting thread and the pool destruction code to cancel unexpected
216 * requests.
217 *
218 * @param pReq The request.
219 */
220static void rtReqPoolCancelReq(PRTREQINT pReq)
221{
222 pReq->uOwner.hPool = NIL_RTREQPOOL; /* force free */
223 pReq->enmState = RTREQSTATE_COMPLETED;
224 ASMAtomicWriteS32(&pReq->iStatusX, VERR_CANCELLED);
225 if (pReq->hPushBackEvt != NIL_RTSEMEVENTMULTI)
226 RTSemEventMultiSignal(pReq->hPushBackEvt);
227 RTSemEventSignal(pReq->EventSem);
228
229 RTReqRelease(pReq);
230}
231
232
233/**
234 * Recalculate the max pushback interval when adding or removing worker threads.
235 *
236 * @param pPool The pool. cMsCurPushBack will be changed.
237 */
238static void rtReqPoolRecalcPushBack(PRTREQPOOLINT pPool)
239{
240 uint32_t const cMsRange = pPool->cMsMaxPushBack - pPool->cMsMinPushBack;
241 uint32_t const cSteps = pPool->cMaxThreads - pPool->cThreadsPushBackThreshold;
242 uint32_t const iStep = pPool->cCurThreads - pPool->cThreadsPushBackThreshold;
243
244 uint32_t cMsCurPushBack;
245 if (cSteps == 0 /* disabled */)
246 cMsCurPushBack = 0;
247 else if ((cMsRange >> 2) >= cSteps)
248 cMsCurPushBack = cMsRange / cSteps * iStep;
249 else
250 cMsCurPushBack = (uint32_t)( (uint64_t)cMsRange * RT_NS_1MS / cSteps * iStep / RT_NS_1MS );
251 cMsCurPushBack += pPool->cMsMinPushBack;
252
253 pPool->cMsCurPushBack = cMsCurPushBack;
254}
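
To make the interpolation concrete, here is a worked example with assumed figures (not from the source): cMsMinPushBack=100, cMsMaxPushBack=60000, cThreadsPushBackThreshold=8, cMaxThreads=64. A standalone sketch in plain C, exercising the common (cMsRange >> 2) >= cSteps branch:

    #include <stdint.h>
    #include <stdio.h>

    /* Mirrors rtReqPoolRecalcPushBack(): linear interpolation from the
       minimum push back at the threshold up to the maximum at the cap. */
    int main(void)
    {
        uint32_t const cMsMin = 100, cMsMax = 60000;     /* assumed bounds */
        uint32_t const cThreshold = 8, cMaxThreads = 64; /* assumed config */
        for (uint32_t cCur = cThreshold; cCur <= cMaxThreads; cCur += 14)
        {
            uint32_t const cMsRange = cMsMax - cMsMin;          /* 59900 */
            uint32_t const cSteps   = cMaxThreads - cThreshold; /* 56 */
            uint32_t const iStep    = cCur - cThreshold;
            printf("%2u threads -> %5u ms push back\n",
                   cCur, cMsRange / cSteps * iStep + cMsMin);
        }
        return 0; /* e.g. 36 threads -> 30032 ms, 64 threads -> 59964 ms */
    }

The push back thus grows roughly linearly with the current thread count; the nanosecond-scaled else branch only matters when the configured range is small relative to the number of steps.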
255
256
257
258/**
259 * Performs thread exit.
260 *
261 * @returns Thread termination status code (VINF_SUCCESS).
262 * @param pPool The pool.
263 * @param pThread The thread.
264 * @param fLocked Whether we are inside the critical section
265 * already.
266 */
267static int rtReqPoolThreadExit(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, bool fLocked)
268{
269 if (!fLocked)
270 RTCritSectEnter(&pPool->CritSect);
271
272 /* Get out of the idle list. */
273 if (!RTListIsEmpty(&pThread->IdleNode))
274 {
275 RTListNodeRemove(&pThread->IdleNode);
276 Assert(pPool->cIdleThreads > 0);
277 ASMAtomicDecU32(&pPool->cIdleThreads);
278 }
279
280 /* Get out of the thread list. */
281 RTListNodeRemove(&pThread->ListNode);
282 Assert(pPool->cCurThreads > 0);
283 pPool->cCurThreads--;
284 rtReqPoolRecalcPushBack(pPool);
285
286 /* This shouldn't happen... */
287 PRTREQINT pReq = pThread->pTodoReq;
288 if (pReq)
289 {
290 AssertFailed();
291 pThread->pTodoReq = NULL;
292 rtReqPoolCancelReq(pReq);
293 }
294
295 /* If we're the last thread terminating, ping the destruction thread before
296 we leave the critical section. */
297 if ( RTListIsEmpty(&pPool->WorkerThreads)
298 && pPool->hThreadTermEvt != NIL_RTSEMEVENTMULTI)
299 RTSemEventMultiSignal(pPool->hThreadTermEvt);
300
301 RTCritSectLeave(&pPool->CritSect);
302
303 RTMemFree(pThread);
304 return VINF_SUCCESS;
305}
306
307
308
309/**
310 * Process one request.
311 *
312 * @param pPool The pool.
313 * @param pThread The worker thread.
314 * @param pReq The request to process.
315 */
316static void rtReqPoolThreadProcessRequest(PRTREQPOOLINT pPool, PRTREQPOOLTHREAD pThread, PRTREQINT pReq)
317{
318 /*
319 * Update thread state.
320 */
321 pThread->uProcessingNanoTs = RTTimeNanoTS();
322 pThread->uPendingNanoTs = pReq->uSubmitNanoTs;
323 pThread->pPendingReq = pReq;
324 ASMAtomicIncU32(&pPool->cCurActiveRequests);
325 Assert(pReq->u32Magic == RTREQ_MAGIC);
326
327 /*
328 * Do the actual processing.
329 */
330 rtReqProcessOne(pReq);
331
332 /*
333 * Update thread statistics and state.
334 */
335 ASMAtomicDecU32(&pPool->cCurActiveRequests);
336 pThread->pPendingReq = NULL;
337 uint64_t const uNsTsEnd = RTTimeNanoTS();
338 pThread->cNsTotalReqProcessing += uNsTsEnd - pThread->uProcessingNanoTs;
339 pThread->cNsTotalReqQueued += pThread->uProcessingNanoTs - pThread->uPendingNanoTs;
340 pThread->cReqProcessed++;
341}
342
343
344
345/**
346 * The Worker Thread Procedure.
347 *
348 * @returns VINF_SUCCESS.
349 * @param hThreadSelf The thread handle.
350 * @param pvArg Pointer to the thread data.
351 */
352static DECLCALLBACK(int) rtReqPoolThreadProc(RTTHREAD hThreadSelf, void *pvArg)
353{
354 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)pvArg;
355 PRTREQPOOLINT pPool = pThread->pPool;
356
357 /*
358 * The work loop.
359 */
360 uint64_t cReqPrevProcessedIdle = UINT64_MAX;
361 uint64_t cReqPrevProcessedStat = 0;
362 uint64_t cNsPrevTotalReqProcessing = 0;
363 uint64_t cNsPrevTotalReqQueued = 0;
364 while (!pPool->fDestructing)
365 {
366 /*
367 * Process pending work.
368 */
369
370 /* Check if anything is scheduled directly to us. */
371 PRTREQINT pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
372 if (pReq)
373 {
374 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
375 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
376 continue;
377 }
378
379 ASMAtomicIncU32(&pPool->cIdleThreads);
380 RTCritSectEnter(&pPool->CritSect);
381
382 /* Update the global statistics. */
383 if (cReqPrevProcessedStat != pThread->cReqProcessed)
384 {
385 pPool->cReqProcessed += pThread->cReqProcessed - cReqPrevProcessedStat;
386 cReqPrevProcessedStat = pThread->cReqProcessed;
387 pPool->cNsTotalReqProcessing += pThread->cNsTotalReqProcessing - cNsPrevTotalReqProcessing;
388 cNsPrevTotalReqProcessing = pThread->cNsTotalReqProcessing;
389 pPool->cNsTotalReqQueued += pThread->cNsTotalReqQueued - cNsPrevTotalReqQueued;
390 cNsPrevTotalReqQueued = pThread->cNsTotalReqQueued;
391 }
392
393 /* Recheck the todo request pointer after entering the critsect. */
394 pReq = ASMAtomicXchgPtrT(&pThread->pTodoReq, NULL, PRTREQINT);
395 if (pReq)
396 {
397 Assert(RTListIsEmpty(&pThread->IdleNode)); /* Must not be in the idle list. */
398 ASMAtomicDecU32(&pPool->cIdleThreads); /* Was already marked as idle above. */
399 RTCritSectLeave(&pPool->CritSect);
400
401 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
402 continue;
403 }
404
405 /* Any pending requests in the queue? */
406 pReq = pPool->pPendingRequests;
407 if (pReq)
408 {
409 pPool->pPendingRequests = pReq->pNext;
410 if (pReq->pNext == NULL)
411 pPool->ppPendingRequests = &pPool->pPendingRequests;
412 Assert(pPool->cCurPendingRequests > 0);
413 pPool->cCurPendingRequests--;
414
415 /* Un-idle ourselves and process the request. */
416 if (!RTListIsEmpty(&pThread->IdleNode))
417 {
418 RTListNodeRemove(&pThread->IdleNode);
419 RTListInit(&pThread->IdleNode);
420 ASMAtomicDecU32(&pPool->cIdleThreads);
421 }
422 ASMAtomicDecU32(&pPool->cIdleThreads);
423 RTCritSectLeave(&pPool->CritSect);
424
425 rtReqPoolThreadProcessRequest(pPool, pThread, pReq);
426 continue;
427 }
428
429 /*
430 * Nothing to do, go idle.
431 */
432 if (cReqPrevProcessedIdle != pThread->cReqProcessed)
433 {
434 cReqPrevProcessedIdle = pThread->cReqProcessed;
435 pThread->uIdleNanoTs = RTTimeNanoTS();
436 }
437 else if (pPool->cCurThreads > pPool->cMinThreads)
438 {
439 uint64_t cNsIdle = RTTimeNanoTS() - pThread->uIdleNanoTs;
440 if (cNsIdle >= pPool->cNsMinIdle)
441 {
442 ASMAtomicDecU32(&pPool->cIdleThreads); /* Was already marked as idle above. */
443 return rtReqPoolThreadExit(pPool, pThread, true /*fLocked*/);
444 }
445 }
446
447 if (RTListIsEmpty(&pThread->IdleNode))
448 RTListPrepend(&pPool->IdleThreads, &pThread->IdleNode);
449 else
450 ASMAtomicDecU32(&pPool->cIdleThreads);
451 RTThreadUserReset(hThreadSelf);
452 uint32_t const cMsSleep = pPool->cMsIdleSleep;
453
454 RTCritSectLeave(&pPool->CritSect);
455
456 RTThreadUserWait(hThreadSelf, cMsSleep);
457 }
458
459 return rtReqPoolThreadExit(pPool, pThread, false /*fLocked*/);
460}
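
The cIdleThreads bookkeeping above is the subtle part: the counter is bumped optimistically before entering the critical section, a thread parked on the idle list keeps one count across sleep/wake iterations, and the un-idle path therefore has to decrement once or twice depending on list membership. A debug-style invariant checker, offered here purely as a sketch (rtReqPoolAssertIdleInvariant is not part of this file), would capture the relationship like so:

    /* Sketch only: with the pool critsect held, cIdleThreads must cover at
       least the threads actually linked on the IdleThreads list; any surplus
       is threads between their ASMAtomicIncU32 and the list update. */
    static void rtReqPoolAssertIdleInvariant(PRTREQPOOLINT pPool)
    {
        Assert(RTCritSectIsOwner(&pPool->CritSect));
        uint32_t cListed = 0;
        PRTREQPOOLTHREAD pIter;
        RTListForEach(&pPool->IdleThreads, pIter, RTREQPOOLTHREAD, IdleNode)
            cListed++;
        Assert(ASMAtomicReadU32(&pPool->cIdleThreads) >= cListed);
    }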
461
462
463/**
464 * Create a new worker thread.
465 *
466 * @param pPool The pool needing new worker thread.
467 * @remarks Caller owns the critical section
468 */
469static void rtReqPoolCreateNewWorker(RTREQPOOL pPool)
470{
471 PRTREQPOOLTHREAD pThread = (PRTREQPOOLTHREAD)RTMemAllocZ(sizeof(RTREQPOOLTHREAD));
472 if (!pThread)
473 return;
474
475 pThread->uBirthNanoTs = RTTimeNanoTS();
476 pThread->pPool = pPool;
477 pThread->idLastCpu = NIL_RTCPUID;
478 pThread->hThread = NIL_RTTHREAD;
479 RTListInit(&pThread->IdleNode);
480 RTListAppend(&pPool->WorkerThreads, &pThread->ListNode);
481 pPool->cCurThreads++;
482 pPool->cThreadsCreated++;
483
484 int rc = RTThreadCreateF(&pThread->hThread, rtReqPoolThreadProc, pThread, 0 /*default stack size*/,
485 pPool->enmThreadType, pPool->fThreadFlags, "%s%02u", pPool->szName, pPool->cThreadsCreated);
486 if (RT_SUCCESS(rc))
487 pPool->uLastThreadCreateNanoTs = pThread->uBirthNanoTs;
488 else
489 {
490 pPool->cCurThreads--;
491 RTListNodeRemove(&pThread->ListNode);
492 RTMemFree(pThread);
493 }
494}
495
496
497/**
498 * Repel the submitter, giving the worker threads a chance to process the
499 * incoming request.
500 *
501 * @returns Success if a worker picked up the request, failure if not. The
502 * critical section has been left on success, while we'll be inside it
503 * on failure.
504 * @param pPool The pool.
505 * @param pReq The incoming request.
506 */
507static int rtReqPoolPushBack(PRTREQPOOLINT pPool, PRTREQINT pReq)
508{
509 /*
510 * Lazily create the push back semaphore that we'll be blocking on.
511 */
512 int rc;
513 RTSEMEVENTMULTI hEvt = pReq->hPushBackEvt;
514 if (hEvt == NIL_RTSEMEVENTMULTI)
515 {
516 rc = RTSemEventMultiCreate(&hEvt);
517 if (RT_FAILURE(rc))
518 return rc;
519 pReq->hPushBackEvt = hEvt;
520 }
521
522 /*
523 * Prepare the request and semaphore.
524 */
525 uint32_t const cMsTimeout = pPool->cMsCurPushBack;
526 pReq->fSignalPushBack = true;
527 RTReqRetain(pReq);
528 RTSemEventMultiReset(hEvt);
529
530 RTCritSectLeave(&pPool->CritSect);
531
532 /*
533 * Block.
534 */
535 rc = RTSemEventMultiWait(hEvt, cMsTimeout);
536 if (RT_FAILURE(rc))
537 {
538 AssertMsg(rc == VERR_TIMEOUT, ("%Rrc\n", rc));
539 RTCritSectEnter(&pPool->CritSect);
540 }
541 RTReqRelease(pReq);
542 return rc;
543}
544
545
546
547DECLHIDDEN(void) rtReqPoolSubmit(PRTREQPOOLINT pPool, PRTREQINT pReq)
548{
549 RTCritSectEnter(&pPool->CritSect);
550
551 pPool->cReqSubmitted++;
552
553 /*
554 * Try schedule the request to a thread that's currently idle.
555 */
556 PRTREQPOOLTHREAD pThread = RTListGetFirst(&pPool->IdleThreads, RTREQPOOLTHREAD, IdleNode);
557 if (pThread)
558 {
559 /** @todo CPU affinity??? */
560 ASMAtomicWritePtr(&pThread->pTodoReq, pReq);
561
562 RTListNodeRemove(&pThread->IdleNode);
563 RTListInit(&pThread->IdleNode);
564 ASMAtomicDecU32(&pPool->cIdleThreads);
565
566 RTThreadUserSignal(pThread->hThread);
567
568 RTCritSectLeave(&pPool->CritSect);
569 return;
570 }
571 Assert(RTListIsEmpty(&pPool->IdleThreads));
572
573 /*
574 * Put the request in the pending queue.
575 */
576 pReq->pNext = NULL;
577 *pPool->ppPendingRequests = pReq;
578 pPool->ppPendingRequests = (PRTREQINT *)&pReq->pNext;
579 pPool->cCurPendingRequests++;
580
581 /*
582 * If there is an incoming worker thread already or we've reached the
583 * maximum number of worker threads, we're done.
584 */
585 if ( pPool->cIdleThreads >= pPool->cCurPendingRequests
586 || pPool->cCurThreads >= pPool->cMaxThreads)
587 {
588 RTCritSectLeave(&pPool->CritSect);
589 return;
590 }
591
592 /*
593 * Push back before creating a new worker thread.
594 */
595 if ( pPool->cCurThreads > pPool->cThreadsPushBackThreshold
596 && (RTTimeNanoTS() - pReq->uSubmitNanoTs) / RT_NS_1MS >= pPool->cMsCurPushBack )
597 {
598 int rc = rtReqPoolPushBack(pPool, pReq);
599 if (RT_SUCCESS(rc))
600 return;
601 }
602
603 /*
604 * Create a new thread for processing the request.
605 * For simplicity, we don't bother leaving the critical section while doing so.
606 */
607 rtReqPoolCreateNewWorker(pPool);
608
609 RTCritSectLeave(&pPool->CritSect);
610 return;
611}
612
613
614/**
615 * Worker for RTReqCancel that looks for the request in the pending list and
616 * completes it if found there.
617 *
618 * @param pPool The request thread pool.
619 * @param pReq The request.
620 */
621DECLHIDDEN(void) rtReqPoolCancel(PRTREQPOOLINT pPool, PRTREQINT pReq)
622{
623 RTCritSectEnter(&pPool->CritSect);
624
625 pPool->cReqCancelled++;
626
627 /*
628 * Check if the request is in the pending list.
629 */
630 PRTREQINT pPrev = NULL;
631 PRTREQINT pCur = pPool->pPendingRequests;
632 while (pCur)
633 if (pCur != pReq)
634 {
635 pPrev = pCur;
636 pCur = pCur->pNext;
637 }
638 else
639 {
640 /*
641 * Unlink it and process it.
642 */
643 if (!pPrev)
644 {
645 pPool->pPendingRequests = pReq->pNext;
646 if (!pReq->pNext)
647 pPool->ppPendingRequests = &pPool->pPendingRequests;
648 }
649 else
650 {
651 pPrev->pNext = pReq->pNext;
652 if (!pReq->pNext)
653 pPool->ppPendingRequests = (PRTREQINT *)&pPrev->pNext;
654 }
655 Assert(pPool->cCurPendingRequests > 0);
656 pPool->cCurPendingRequests--;
657
658 rtReqProcessOne(pReq);
659 break;
660 }
661
662 RTCritSectLeave(&pPool->CritSect);
663 return;
664}
665
666
667/**
668 * Recycles a request if there is room in the recycle LIFO.
669 *
670 * @returns true if recycled, false if not.
671 * @param pPool The request thread pool.
672 * @param pReq The request.
673 */
674DECLHIDDEN(bool) rtReqPoolRecycle(PRTREQPOOLINT pPool, PRTREQINT pReq)
675{
676 if ( pPool
677 && ASMAtomicReadU32(&pPool->cCurFreeRequests) < pPool->cMaxFreeRequests)
678 {
679 RTCritSectEnter(&pPool->CritSect);
680 if (pPool->cCurFreeRequests < pPool->cMaxFreeRequests)
681 {
682 pReq->pNext = pPool->pFreeRequests;
683 pPool->pFreeRequests = pReq;
684 ASMAtomicIncU32(&pPool->cCurFreeRequests);
685
686 RTCritSectLeave(&pPool->CritSect);
687 return true;
688 }
689
690 RTCritSectLeave(&pPool->CritSect);
691 }
692 return false;
693}
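
The function above is a double-checked size cap: the volatile counter is read without the lock to reject the common "LIFO full" case cheaply, and the bound is re-verified inside the critical section before the list is modified. The same shape in miniature, as a generic C11 sketch (pthread/stdatomic stand in for the IPRT primitives):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef struct Node { struct Node *pNext; } Node;

    static pthread_mutex_t g_Lock  = PTHREAD_MUTEX_INITIALIZER;
    static Node           *g_pHead = NULL;
    static atomic_uint     g_cFree;
    static unsigned const  g_cMax  = 32;

    /* Push onto a bounded LIFO: racy pre-filter, authoritative re-check. */
    static bool tryPush(Node *pNode)
    {
        if (atomic_load(&g_cFree) >= g_cMax)   /* cheap, lock-free filter */
            return false;
        pthread_mutex_lock(&g_Lock);
        bool const fPushed = atomic_load(&g_cFree) < g_cMax;
        if (fPushed)
        {
            pNode->pNext = g_pHead;            /* serialized list update */
            g_pHead = pNode;
            atomic_fetch_add(&g_cFree, 1);     /* readers see it unlocked */
        }
        pthread_mutex_unlock(&g_Lock);
        return fPushed;
    }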
694
695
696RTDECL(int) RTReqPoolCreate(uint32_t cMaxThreads, RTMSINTERVAL cMsMinIdle,
697 uint32_t cThreadsPushBackThreshold, uint32_t cMsMaxPushBack,
698 const char *pszName, PRTREQPOOL phPool)
699{
700 /*
701 * Validate and massage the config.
702 */
703 if (cMaxThreads == UINT32_MAX)
704 cMaxThreads = RTREQPOOL_MAX_THREADS;
705 AssertMsgReturn(cMaxThreads > 0 && cMaxThreads <= RTREQPOOL_MAX_THREADS, ("%u\n", cMaxThreads), VERR_OUT_OF_RANGE);
706 uint32_t const cMinThreads = cMaxThreads > 2 ? 2 : cMaxThreads - 1;
707
708 if (cThreadsPushBackThreshold == 0)
709 cThreadsPushBackThreshold = cMinThreads;
710 else if (cThreadsPushBackThreshold == UINT32_MAX)
711 cThreadsPushBackThreshold = cMaxThreads;
712 AssertMsgReturn(cThreadsPushBackThreshold <= cMaxThreads, ("%u/%u\n", cThreadsPushBackThreshold, cMaxThreads), VERR_OUT_OF_RANGE);
713
714 if (cMsMaxPushBack == UINT32_MAX)
715 cMsMaxPushBack = RTREQPOOL_PUSH_BACK_MAX_MS;
716 AssertMsgReturn(cMsMaxPushBack <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", cMsMaxPushBack), VERR_OUT_OF_RANGE);
717 uint32_t const cMsMinPushBack = cMsMaxPushBack >= 200 ? 100 : cMsMaxPushBack / 2;
718
719 AssertPtrReturn(pszName, VERR_INVALID_POINTER);
720 size_t cchName = strlen(pszName);
721 AssertReturn(cchName > 0, VERR_INVALID_PARAMETER);
722 Assert(cchName <= 10);
723
724 AssertPtrReturn(phPool, VERR_INVALID_POINTER);
725
726 /*
727 * Create and initialize the pool.
728 */
729 PRTREQPOOLINT pPool = (PRTREQPOOLINT)RTMemAlloc(sizeof(*pPool));
730 if (!pPool)
731 return VERR_NO_MEMORY;
732
733 pPool->u32Magic = RTREQPOOL_MAGIC;
734 RTStrCopy(pPool->szName, sizeof(pPool->szName), pszName);
735
736 pPool->enmThreadType = RTTHREADTYPE_DEFAULT;
737 pPool->fThreadFlags = 0;
738 pPool->cMaxThreads = cMaxThreads;
739 pPool->cMinThreads = cMinThreads;
740 pPool->cMsMinIdle = cMsMinIdle == RT_INDEFINITE_WAIT || cMsMinIdle >= UINT32_MAX ? UINT32_MAX : cMsMinIdle;
741 pPool->cNsMinIdle = pPool->cMsMinIdle == UINT32_MAX ? UINT64_MAX : cMsMinIdle * RT_NS_1MS_64;
742 pPool->cMsIdleSleep = pPool->cMsMinIdle == UINT32_MAX ? RT_INDEFINITE_WAIT : RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
743 pPool->cThreadsPushBackThreshold = cThreadsPushBackThreshold;
744 pPool->cMsMaxPushBack = cMsMaxPushBack;
745 pPool->cMsMinPushBack = cMsMinPushBack;
746 pPool->cMaxFreeRequests = cMaxThreads * 2;
747 pPool->hThreadTermEvt = NIL_RTSEMEVENTMULTI;
748 pPool->fDestructing = false;
749 pPool->cMsCurPushBack = 0;
750 pPool->cCurThreads = 0;
751 pPool->cThreadsCreated = 0;
752 pPool->uLastThreadCreateNanoTs = 0;
753 RTListInit(&pPool->WorkerThreads);
754 pPool->cReqProcessed = 0;
755 pPool->cNsTotalReqProcessing= 0;
756 pPool->cNsTotalReqQueued = 0;
757 pPool->cRefs = 1;
758 pPool->cIdleThreads = 0;
759 RTListInit(&pPool->IdleThreads);
760 pPool->pPendingRequests = NULL;
761 pPool->ppPendingRequests = &pPool->pPendingRequests;
762 pPool->cCurPendingRequests = 0;
763 pPool->cCurActiveRequests = 0;
764 pPool->cReqSubmitted = 0;
765 pPool->cReqCancelled = 0;
766 pPool->pFreeRequests = NULL;
767 pPool->cCurFreeRequests = 0;
768
769 int rc = RTSemEventMultiCreate(&pPool->hThreadTermEvt);
770 if (RT_SUCCESS(rc))
771 {
772 rc = RTCritSectInit(&pPool->CritSect);
773 if (RT_SUCCESS(rc))
774 {
775 *phPool = pPool;
776 return VINF_SUCCESS;
777 }
778
779 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
780 }
781 pPool->u32Magic = RTREQPOOL_MAGIC_DEAD;
782 RTMemFree(pPool);
783 return rc;
784}
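
For orientation, a minimal consumer of the pool API might look as follows; this is a usage sketch, with myWorker, myUsePool and all parameter values being illustrative rather than taken from the source:

    #include <iprt/req.h>
    #include <iprt/errcore.h>

    static DECLCALLBACK(int) myWorker(void *pvUser)
    {
        /* ... the work to offload onto a pool thread ... */
        RT_NOREF(pvUser);
        return VINF_SUCCESS;
    }

    static int myUsePool(void *pvUser)
    {
        RTREQPOOL hPool;
        int rc = RTReqPoolCreate(8 /*cMaxThreads*/, 10000 /*cMsMinIdle*/,
                                 UINT32_MAX /*cThreadsPushBackThreshold = cMaxThreads*/,
                                 UINT32_MAX /*cMsMaxPushBack = default*/,
                                 "MyPool" /* must fit szName[12] */, &hPool);
        if (RT_SUCCESS(rc))
        {
            /* Run myWorker(pvUser) on a pool thread, wait for its status. */
            rc = RTReqPoolCallWait(hPool, (PFNRT)myWorker, 1, pvUser);
            RTReqPoolRelease(hPool);
        }
        return rc;
    }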
785
786
787
788RTDECL(int) RTReqPoolSetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar, uint64_t uValue)
789{
790 PRTREQPOOLINT pPool = hPool;
791 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
792 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
793 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, VERR_INVALID_PARAMETER);
794
795 RTCritSectEnter(&pPool->CritSect);
796
797 bool fWakeUpIdleThreads = false;
798 int rc = VINF_SUCCESS;
799 switch (enmVar)
800 {
801 case RTREQPOOLCFGVAR_THREAD_TYPE:
802 AssertMsgBreakStmt(uValue > (uint64_t)RTTHREADTYPE_INVALID && uValue < (uint64_t)RTTHREADTYPE_END,
803 ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
804
805 pPool->enmThreadType = (RTTHREADTYPE)uValue;
806 break;
807
808 case RTREQPOOLCFGVAR_THREAD_FLAGS:
809 AssertMsgBreakStmt(!(uValue & ~(uint64_t)RTTHREADFLAGS_MASK) && !(uValue & RTTHREADFLAGS_WAITABLE),
810 ("%#llx\n", uValue), rc = VERR_INVALID_FLAGS);
811
812 pPool->fThreadFlags = (uint32_t)uValue;
813 break;
814
815 case RTREQPOOLCFGVAR_MIN_THREADS:
816 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
817 fWakeUpIdleThreads = pPool->cMinThreads > (uint32_t)uValue;
818 pPool->cMinThreads = (uint32_t)uValue;
819 if (pPool->cMinThreads > pPool->cMaxThreads)
820 pPool->cMaxThreads = pPool->cMinThreads;
821 if ( pPool->cThreadsPushBackThreshold < pPool->cMinThreads
822 || pPool->cThreadsPushBackThreshold > pPool->cMaxThreads)
823 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
824 rtReqPoolRecalcPushBack(pPool);
825 break;
826
827 case RTREQPOOLCFGVAR_MAX_THREADS:
828 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_THREADS && uValue >= 1, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
829 pPool->cMaxThreads = (uint32_t)uValue;
830 if (pPool->cMaxThreads < pPool->cMinThreads)
831 {
832 pPool->cMinThreads = pPool->cMaxThreads;
833 fWakeUpIdleThreads = true;
834 }
835 if (pPool->cMaxThreads < pPool->cThreadsPushBackThreshold)
836 pPool->cThreadsPushBackThreshold = pPool->cMinThreads + (pPool->cMaxThreads - pPool->cMinThreads) / 2;
837 rtReqPoolRecalcPushBack(pPool);
838 break;
839
840 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
841 AssertMsgBreakStmt(uValue < UINT32_MAX || uValue == RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
842 if (uValue < UINT32_MAX && uValue != RT_INDEFINITE_WAIT)
843 {
844 fWakeUpIdleThreads = pPool->cMsMinIdle != (uint32_t)uValue;
845 pPool->cMsMinIdle = (uint32_t)uValue;
846 pPool->cNsMinIdle = pPool->cMsMinIdle * RT_NS_1MS_64;
847 if (pPool->cMsIdleSleep > pPool->cMsMinIdle)
848 pPool->cMsIdleSleep = RT_MAX(RT_MS_1SEC, pPool->cMsMinIdle);
849 }
850 else
851 {
852 pPool->cMsMinIdle = UINT32_MAX;
853 pPool->cNsMinIdle = UINT64_MAX;
854 pPool->cMsIdleSleep = RT_INDEFINITE_WAIT;
855 }
856 break;
857
858 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
859 AssertMsgBreakStmt(uValue <= RT_INDEFINITE_WAIT, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
860 fWakeUpIdleThreads = pPool->cMsMinIdle > (RTMSINTERVAL)uValue;
861 pPool->cMsIdleSleep = (RTMSINTERVAL)uValue;
862 if (pPool->cMsIdleSleep == RT_INDEFINITE_WAIT)
863 {
864 pPool->cMsMinIdle = UINT32_MAX;
865 pPool->cNsMinIdle = UINT64_MAX;
866 }
867 break;
868
869 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
870 if (uValue == UINT64_MAX)
871 pPool->cThreadsPushBackThreshold = pPool->cMaxThreads;
872 else if (uValue == 0)
873 pPool->cThreadsPushBackThreshold = pPool->cMinThreads;
874 else
875 {
876 AssertMsgBreakStmt(uValue <= pPool->cMaxThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
877 AssertMsgBreakStmt(uValue >= pPool->cMinThreads, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
878 pPool->cThreadsPushBackThreshold = (uint32_t)uValue;
879 }
880 break;
881
882 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
883 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
884 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
885 else
886 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
887 pPool->cMsMinPushBack = (uint32_t)uValue;
888 if (pPool->cMsMaxPushBack < pPool->cMsMinPushBack)
889 pPool->cMsMaxPushBack = pPool->cMsMinPushBack;
890 rtReqPoolRecalcPushBack(pPool);
891 break;
892
893 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
894 if (uValue == UINT32_MAX || uValue == UINT64_MAX)
895 uValue = RTREQPOOL_PUSH_BACK_MAX_MS;
896 else
897 AssertMsgBreakStmt(uValue <= RTREQPOOL_PUSH_BACK_MAX_MS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
898 pPool->cMsMaxPushBack = (uint32_t)uValue;
899 if (pPool->cMsMinPushBack > pPool->cMsMaxPushBack)
900 pPool->cMsMinPushBack = pPool->cMsMaxPushBack;
901 rtReqPoolRecalcPushBack(pPool);
902 break;
903
904 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
905 if (uValue == UINT64_MAX)
906 {
907 pPool->cMaxFreeRequests = pPool->cMaxThreads * 2;
908 if (pPool->cMaxFreeRequests < 16)
909 pPool->cMaxFreeRequests = 16;
910 }
911 else
912 {
913 AssertMsgBreakStmt(uValue <= RTREQPOOL_MAX_FREE_REQUESTS, ("%llu\n", uValue), rc = VERR_OUT_OF_RANGE);
914 pPool->cMaxFreeRequests = (uint32_t)uValue;
915 }
916
917 while (pPool->cCurFreeRequests > pPool->cMaxFreeRequests)
918 {
919 PRTREQINT pReq = pPool->pFreeRequests;
920 pPool->pFreeRequests = pReq->pNext;
921 ASMAtomicDecU32(&pPool->cCurFreeRequests);
922 rtReqFreeIt(pReq);
923 }
924 break;
925
926 default:
927 AssertFailed();
928 rc = VERR_IPE_NOT_REACHED_DEFAULT_CASE;
929 }
930
931 /* Wake up all idle threads if required. */
932 if (fWakeUpIdleThreads)
933 {
934 Assert(rc == VINF_SUCCESS);
935 PRTREQPOOLTHREAD pThread;
936 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
937 {
938 RTThreadUserSignal(pThread->hThread);
939 }
940 }
941
942 RTCritSectLeave(&pPool->CritSect);
943
944 return rc;
945}
946RT_EXPORT_SYMBOL(RTReqPoolSetCfgVar);
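
Post-creation tuning goes through this setter while the pool is live; for instance (illustrative values only):

    /* Keep two workers alive, retire the rest after 30s of idling, and
       start pushing submitters back once eight threads are running. */
    RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MIN_THREADS, 2);
    RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_MS_MIN_IDLE, 30000);
    RTReqPoolSetCfgVar(hPool, RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD, 8);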
947
948
949RTDECL(uint64_t) RTReqPoolGetCfgVar(RTREQPOOL hPool, RTREQPOOLCFGVAR enmVar)
950{
951 PRTREQPOOLINT pPool = hPool;
952 AssertPtrReturn(pPool, UINT64_MAX);
953 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
954 AssertReturn(enmVar > RTREQPOOLCFGVAR_INVALID && enmVar < RTREQPOOLCFGVAR_END, UINT64_MAX);
955
956 RTCritSectEnter(&pPool->CritSect);
957
958 uint64_t u64;
959 switch (enmVar)
960 {
961 case RTREQPOOLCFGVAR_THREAD_TYPE:
962 u64 = pPool->enmThreadType;
963 break;
964
965 case RTREQPOOLCFGVAR_THREAD_FLAGS:
966 u64 = pPool->fThreadFlags;
967 break;
968
969 case RTREQPOOLCFGVAR_MIN_THREADS:
970 u64 = pPool->cMinThreads;
971 break;
972
973 case RTREQPOOLCFGVAR_MAX_THREADS:
974 u64 = pPool->cMaxThreads;
975 break;
976
977 case RTREQPOOLCFGVAR_MS_MIN_IDLE:
978 u64 = pPool->cMsMinIdle;
979 break;
980
981 case RTREQPOOLCFGVAR_MS_IDLE_SLEEP:
982 u64 = pPool->cMsIdleSleep;
983 break;
984
985 case RTREQPOOLCFGVAR_PUSH_BACK_THRESHOLD:
986 u64 = pPool->cThreadsPushBackThreshold;
987 break;
988
989 case RTREQPOOLCFGVAR_PUSH_BACK_MIN_MS:
990 u64 = pPool->cMsMinPushBack;
991 break;
992
993 case RTREQPOOLCFGVAR_PUSH_BACK_MAX_MS:
994 u64 = pPool->cMsMaxPushBack;
995 break;
996
997 case RTREQPOOLCFGVAR_MAX_FREE_REQUESTS:
998 u64 = pPool->cMaxFreeRequests;
999 break;
1000
1001 default:
1002 AssertFailed();
1003 u64 = UINT64_MAX;
1004 break;
1005 }
1006
1007 RTCritSectLeave(&pPool->CritSect);
1008
1009 return u64;
1010}
1011RT_EXPORT_SYMBOL(RTReqPoolGetCfgVar);
1012
1013
1014RTDECL(uint64_t) RTReqPoolGetStat(RTREQPOOL hPool, RTREQPOOLSTAT enmStat)
1015{
1016 PRTREQPOOLINT pPool = hPool;
1017 AssertPtrReturn(pPool, UINT64_MAX);
1018 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT64_MAX);
1019 AssertReturn(enmStat > RTREQPOOLSTAT_INVALID && enmStat < RTREQPOOLSTAT_END, UINT64_MAX);
1020
1021 RTCritSectEnter(&pPool->CritSect);
1022
1023 uint64_t u64;
1024 switch (enmStat)
1025 {
1026 case RTREQPOOLSTAT_THREADS: u64 = pPool->cCurThreads; break;
1027 case RTREQPOOLSTAT_THREADS_CREATED: u64 = pPool->cThreadsCreated; break;
1028 case RTREQPOOLSTAT_REQUESTS_PROCESSED: u64 = pPool->cReqProcessed; break;
1029 case RTREQPOOLSTAT_REQUESTS_SUBMITTED: u64 = pPool->cReqSubmitted; break;
1030 case RTREQPOOLSTAT_REQUESTS_CANCELLED: u64 = pPool->cReqCancelled; break;
1031 case RTREQPOOLSTAT_REQUESTS_PENDING: u64 = pPool->cCurPendingRequests; break;
1032 case RTREQPOOLSTAT_REQUESTS_ACTIVE: u64 = pPool->cCurActiveRequests; break;
1033 case RTREQPOOLSTAT_REQUESTS_FREE: u64 = pPool->cCurFreeRequests; break;
1034 case RTREQPOOLSTAT_NS_TOTAL_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing; break;
1035 case RTREQPOOLSTAT_NS_TOTAL_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued; break;
1036 case RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING: u64 = pPool->cNsTotalReqProcessing / RT_MAX(pPool->cReqProcessed, 1); break;
1037 case RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED: u64 = pPool->cNsTotalReqQueued / RT_MAX(pPool->cReqProcessed, 1); break;
1038 default:
1039 AssertFailed();
1040 u64 = UINT64_MAX;
1041 break;
1042 }
1043
1044 RTCritSectLeave(&pPool->CritSect);
1045
1046 return u64;
1047}
1048RT_EXPORT_SYMBOL(RTReqPoolGetStat);
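
The queueing and processing averages above make a quick health check possible; a diagnostic sketch (hPool as in the earlier sketches):

    /* A long queued-time average relative to processing time suggests the
       pool is undersized or the push back threshold is too low. */
    uint64_t const cNsAvgQueued  = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_NS_AVERAGE_REQ_QUEUED);
    uint64_t const cNsAvgProcess = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_NS_AVERAGE_REQ_PROCESSING);
    uint64_t const cPending      = RTReqPoolGetStat(hPool, RTREQPOOLSTAT_REQUESTS_PENDING);
    LogRel(("pool: avg queued %RU64 ns, avg processing %RU64 ns, %RU64 pending\n",
            cNsAvgQueued, cNsAvgProcess, cPending));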
1049
1050
1051RTDECL(uint32_t) RTReqPoolRetain(RTREQPOOL hPool)
1052{
1053 PRTREQPOOLINT pPool = hPool;
1054 AssertPtrReturn(pPool, UINT32_MAX);
1055 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
1056
1057 return ASMAtomicIncU32(&pPool->cRefs);
1058}
1059RT_EXPORT_SYMBOL(RTReqPoolRetain);
1060
1061
1062RTDECL(uint32_t) RTReqPoolRelease(RTREQPOOL hPool)
1063{
1064 /*
1065 * Ignore NULL and validate the request.
1066 */
1067 if (!hPool)
1068 return 0;
1069 PRTREQPOOLINT pPool = hPool;
1070 AssertPtrReturn(pPool, UINT32_MAX);
1071 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, UINT32_MAX);
1072
1073 /*
1074 * Drop a reference, free it when it reaches zero.
1075 */
1076 uint32_t cRefs = ASMAtomicDecU32(&pPool->cRefs);
1077 if (cRefs == 0)
1078 {
1079 AssertReturn(ASMAtomicCmpXchgU32(&pPool->u32Magic, RTREQPOOL_MAGIC_DEAD, RTREQPOOL_MAGIC), UINT32_MAX);
1080
1081 RTCritSectEnter(&pPool->CritSect);
1082#ifdef RT_STRICT
1083 RTTHREAD const hSelf = RTThreadSelf();
1084#endif
1085
1086 /* Indicate to the worker threads that we're shutting down. */
1087 ASMAtomicWriteBool(&pPool->fDestructing, true);
1088 PRTREQPOOLTHREAD pThread;
1089 RTListForEach(&pPool->WorkerThreads, pThread, RTREQPOOLTHREAD, ListNode)
1090 {
1091 Assert(pThread->hThread != hSelf);
1092 RTThreadUserSignal(pThread->hThread);
1093 }
1094
1095 /* Cancel pending requests. */
1096 Assert(!pPool->pPendingRequests);
1097 while (pPool->pPendingRequests)
1098 {
1099 PRTREQINT pReq = pPool->pPendingRequests;
1100 pPool->pPendingRequests = pReq->pNext;
1101 rtReqPoolCancelReq(pReq);
1102 }
1103 pPool->ppPendingRequests = NULL;
1104 pPool->cCurPendingRequests = 0;
1105
1106 /* Wait for the workers to shut down. */
1107 while (!RTListIsEmpty(&pPool->WorkerThreads))
1108 {
1109 RTCritSectLeave(&pPool->CritSect);
1110 RTSemEventMultiWait(pPool->hThreadTermEvt, RT_MS_1MIN);
1111 RTCritSectEnter(&pPool->CritSect);
1112 /** @todo should we wait forever here? */
1113 }
1114
1115 /* Free recycled requests. */
1116 for (;;)
1117 {
1118 PRTREQINT pReq = pPool->pFreeRequests;
1119 if (!pReq)
1120 break;
1121 pPool->pFreeRequests = pReq->pNext;
1122 pPool->cCurFreeRequests--;
1123 rtReqFreeIt(pReq);
1124 }
1125
1126 /* Finally, free the critical section and pool instance. */
1127 RTSemEventMultiDestroy(pPool->hThreadTermEvt);
1128 RTCritSectLeave(&pPool->CritSect);
1129 RTCritSectDelete(&pPool->CritSect);
1130 RTMemFree(pPool);
1131 }
1132
1133 return cRefs;
1134}
1135RT_EXPORT_SYMBOL(RTReqPoolRelease);
1136
1137
1138RTDECL(int) RTReqPoolAlloc(RTREQPOOL hPool, RTREQTYPE enmType, PRTREQ *phReq)
1139{
1140 PRTREQPOOLINT pPool = hPool;
1141 AssertPtrReturn(pPool, VERR_INVALID_HANDLE);
1142 AssertReturn(pPool->u32Magic == RTREQPOOL_MAGIC, VERR_INVALID_HANDLE);
1143
1144 /*
1145 * Try recycle old requests.
1146 */
1147 if (ASMAtomicReadU32(&pPool->cCurFreeRequests) > 0)
1148 {
1149 RTCritSectEnter(&pPool->CritSect);
1150 PRTREQINT pReq = pPool->pFreeRequests;
1151 if (pReq)
1152 {
1153 ASMAtomicDecU32(&pPool->cCurFreeRequests);
1154 pPool->pFreeRequests = pReq->pNext;
1155
1156 RTCritSectLeave(&pPool->CritSect);
1157
1158 Assert(pReq->fPoolOrQueue);
1159 Assert(pReq->uOwner.hPool == pPool);
1160
1161 int rc = rtReqReInit(pReq, enmType);
1162 if (RT_SUCCESS(rc))
1163 {
1164 *phReq = pReq;
1165 LogFlow(("RTReqPoolAlloc: returns VINF_SUCCESS *phReq=%p recycled\n", pReq));
1166 return rc;
1167 }
1168 }
1169 else
1170 RTCritSectLeave(&pPool->CritSect);
1171 }
1172
1173 /*
1174 * Allocate a new request.
1175 */
1176 int rc = rtReqAlloc(enmType, true /*fPoolOrQueue*/, pPool, phReq);
1177 LogFlow(("RTReqPoolAlloc: returns %Rrc *phReq=%p\n", rc, *phReq));
1178 return rc;
1179}
1180RT_EXPORT_SYMBOL(RTReqPoolAlloc);
1181
1182
1183RTDECL(int) RTReqPoolCallEx(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, ...)
1184{
1185 va_list va;
1186 va_start(va, cArgs);
1187 int rc = RTReqPoolCallExV(hPool, cMillies, phReq, fFlags, pfnFunction, cArgs, va);
1188 va_end(va);
1189 return rc;
1190}
1191RT_EXPORT_SYMBOL(RTReqPoolCallEx);
1192
1193
1194RTDECL(int) RTReqPoolCallExV(RTREQPOOL hPool, RTMSINTERVAL cMillies, PRTREQ *phReq, uint32_t fFlags, PFNRT pfnFunction, unsigned cArgs, va_list va)
1195{
1196 /*
1197 * Check input.
1198 */
1199 AssertPtrReturn(pfnFunction, VERR_INVALID_POINTER);
1200 AssertMsgReturn(!((uint32_t)fFlags & ~(uint32_t)(RTREQFLAGS_NO_WAIT | RTREQFLAGS_RETURN_MASK)), ("%#x\n", (uint32_t)fFlags), VERR_INVALID_PARAMETER);
1201 if (!(fFlags & RTREQFLAGS_NO_WAIT) || phReq)
1202 {
1203 AssertPtrReturn(phReq, VERR_INVALID_POINTER);
1204 *phReq = NIL_RTREQ;
1205 }
1206
1207 PRTREQINT pReq = NULL;
1208 AssertMsgReturn(cArgs * sizeof(uintptr_t) <= sizeof(pReq->u.Internal.aArgs), ("cArgs=%u\n", cArgs), VERR_TOO_MUCH_DATA);
1209
1210 /*
1211 * Allocate and initialize the request.
1212 */
1213 int rc = RTReqPoolAlloc(hPool, RTREQTYPE_INTERNAL, &pReq);
1214 if (RT_FAILURE(rc))
1215 return rc;
1216 pReq->fFlags = fFlags;
1217 pReq->u.Internal.pfn = pfnFunction;
1218 pReq->u.Internal.cArgs = cArgs;
1219 for (unsigned iArg = 0; iArg < cArgs; iArg++)
1220 pReq->u.Internal.aArgs[iArg] = va_arg(va, uintptr_t);
1221
1222 /*
1223 * Submit the request.
1224 */
1225 rc = RTReqSubmit(pReq, cMillies);
1226 if ( rc != VINF_SUCCESS
1227 && rc != VERR_TIMEOUT)
1228 {
1229 Assert(rc != VERR_INTERRUPTED);
1230 RTReqRelease(pReq);
1231 pReq = NULL;
1232 }
1233
1234 if (phReq)
1235 {
1236 *phReq = pReq;
1237 LogFlow(("RTReqPoolCallExV: returns %Rrc *phReq=%p\n", rc, pReq));
1238 }
1239 else
1240 {
1241 RTReqRelease(pReq);
1242 LogFlow(("RTReqPoolCallExV: returns %Rrc\n", rc));
1243 }
1244 return rc;
1245}
1246RT_EXPORT_SYMBOL(RTReqPoolCallExV);
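
Passing both a request handle and RTREQFLAGS_NO_WAIT lets the caller fire the request and rendezvous later via the generic waiter from req.h; a sketch reusing the hypothetical myWorker from above:

    PRTREQ hReq;
    int rc = RTReqPoolCallEx(hPool, 0 /*cMillies*/, &hReq,
                             RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
                             (PFNRT)myWorker, 1, pvUser);
    if (RT_SUCCESS(rc))
    {
        /* ... overlap other work with the request's execution ... */
        rc = RTReqWait(hReq, RT_INDEFINITE_WAIT);
        if (RT_SUCCESS(rc))
            rc = RTReqGetStatus(hReq);  /* myWorker's return value */
        RTReqRelease(hReq);
    }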
1247
1248
1249RTDECL(int) RTReqPoolCallWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1250{
1251 PRTREQINT pReq;
1252 va_list va;
1253 va_start(va, cArgs);
1254 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_IPRT_STATUS,
1255 pfnFunction, cArgs, va);
1256 va_end(va);
1257 if (RT_SUCCESS(rc))
1258 rc = pReq->iStatusX;
1259 RTReqRelease(pReq);
1260 return rc;
1261}
1262RT_EXPORT_SYMBOL(RTReqPoolCallWait);
1263
1264
1265RTDECL(int) RTReqPoolCallNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1266{
1267 va_list va;
1268 va_start(va, cArgs);
1269 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_IPRT_STATUS | RTREQFLAGS_NO_WAIT,
1270 pfnFunction, cArgs, va);
1271 va_end(va);
1272 return rc;
1273}
1274RT_EXPORT_SYMBOL(RTReqPoolCallNoWait);
1275
1276
1277RTDECL(int) RTReqPoolCallVoidWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1278{
1279 PRTREQINT pReq;
1280 va_list va;
1281 va_start(va, cArgs);
1282 int rc = RTReqPoolCallExV(hPool, RT_INDEFINITE_WAIT, &pReq, RTREQFLAGS_VOID,
1283 pfnFunction, cArgs, va);
1284 va_end(va);
1285 if (RT_SUCCESS(rc))
1286 rc = pReq->iStatusX;
1287 RTReqRelease(pReq);
1288 return rc;
1289}
1290RT_EXPORT_SYMBOL(RTReqPoolCallVoidWait);
1291
1292
1293RTDECL(int) RTReqPoolCallVoidNoWait(RTREQPOOL hPool, PFNRT pfnFunction, unsigned cArgs, ...)
1294{
1295 va_list va;
1296 va_start(va, cArgs);
1297 int rc = RTReqPoolCallExV(hPool, 0, NULL, RTREQFLAGS_VOID | RTREQFLAGS_NO_WAIT,
1298 pfnFunction, cArgs, va);
1299 va_end(va);
1300 return rc;
1301}
1302RT_EXPORT_SYMBOL(RTReqPoolCallVoidNoWait);
1303