VirtualBox

source: vbox/trunk/src/VBox/Runtime/r3/os2/pipe-os2.cpp@ 103795

Last change on this file since 103795 was 98103, checked in by vboxsync, 2 years ago

Copyright year updates by scm.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 33.2 KB
Line 
1/* $Id: pipe-os2.cpp 98103 2023-01-17 14:15:46Z vboxsync $ */
2/** @file
3 * IPRT - Anonymous Pipes, OS/2 Implementation.
4 */
5
6/*
7 * Copyright (C) 2010-2023 Oracle and/or its affiliates.
8 *
9 * This file is part of VirtualBox base platform packages, as
10 * available from https://www.virtualbox.org.
11 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License
14 * as published by the Free Software Foundation, in version 3 of the
15 * License.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 * General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, see <https://www.gnu.org/licenses>.
24 *
25 * The contents of this file may alternatively be used under the terms
26 * of the Common Development and Distribution License Version 1.0
27 * (CDDL), a copy of it is provided in the "COPYING.CDDL" file included
28 * in the VirtualBox distribution, in which case the provisions of the
29 * CDDL are applicable instead of those of the GPL.
30 *
31 * You may elect to license modified versions of this file under the
32 * terms and conditions of either the GPL or the CDDL or both.
33 *
34 * SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0
35 */
36
37
38/*********************************************************************************************************************************
39* Header Files *
40*********************************************************************************************************************************/
41#define INCL_ERRORS
42#define INCL_DOSSEMAPHORES
43#include <os2.h>
44
45#include <iprt/pipe.h>
46#include "internal/iprt.h"
47
48#include <iprt/asm.h>
49#include <iprt/assert.h>
50#include <iprt/critsect.h>
51#include <iprt/err.h>
52#include <iprt/mem.h>
53#include <iprt/string.h>
54#include <iprt/poll.h>
55#include <iprt/process.h>
56#include <iprt/thread.h>
57#include <iprt/time.h>
58#include "internal/pipe.h"
59#include "internal/magics.h"
60
61
62/*********************************************************************************************************************************
63* Defined Constants And Macros *
64*********************************************************************************************************************************/
65/** The pipe buffer size we prefer. */
66#define RTPIPE_OS2_SIZE _32K
67
68
69/*********************************************************************************************************************************
70* Structures and Typedefs *
71*********************************************************************************************************************************/
72typedef struct RTPIPEINTERNAL
73{
74 /** Magic value (RTPIPE_MAGIC). */
75 uint32_t u32Magic;
76 /** The pipe handle. */
77 HPIPE hPipe;
78 /** Set if this is the read end, clear if it's the write end. */
79 bool fRead;
80 /** RTPipeFromNative: Leave open. */
81 bool fLeaveOpen;
82 /** Whether the pipe is in blocking or non-blocking mode. */
83 bool fBlocking;
84 /** Set if the pipe is broken. */
85 bool fBrokenPipe;
86 /** Usage counter. */
87 uint32_t cUsers;
88
89 /** The event semaphore associated with the pipe. */
90 HEV hev;
91 /** The handle of the poll set currently polling on this pipe.
92 * We can only have one poller at the time (lazy bird). */
93 RTPOLLSET hPollSet;
94 /** Critical section protecting the above members.
95 * (Taking the lazy/simple approach.) */
96 RTCRITSECT CritSect;
97
98} RTPIPEINTERNAL;
99
100
101/**
102 * Ensures that the pipe has a semaphore associated with it.
103 *
104 * @returns VBox status code.
105 * @param pThis The pipe.
106 */
107static int rtPipeOs2EnsureSem(RTPIPEINTERNAL *pThis)
108{
109 if (pThis->hev != NULLHANDLE)
110 return VINF_SUCCESS;
111
112 HEV hev;
113 APIRET orc = DosCreateEventSem(NULL, &hev, DC_SEM_SHARED, FALSE);
114 if (orc == NO_ERROR)
115 {
116 orc = DosSetNPipeSem(pThis->hPipe, (HSEM)hev, 1);
117 if (orc == NO_ERROR)
118 {
119 pThis->hev = hev;
120 return VINF_SUCCESS;
121 }
122
123 DosCloseEventSem(hev);
124 }
125 return RTErrConvertFromOS2(orc);
126}
127
128
129RTDECL(int) RTPipeCreate(PRTPIPE phPipeRead, PRTPIPE phPipeWrite, uint32_t fFlags)
130{
131 AssertPtrReturn(phPipeRead, VERR_INVALID_POINTER);
132 AssertPtrReturn(phPipeWrite, VERR_INVALID_POINTER);
133 AssertReturn(!(fFlags & ~RTPIPE_C_VALID_MASK), VERR_INVALID_PARAMETER);
134
135 /*
136 * Try create and connect a pipe pair.
137 */
138 APIRET orc;
139 HPIPE hPipeR;
140 HFILE hPipeW;
141 int rc;
142 for (;;)
143 {
144 static volatile uint32_t g_iNextPipe = 0;
145 char szName[128];
146 RTStrPrintf(szName, sizeof(szName), "\\pipe\\iprt-pipe-%u-%u", RTProcSelf(), ASMAtomicIncU32(&g_iNextPipe));
147
148 /*
149 * Create the read end of the pipe.
150 */
151 ULONG fPipeMode = 1 /*instance*/ | NP_TYPE_BYTE | NP_READMODE_BYTE | NP_NOWAIT;
152 ULONG fOpenMode = NP_ACCESS_DUPLEX | NP_WRITEBEHIND;
153 if (fFlags & RTPIPE_C_INHERIT_READ)
154 fOpenMode |= NP_INHERIT;
155 else
156 fOpenMode |= NP_NOINHERIT;
157 orc = DosCreateNPipe((PSZ)szName, &hPipeR, fOpenMode, fPipeMode, RTPIPE_OS2_SIZE, RTPIPE_OS2_SIZE, NP_DEFAULT_WAIT);
158 if (orc == NO_ERROR)
159 {
160 orc = DosConnectNPipe(hPipeR);
161 if (orc == ERROR_PIPE_NOT_CONNECTED || orc == NO_ERROR)
162 {
163 /*
164 * Connect to the pipe (the write end), attach sem below.
165 */
166 ULONG ulAction = 0;
167 ULONG fOpenW = OPEN_ACTION_FAIL_IF_NEW | OPEN_ACTION_OPEN_IF_EXISTS;
168 ULONG fModeW = OPEN_ACCESS_WRITEONLY | OPEN_SHARE_DENYNONE | OPEN_FLAGS_FAIL_ON_ERROR;
169 if (!(fFlags & RTPIPE_C_INHERIT_WRITE))
170 fModeW |= OPEN_FLAGS_NOINHERIT;
171 orc = DosOpen((PSZ)szName, &hPipeW, &ulAction, 0 /*cbFile*/, FILE_NORMAL,
172 fOpenW, fModeW, NULL /*peaop2*/);
173 if (orc == NO_ERROR)
174 break;
175 }
176 DosClose(hPipeR);
177 }
178 if ( orc != ERROR_PIPE_BUSY /* already exist - compatible */
179 && orc != ERROR_ACCESS_DENIED /* already exist - incompatible (?) */)
180 return RTErrConvertFromOS2(orc);
181 /* else: try again with a new name */
182 }
183
184 /*
185 * Create the two handles.
186 */
187 RTPIPEINTERNAL *pThisR = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
188 if (pThisR)
189 {
190 RTPIPEINTERNAL *pThisW = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
191 if (pThisW)
192 {
193 /* Crit sects. */
194 rc = RTCritSectInit(&pThisR->CritSect);
195 if (RT_SUCCESS(rc))
196 {
197 rc = RTCritSectInit(&pThisW->CritSect);
198 if (RT_SUCCESS(rc))
199 {
200 /* Initialize the structures. */
201 pThisR->u32Magic = RTPIPE_MAGIC;
202 pThisW->u32Magic = RTPIPE_MAGIC;
203 pThisR->hPipe = hPipeR;
204 pThisW->hPipe = hPipeW;
205 pThisR->hev = NULLHANDLE;
206 pThisW->hev = NULLHANDLE;
207 pThisR->fRead = true;
208 pThisW->fRead = false;
209 pThisR->fLeaveOpen = false;
210 pThisW->fLeaveOpen = false;
211 pThisR->fBlocking = false;
212 pThisW->fBlocking = true;
213 //pThisR->fBrokenPipe = false;
214 //pThisW->fBrokenPipe = false;
215 //pThisR->cUsers = 0;
216 //pThisW->cUsers = 0;
217 pThisR->hPollSet = NIL_RTPOLLSET;
218 pThisW->hPollSet = NIL_RTPOLLSET;
219
220 *phPipeRead = pThisR;
221 *phPipeWrite = pThisW;
222 return VINF_SUCCESS;
223 }
224
225 RTCritSectDelete(&pThisR->CritSect);
226 }
227 RTMemFree(pThisW);
228 }
229 else
230 rc = VERR_NO_MEMORY;
231 RTMemFree(pThisR);
232 }
233 else
234 rc = VERR_NO_MEMORY;
235
236 /* Don't call DosDisConnectNPipe! */
237 DosClose(hPipeW);
238 DosClose(hPipeR);
239 return rc;
240}
241
242
243RTDECL(int) RTPipeCloseEx(RTPIPE hPipe, bool fLeaveOpen)
244{
245 RTPIPEINTERNAL *pThis = hPipe;
246 if (pThis == NIL_RTPIPE)
247 return VINF_SUCCESS;
248 AssertPtrReturn(pThis, VERR_INVALID_PARAMETER);
249 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
250
251 /*
252 * Do the cleanup.
253 */
254 AssertReturn(ASMAtomicCmpXchgU32(&pThis->u32Magic, ~RTPIPE_MAGIC, RTPIPE_MAGIC), VERR_INVALID_HANDLE);
255 RTCritSectEnter(&pThis->CritSect);
256 Assert(pThis->cUsers == 0);
257
258 /* Don't call DosDisConnectNPipe! */
259 if (!fLeaveOpen && !pThis->fLeaveOpen)
260 DosClose(pThis->hPipe);
261 pThis->hPipe = (HPIPE)-1;
262
263 if (pThis->hev != NULLHANDLE)
264 {
265 DosCloseEventSem(pThis->hev);
266 pThis->hev = NULLHANDLE;
267 }
268
269 RTCritSectLeave(&pThis->CritSect);
270 RTCritSectDelete(&pThis->CritSect);
271
272 RTMemFree(pThis);
273
274 return VINF_SUCCESS;
275}
276
277
278RTDECL(int) RTPipeClose(RTPIPE hPipe)
279{
280 return RTPipeCloseEx(hPipe, false /*fLeaveOpen*/);
281}
282
283
284RTDECL(int) RTPipeFromNative(PRTPIPE phPipe, RTHCINTPTR hNativePipe, uint32_t fFlags)
285{
286 AssertPtrReturn(phPipe, VERR_INVALID_POINTER);
287 AssertReturn(!(fFlags & ~RTPIPE_N_VALID_MASK_FN), VERR_INVALID_PARAMETER);
288 AssertReturn(!!(fFlags & RTPIPE_N_READ) != !!(fFlags & RTPIPE_N_WRITE), VERR_INVALID_PARAMETER);
289
290 /*
291 * Get and validate the pipe handle info.
292 */
293 HPIPE hNative = (HPIPE)hNativePipe;
294 ULONG ulType = 0;
295 ULONG ulAttr = 0;
296 APIRET orc = DosQueryHType(hNative, &ulType, &ulAttr);
297 AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc));
298 AssertReturn((ulType & 0x7) == HANDTYPE_PIPE, VERR_INVALID_HANDLE);
299
300#if 0
301 union
302 {
303 PIPEINFO PipeInfo;
304 uint8_t abPadding[sizeof(PIPEINFO) + 127];
305 } Buf;
306 orc = DosQueryNPipeInfo(hNative, 1, &Buf, sizeof(Buf));
307 if (orc != NO_ERROR)
308 {
309 /* Sorry, anonymous pips are not supported. */
310 AssertMsgFailed(("%d\n", orc));
311 return VERR_INVALID_HANDLE;
312 }
313 AssertReturn(Buf.PipeInfo.cbMaxInst == 1, VERR_INVALID_HANDLE);
314#endif
315
316 ULONG fPipeState = 0;
317 orc = DosQueryNPHState(hNative, &fPipeState);
318 if (orc != NO_ERROR)
319 {
320 /* Sorry, anonymous pips are not supported. */
321 AssertMsgFailed(("%d\n", orc));
322 return VERR_INVALID_HANDLE;
323 }
324 AssertReturn(!(fPipeState & NP_TYPE_MESSAGE), VERR_INVALID_HANDLE);
325 AssertReturn(!(fPipeState & NP_READMODE_MESSAGE), VERR_INVALID_HANDLE);
326 AssertReturn((fPipeState & 0xff) == 1, VERR_INVALID_HANDLE);
327
328 ULONG fFileState = 0;
329 orc = DosQueryFHState(hNative, &fFileState);
330 AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), VERR_INVALID_HANDLE);
331 AssertMsgReturn( (fFileState & 0x3) == (fFlags & RTPIPE_N_READ ? OPEN_ACCESS_READONLY : OPEN_ACCESS_WRITEONLY)
332 || (fFileState & 0x3) == OPEN_ACCESS_READWRITE
333 , ("%#x\n", fFileState), VERR_INVALID_HANDLE);
334
335 /*
336 * Looks kind of OK. Fix the inherit flag.
337 */
338 orc = DosSetFHState(hNative, (fFileState & (OPEN_FLAGS_WRITE_THROUGH | OPEN_FLAGS_FAIL_ON_ERROR | OPEN_FLAGS_NO_CACHE))
339 | (fFlags & RTPIPE_N_INHERIT ? 0 : OPEN_FLAGS_NOINHERIT));
340 AssertMsgReturn(orc == NO_ERROR, ("%d\n", orc), RTErrConvertFromOS2(orc));
341
342
343 /*
344 * Create a handle so we can try rtPipeQueryInfo on it
345 * and see if we need to duplicate it to make that call work.
346 */
347 RTPIPEINTERNAL *pThis = (RTPIPEINTERNAL *)RTMemAllocZ(sizeof(RTPIPEINTERNAL));
348 if (!pThis)
349 return VERR_NO_MEMORY;
350 int rc = RTCritSectInit(&pThis->CritSect);
351 if (RT_SUCCESS(rc))
352 {
353 pThis->u32Magic = RTPIPE_MAGIC;
354 pThis->hPipe = hNative;
355 pThis->hev = NULLHANDLE;
356 pThis->fRead = RT_BOOL(fFlags & RTPIPE_N_READ);
357 pThis->fLeaveOpen = RT_BOOL(fFlags & RTPIPE_N_LEAVE_OPEN);
358 pThis->fBlocking = !(fPipeState & NP_NOWAIT);
359 //pThis->fBrokenPipe = false;
360 //pThis->cUsers = 0;
361 pThis->hPollSet = NIL_RTPOLLSET;
362
363 *phPipe = pThis;
364 return VINF_SUCCESS;
365
366 //RTCritSectDelete(&pThis->CritSect);
367 }
368 RTMemFree(pThis);
369 return rc;
370}
371
372RTDECL(RTHCINTPTR) RTPipeToNative(RTPIPE hPipe)
373{
374 RTPIPEINTERNAL *pThis = hPipe;
375 AssertPtrReturn(pThis, -1);
376 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, -1);
377
378 return (RTHCINTPTR)pThis->hPipe;
379}
380
381/**
382 * Prepare blocking mode.
383 *
384 * @returns IPRT status code.
385 * @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is
386 * attempted.
387 *
388 * @param pThis The pipe handle.
389 *
390 * @remarks Caller owns the critical section.
391 */
392static int rtPipeTryBlocking(RTPIPEINTERNAL *pThis)
393{
394 if (!pThis->fBlocking)
395 {
396 if (pThis->cUsers != 0)
397 return VERR_WRONG_ORDER;
398
399 APIRET orc = DosSetNPHState(pThis->hPipe, NP_WAIT | NP_READMODE_BYTE);
400 if (orc != NO_ERROR)
401 {
402 if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED)
403 return RTErrConvertFromOS2(orc);
404 pThis->fBrokenPipe = true;
405 }
406 pThis->fBlocking = true;
407 }
408
409 pThis->cUsers++;
410 return VINF_SUCCESS;
411}
412
413
414/**
415 * Prepare non-blocking mode.
416 *
417 * @returns IPRT status code.
418 * @retval VERR_WRONG_ORDER if simultaneous non-blocking and blocking access is
419 * attempted.
420 *
421 * @param pThis The pipe handle.
422 */
423static int rtPipeTryNonBlocking(RTPIPEINTERNAL *pThis)
424{
425 if (pThis->fBlocking)
426 {
427 if (pThis->cUsers != 0)
428 return VERR_WRONG_ORDER;
429
430 APIRET orc = DosSetNPHState(pThis->hPipe, NP_NOWAIT | NP_READMODE_BYTE);
431 if (orc != NO_ERROR)
432 {
433 if (orc != ERROR_BROKEN_PIPE && orc != ERROR_PIPE_NOT_CONNECTED)
434 return RTErrConvertFromOS2(orc);
435 pThis->fBrokenPipe = true;
436 }
437 pThis->fBlocking = false;
438 }
439
440 pThis->cUsers++;
441 return VINF_SUCCESS;
442}
443
444
445/**
446 * Checks if the read pipe has been broken.
447 *
448 * @returns true if broken, false if no.
449 * @param pThis The pipe handle (read).
450 */
451static bool rtPipeOs2IsBroken(RTPIPEINTERNAL *pThis)
452{
453 Assert(pThis->fRead);
454
455#if 0
456 /*
457 * Query it via the semaphore. Not sure how fast this is...
458 */
459 PIPESEMSTATE aStates[3]; RT_ZERO(aStates);
460 APIRET orc = DosQueryNPipeSemState(pThis->hev, &aStates[0], sizeof(aStates));
461 if (orc == NO_ERROR)
462 {
463 if (aStates[0].fStatus == NPSS_CLOSE)
464 return true;
465 if (aStates[0].fStatus == NPSS_RDATA)
466 return false;
467 }
468 AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus));
469
470 /*
471 * Fall back / alternative method.
472 */
473#endif
474 ULONG cbActual = 0;
475 ULONG ulState = 0;
476 AVAILDATA Avail = { 0, 0 };
477 APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
478 if (orc != NO_ERROR)
479 {
480 if (orc != ERROR_PIPE_BUSY)
481 AssertMsgFailed(("%d\n", orc));
482 return false;
483 }
484
485 return ulState != NP_STATE_CONNECTED;
486}
487
488
489RTDECL(int) RTPipeRead(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
490{
491 RTPIPEINTERNAL *pThis = hPipe;
492 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
493 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
494 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
495 AssertPtr(pcbRead);
496 AssertPtr(pvBuf);
497
498 int rc = RTCritSectEnter(&pThis->CritSect);
499 if (RT_SUCCESS(rc))
500 {
501 rc = rtPipeTryNonBlocking(pThis);
502 if (RT_SUCCESS(rc))
503 {
504 RTCritSectLeave(&pThis->CritSect);
505
506 ULONG cbActual = 0;
507 APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual);
508 if (orc == NO_ERROR)
509 {
510 if (cbActual || !cbToRead || !rtPipeOs2IsBroken(pThis))
511 *pcbRead = cbActual;
512 else
513 rc = VERR_BROKEN_PIPE;
514 }
515 else if (orc == ERROR_NO_DATA)
516 {
517 *pcbRead = 0;
518 rc = VINF_TRY_AGAIN;
519 }
520 else
521 rc = RTErrConvertFromOS2(orc);
522
523 RTCritSectEnter(&pThis->CritSect);
524 if (rc == VERR_BROKEN_PIPE)
525 pThis->fBrokenPipe = true;
526 pThis->cUsers--;
527 }
528 else
529 rc = VERR_WRONG_ORDER;
530 RTCritSectLeave(&pThis->CritSect);
531 }
532 return rc;
533}
534
535
536RTDECL(int) RTPipeReadBlocking(RTPIPE hPipe, void *pvBuf, size_t cbToRead, size_t *pcbRead)
537{
538 RTPIPEINTERNAL *pThis = hPipe;
539 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
540 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
541 AssertReturn(pThis->fRead, VERR_ACCESS_DENIED);
542 AssertPtr(pvBuf);
543
544 int rc = RTCritSectEnter(&pThis->CritSect);
545 if (RT_SUCCESS(rc))
546 {
547 rc = rtPipeTryBlocking(pThis);
548 if (RT_SUCCESS(rc))
549 {
550 RTCritSectLeave(&pThis->CritSect);
551
552 size_t cbTotalRead = 0;
553 while (cbToRead > 0)
554 {
555 ULONG cbActual = 0;
556 APIRET orc = DosRead(pThis->hPipe, pvBuf, cbToRead, &cbActual);
557 if (orc != NO_ERROR)
558 {
559 rc = RTErrConvertFromOS2(orc);
560 break;
561 }
562 if (!cbActual && rtPipeOs2IsBroken(pThis))
563 {
564 rc = VERR_BROKEN_PIPE;
565 break;
566 }
567
568 /* advance */
569 pvBuf = (char *)pvBuf + cbActual;
570 cbTotalRead += cbActual;
571 cbToRead -= cbActual;
572 }
573
574 if (pcbRead)
575 {
576 *pcbRead = cbTotalRead;
577 if ( RT_FAILURE(rc)
578 && cbTotalRead)
579 rc = VINF_SUCCESS;
580 }
581
582 RTCritSectEnter(&pThis->CritSect);
583 if (rc == VERR_BROKEN_PIPE)
584 pThis->fBrokenPipe = true;
585 pThis->cUsers--;
586 }
587 else
588 rc = VERR_WRONG_ORDER;
589 RTCritSectLeave(&pThis->CritSect);
590 }
591 return rc;
592}
593
594
595/**
596 * Gets the available write buffer size of the pipe.
597 *
598 * @returns Number of bytes, 1 on failure.
599 * @param pThis The pipe handle.
600 */
601static ULONG rtPipeOs2GetSpace(RTPIPEINTERNAL *pThis)
602{
603 Assert(!pThis->fRead);
604
605#if 0 /* Not sure which is more efficient, neither are really optimal, I fear. */
606 /*
607 * Query via semaphore state.
608 * This will walk the list of active named pipes...
609 */
610 /** @todo Check how hev and hpipe are associated, if complicated, use the
611 * alternative method below. */
612 PIPESEMSTATE aStates[3]; RT_ZERO(aStates);
613 APIRET orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
614 if (orc == NO_ERROR)
615 {
616 if (aStates[0].fStatus == NPSS_WSPACE)
617 return aStates[0].usAvail;
618 if (aStates[1].fStatus == NPSS_WSPACE)
619 return aStates[1].usAvail;
620 return 0;
621 }
622 AssertMsgFailed(("%d / %d\n", orc, aStates[0].fStatus));
623
624#else
625 /*
626 * Query via the pipe info.
627 * This will have to lookup and store the pipe name.
628 */
629 union
630 {
631 PIPEINFO PipeInfo;
632 uint8_t abPadding[sizeof(PIPEINFO) + 127];
633 } Buf;
634 APIRET orc = DosQueryNPipeInfo(pThis->hPipe, 1, &Buf, sizeof(Buf));
635 if (orc == NO_ERROR)
636 return Buf.PipeInfo.cbOut;
637 AssertMsgFailed(("%d\n", orc));
638#endif
639
640 return 1;
641}
642
643
644RTDECL(int) RTPipeWrite(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
645{
646 RTPIPEINTERNAL *pThis = hPipe;
647 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
648 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
649 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
650 AssertPtr(pcbWritten);
651 AssertPtr(pvBuf);
652
653 int rc = RTCritSectEnter(&pThis->CritSect);
654 if (RT_SUCCESS(rc))
655 {
656 rc = rtPipeTryNonBlocking(pThis);
657 if (RT_SUCCESS(rc))
658 {
659 if (cbToWrite > 0)
660 {
661 ULONG cbActual = 0;
662 APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual);
663 if (orc == NO_ERROR && cbActual == 0)
664 {
665 /* Retry with the request adjusted to the available buffer space. */
666 ULONG cbAvail = rtPipeOs2GetSpace(pThis);
667 orc = DosWrite(pThis->hPipe, pvBuf, RT_MIN(cbAvail, cbToWrite), &cbActual);
668 }
669
670 if (orc == NO_ERROR)
671 {
672 *pcbWritten = cbActual;
673 if (cbActual == 0)
674 rc = VINF_TRY_AGAIN;
675 }
676 else
677 {
678 rc = RTErrConvertFromOS2(orc);
679 if (rc == VERR_PIPE_NOT_CONNECTED)
680 rc = VERR_BROKEN_PIPE;
681 }
682 }
683 else
684 *pcbWritten = 0;
685
686 if (rc == VERR_BROKEN_PIPE)
687 pThis->fBrokenPipe = true;
688 pThis->cUsers--;
689 }
690 else
691 rc = VERR_WRONG_ORDER;
692 RTCritSectLeave(&pThis->CritSect);
693 }
694 return rc;
695}
696
697
698RTDECL(int) RTPipeWriteBlocking(RTPIPE hPipe, const void *pvBuf, size_t cbToWrite, size_t *pcbWritten)
699{
700 RTPIPEINTERNAL *pThis = hPipe;
701 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
702 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
703 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
704 AssertPtr(pvBuf);
705 AssertPtrNull(pcbWritten);
706
707 int rc = RTCritSectEnter(&pThis->CritSect);
708 if (RT_SUCCESS(rc))
709 {
710 rc = rtPipeTryBlocking(pThis);
711 if (RT_SUCCESS(rc))
712 {
713 RTCritSectLeave(&pThis->CritSect);
714
715 size_t cbTotalWritten = 0;
716 while (cbToWrite > 0)
717 {
718 ULONG cbActual = 0;
719 APIRET orc = DosWrite(pThis->hPipe, pvBuf, cbToWrite, &cbActual);
720 if (orc != NO_ERROR)
721 {
722 rc = RTErrConvertFromOS2(orc);
723 if (rc == VERR_PIPE_NOT_CONNECTED)
724 rc = VERR_BROKEN_PIPE;
725 break;
726 }
727 pvBuf = (char const *)pvBuf + cbActual;
728 cbToWrite -= cbActual;
729 cbTotalWritten += cbActual;
730 }
731
732 if (pcbWritten)
733 {
734 *pcbWritten = cbTotalWritten;
735 if ( RT_FAILURE(rc)
736 && cbTotalWritten)
737 rc = VINF_SUCCESS;
738 }
739
740 RTCritSectEnter(&pThis->CritSect);
741 if (rc == VERR_BROKEN_PIPE)
742 pThis->fBrokenPipe = true;
743 pThis->cUsers--;
744 }
745 else
746 rc = VERR_WRONG_ORDER;
747 RTCritSectLeave(&pThis->CritSect);
748 }
749 return rc;
750}
751
752
753RTDECL(int) RTPipeFlush(RTPIPE hPipe)
754{
755 RTPIPEINTERNAL *pThis = hPipe;
756 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
757 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
758 AssertReturn(!pThis->fRead, VERR_ACCESS_DENIED);
759
760 APIRET orc = DosResetBuffer(pThis->hPipe);
761 if (orc != NO_ERROR)
762 {
763 int rc = RTErrConvertFromOS2(orc);
764 if (rc == VERR_BROKEN_PIPE)
765 {
766 RTCritSectEnter(&pThis->CritSect);
767 pThis->fBrokenPipe = true;
768 RTCritSectLeave(&pThis->CritSect);
769 }
770 return rc;
771 }
772 return VINF_SUCCESS;
773}
774
775
776RTDECL(int) RTPipeSelectOne(RTPIPE hPipe, RTMSINTERVAL cMillies)
777{
778 RTPIPEINTERNAL *pThis = hPipe;
779 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
780 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
781
782 uint64_t const StartMsTS = RTTimeMilliTS();
783
784 int rc = RTCritSectEnter(&pThis->CritSect);
785 if (RT_FAILURE(rc))
786 return rc;
787
788 rc = rtPipeOs2EnsureSem(pThis);
789 if (RT_SUCCESS(rc) && cMillies > 0)
790 {
791 /* Stop polling attempts if we might block. */
792 if (pThis->hPollSet == NIL_RTPOLLSET)
793 pThis->hPollSet = (RTPOLLSET)(uintptr_t)0xbeef0042;
794 else
795 rc = VERR_WRONG_ORDER;
796 }
797 if (RT_SUCCESS(rc))
798 {
799 for (unsigned iLoop = 0;; iLoop++)
800 {
801 /*
802 * Check the handle state.
803 */
804 APIRET orc;
805 if (cMillies > 0)
806 {
807 ULONG ulIgnore;
808 orc = DosResetEventSem(pThis->hev, &ulIgnore);
809 AssertMsg(orc == NO_ERROR || orc == ERROR_ALREADY_RESET, ("%d\n", orc));
810 }
811
812 PIPESEMSTATE aStates[4]; RT_ZERO(aStates);
813 orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
814 if (orc != NO_ERROR)
815 {
816 rc = RTErrConvertFromOS2(orc);
817 break;
818 }
819 int i = 0;
820 if (pThis->fRead)
821 while (aStates[i].fStatus == NPSS_WSPACE)
822 i++;
823 else
824 while (aStates[i].fStatus == NPSS_RDATA)
825 i++;
826 if (aStates[i].fStatus == NPSS_CLOSE)
827 break;
828 Assert(aStates[i].fStatus == NPSS_WSPACE || aStates[i].fStatus == NPSS_RDATA || aStates[i].fStatus == NPSS_EOI);
829 if ( aStates[i].fStatus != NPSS_EOI
830 && aStates[i].usAvail > 0)
831 break;
832
833 /*
834 * Check for timeout.
835 */
836 ULONG cMsMaxWait = SEM_INDEFINITE_WAIT;
837 if (cMillies != RT_INDEFINITE_WAIT)
838 {
839 uint64_t cElapsed = RTTimeMilliTS() - StartMsTS;
840 if (cElapsed >= cMillies)
841 {
842 rc = VERR_TIMEOUT;
843 break;
844 }
845 cMsMaxWait = cMillies - (uint32_t)cElapsed;
846 }
847
848 /*
849 * Wait.
850 */
851 RTCritSectLeave(&pThis->CritSect);
852 orc = DosWaitEventSem(pThis->hev, cMsMaxWait);
853 RTCritSectEnter(&pThis->CritSect);
854 if (orc != NO_ERROR && orc != ERROR_TIMEOUT && orc != ERROR_SEM_TIMEOUT )
855 {
856 rc = RTErrConvertFromOS2(orc);
857 break;
858 }
859 }
860
861 if (rc == VERR_BROKEN_PIPE)
862 pThis->fBrokenPipe = true;
863 if (cMillies > 0)
864 pThis->hPollSet = NIL_RTPOLLSET;
865 }
866
867 RTCritSectLeave(&pThis->CritSect);
868 return rc;
869}
870
871
872RTDECL(int) RTPipeQueryReadable(RTPIPE hPipe, size_t *pcbReadable)
873{
874 RTPIPEINTERNAL *pThis = hPipe;
875 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
876 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
877 AssertReturn(pThis->fRead, VERR_PIPE_NOT_READ);
878 AssertPtrReturn(pcbReadable, VERR_INVALID_POINTER);
879
880 int rc = RTCritSectEnter(&pThis->CritSect);
881 if (RT_FAILURE(rc))
882 return rc;
883
884 ULONG cbActual = 0;
885 ULONG ulState = 0;
886 AVAILDATA Avail = { 0, 0 };
887 APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
888 if (orc == NO_ERROR)
889 {
890 if (Avail.cbpipe > 0 || ulState == NP_STATE_CONNECTED)
891 *pcbReadable = Avail.cbpipe;
892 else
893 rc = VERR_PIPE_NOT_CONNECTED; /*??*/
894 }
895 else
896 rc = RTErrConvertFromOS2(orc);
897
898 RTCritSectLeave(&pThis->CritSect);
899 return rc;
900}
901
902
903RTDECL(int) RTPipeQueryInfo(RTPIPE hPipe, PRTFSOBJINFO pObjInfo, RTFSOBJATTRADD enmAddAttr)
904{
905 RTPIPEINTERNAL *pThis = hPipe;
906 AssertPtrReturn(pThis, 0);
907 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0);
908
909 int rc = RTCritSectEnter(&pThis->CritSect);
910 AssertRCReturn(rc, 0);
911
912 rtPipeFakeQueryInfo(pObjInfo, enmAddAttr, pThis->fRead);
913
914 if (pThis->fRead)
915 {
916 ULONG cbActual = 0;
917 ULONG ulState = 0;
918 AVAILDATA Avail = { 0, 0 };
919 APIRET orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
920 if (orc == NO_ERROR && (Avail.cbpipe > 0 || ulState == NP_STATE_CONNECTED))
921 pObjInfo->cbObject = Avail.cbpipe;
922 }
923 else
924 pObjInfo->cbObject = rtPipeOs2GetSpace(pThis);
925 pObjInfo->cbAllocated = RTPIPE_OS2_SIZE; /** @todo this isn't necessarily true if we didn't create it... but, whatever */
926
927 RTCritSectLeave(&pThis->CritSect);
928 return VINF_SUCCESS;
929}
930
931
932int rtPipePollGetHandle(RTPIPE hPipe, uint32_t fEvents, PRTHCINTPTR phNative)
933{
934 RTPIPEINTERNAL *pThis = hPipe;
935 AssertPtrReturn(pThis, VERR_INVALID_HANDLE);
936 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, VERR_INVALID_HANDLE);
937
938 AssertReturn(!(fEvents & RTPOLL_EVT_READ) || pThis->fRead, VERR_INVALID_PARAMETER);
939 AssertReturn(!(fEvents & RTPOLL_EVT_WRITE) || !pThis->fRead, VERR_INVALID_PARAMETER);
940
941 int rc = RTCritSectEnter(&pThis->CritSect);
942 if (RT_SUCCESS(rc))
943 {
944 rc = rtPipeOs2EnsureSem(pThis);
945 if (RT_SUCCESS(rc))
946 *phNative = (RTHCINTPTR)pThis->hev;
947 RTCritSectLeave(&pThis->CritSect);
948 }
949 return rc;
950}
951
952
953/**
954 * Checks for pending events.
955 *
956 * @returns Event mask or 0.
957 * @param pThis The pipe handle.
958 * @param fEvents The desired events.
959 * @param fResetEvtSem Whether to reset the event semaphore.
960 */
961static uint32_t rtPipePollCheck(RTPIPEINTERNAL *pThis, uint32_t fEvents, bool fResetEvtSem)
962{
963 /*
964 * Reset the event semaphore if we're gonna wait.
965 */
966 APIRET orc;
967 ULONG ulIgnore;
968 if (fResetEvtSem)
969 {
970 orc = DosResetEventSem(pThis->hev, &ulIgnore);
971 AssertMsg(orc == NO_ERROR || orc == ERROR_ALREADY_RESET, ("%d\n", orc));
972 }
973
974 /*
975 * Check for events.
976 */
977 uint32_t fRetEvents = 0;
978 if (pThis->fBrokenPipe)
979 fRetEvents |= RTPOLL_EVT_ERROR;
980 else if (pThis->fRead)
981 {
982 ULONG cbActual = 0;
983 ULONG ulState = 0;
984 AVAILDATA Avail = { 0, 0 };
985 orc = DosPeekNPipe(pThis->hPipe, NULL, 0, &cbActual, &Avail, &ulState);
986 if (orc != NO_ERROR)
987 {
988 fRetEvents |= RTPOLL_EVT_ERROR;
989 if (orc == ERROR_BROKEN_PIPE || orc == ERROR_PIPE_NOT_CONNECTED)
990 pThis->fBrokenPipe = true;
991 }
992 else if (Avail.cbpipe > 0)
993 fRetEvents |= RTPOLL_EVT_READ;
994 else if (ulState != NP_STATE_CONNECTED)
995 {
996 fRetEvents |= RTPOLL_EVT_ERROR;
997 pThis->fBrokenPipe = true;
998 }
999 }
1000 else
1001 {
1002 PIPESEMSTATE aStates[4]; RT_ZERO(aStates);
1003 orc = DosQueryNPipeSemState((HSEM)pThis->hev, &aStates[0], sizeof(aStates));
1004 if (orc == NO_ERROR)
1005 {
1006 int i = 0;
1007 while (aStates[i].fStatus == NPSS_RDATA)
1008 i++;
1009 if (aStates[i].fStatus == NPSS_CLOSE)
1010 {
1011 fRetEvents |= RTPOLL_EVT_ERROR;
1012 pThis->fBrokenPipe = true;
1013 }
1014 else if ( aStates[i].fStatus == NPSS_WSPACE
1015 && aStates[i].usAvail > 0)
1016 fRetEvents |= RTPOLL_EVT_WRITE;
1017 }
1018 else
1019 {
1020 fRetEvents |= RTPOLL_EVT_ERROR;
1021 if (orc == ERROR_BROKEN_PIPE || orc == ERROR_PIPE_NOT_CONNECTED)
1022 pThis->fBrokenPipe = true;
1023 }
1024 }
1025
1026 return fRetEvents & (fEvents | RTPOLL_EVT_ERROR);
1027}
1028
1029
1030uint32_t rtPipePollStart(RTPIPE hPipe, RTPOLLSET hPollSet, uint32_t fEvents, bool fFinalEntry, bool fNoWait)
1031{
1032 RTPIPEINTERNAL *pThis = hPipe;
1033 AssertPtrReturn(pThis, UINT32_MAX);
1034 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, UINT32_MAX);
1035
1036 int rc = RTCritSectEnter(&pThis->CritSect);
1037 AssertRCReturn(rc, UINT32_MAX);
1038
1039 /* Check that this is the only current use of this pipe. */
1040 uint32_t fRetEvents;
1041 if ( pThis->cUsers == 0
1042 || pThis->hPollSet == NIL_RTPOLLSET)
1043 {
1044 fRetEvents = rtPipePollCheck(pThis, fEvents, fNoWait);
1045 if (!fRetEvents && !fNoWait)
1046 {
1047 /* Mark the set busy while waiting. */
1048 pThis->cUsers++;
1049 pThis->hPollSet = hPollSet;
1050 }
1051 }
1052 else
1053 {
1054 AssertFailed();
1055 fRetEvents = UINT32_MAX;
1056 }
1057
1058 RTCritSectLeave(&pThis->CritSect);
1059 return fRetEvents;
1060}
1061
1062
1063uint32_t rtPipePollDone(RTPIPE hPipe, uint32_t fEvents, bool fFinalEntry, bool fHarvestEvents)
1064{
1065 RTPIPEINTERNAL *pThis = hPipe;
1066 AssertPtrReturn(pThis, 0);
1067 AssertReturn(pThis->u32Magic == RTPIPE_MAGIC, 0);
1068
1069 int rc = RTCritSectEnter(&pThis->CritSect);
1070 AssertRCReturn(rc, 0);
1071
1072 Assert(pThis->cUsers > 0);
1073
1074 /* harvest events. */
1075 uint32_t fRetEvents = rtPipePollCheck(pThis, fEvents, false);
1076
1077 /* update counters. */
1078 pThis->cUsers--;
1079 pThis->hPollSet = NIL_RTPOLLSET;
1080
1081 RTCritSectLeave(&pThis->CritSect);
1082 return fRetEvents;
1083}
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette