VirtualBox

source: vbox/trunk/src/libs/xpcom18a4/xpcom/io/nsPipe3.cpp@ 4837

Last change on this file since 4837 was 1, checked in by vboxsync, 54 years ago

import

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 36.4 KB
Line 
1/* ***** BEGIN LICENSE BLOCK *****
2 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
3 *
4 * The contents of this file are subject to the Mozilla Public License Version
5 * 1.1 (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 * http://www.mozilla.org/MPL/
8 *
9 * Software distributed under the License is distributed on an "AS IS" basis,
10 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11 * for the specific language governing rights and limitations under the
12 * License.
13 *
14 * The Original Code is Mozilla.
15 *
16 * The Initial Developer of the Original Code is
17 * Netscape Communications Corporation.
18 * Portions created by the Initial Developer are Copyright (C) 2002
19 * the Initial Developer. All Rights Reserved.
20 *
21 * Contributor(s):
22 * Darin Fisher <darin@netscape.com>
23 *
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
35 *
36 * ***** END LICENSE BLOCK ***** */
37
38#include "nsIPipe.h"
39#include "nsIEventTarget.h"
40#include "nsISeekableStream.h"
41#include "nsSegmentedBuffer.h"
42#include "nsStreamUtils.h"
43#include "nsAutoLock.h"
44#include "nsCOMPtr.h"
45#include "nsCRT.h"
46#include "prlog.h"
47#include "nsInt64.h"
48
49#if defined(PR_LOGGING)
50//
51// set NSPR_LOG_MODULES=nsPipe:5
52//
53static PRLogModuleInfo *gPipeLog = nsnull;
54#define LOG(args) PR_LOG(gPipeLog, PR_LOG_DEBUG, args)
55#else
56#define LOG(args)
57#endif
58
59#define DEFAULT_SEGMENT_SIZE 4096
60#define DEFAULT_SEGMENT_COUNT 16
61
62class nsPipe;
63class nsPipeEvents;
64class nsPipeInputStream;
65class nsPipeOutputStream;
66
67//-----------------------------------------------------------------------------
68
69// this class is used to delay notifications until the end of a particular
70// scope. it helps avoid the complexity of issuing callbacks while inside
71// a critical section.
72class nsPipeEvents
73{
74public:
75 nsPipeEvents() { }
76 ~nsPipeEvents();
77
78 inline void NotifyInputReady(nsIAsyncInputStream *stream,
79 nsIInputStreamCallback *callback)
80 {
81 NS_ASSERTION(!mInputCallback, "already have an input event");
82 mInputStream = stream;
83 mInputCallback = callback;
84 }
85
86 inline void NotifyOutputReady(nsIAsyncOutputStream *stream,
87 nsIOutputStreamCallback *callback)
88 {
89 NS_ASSERTION(!mOutputCallback, "already have an output event");
90 mOutputStream = stream;
91 mOutputCallback = callback;
92 }
93
94private:
95 nsCOMPtr<nsIAsyncInputStream> mInputStream;
96 nsCOMPtr<nsIInputStreamCallback> mInputCallback;
97 nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
98 nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
99};
100
101//-----------------------------------------------------------------------------
102
103// the input end of a pipe (allocated as a member of the pipe).
104class nsPipeInputStream : public nsIAsyncInputStream
105 , public nsISeekableStream
106 , public nsISearchableInputStream
107{
108public:
109 // since this class will be allocated as a member of the pipe, we do not
110 // need our own ref count. instead, we share the lifetime (the ref count)
111 // of the entire pipe. this macro is just convenience since it does not
112 // declare a mRefCount variable; however, don't let the name fool you...
113 // we are not inheriting from nsPipe ;-)
114 NS_DECL_ISUPPORTS_INHERITED
115
116 NS_DECL_NSIINPUTSTREAM
117 NS_DECL_NSIASYNCINPUTSTREAM
118 NS_DECL_NSISEEKABLESTREAM
119 NS_DECL_NSISEARCHABLEINPUTSTREAM
120
121 nsPipeInputStream(nsPipe *pipe)
122 : mPipe(pipe)
123 , mReaderRefCnt(0)
124 , mLogicalOffset(0)
125 , mBlocking(PR_TRUE)
126 , mBlocked(PR_FALSE)
127 , mAvailable(0)
128 , mCallbackFlags(0)
129 { }
130
131 nsresult Fill();
132 void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; }
133
134 PRUint32 Available() { return mAvailable; }
135 void ReduceAvailable(PRUint32 avail) { mAvailable -= avail; }
136
137 // synchronously wait for the pipe to become readable.
138 nsresult Wait();
139
140 // these functions return true to indicate that the pipe's monitor should
141 // be notified, to wake up a blocked reader if any.
142 PRBool OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &);
143 PRBool OnInputException(nsresult, nsPipeEvents &);
144
145private:
146 nsPipe *mPipe;
147
148 // separate refcnt so that we know when to close the consumer
149 nsrefcnt mReaderRefCnt;
150 nsInt64 mLogicalOffset;
151 PRPackedBool mBlocking;
152
153 // these variables can only be accessed while inside the pipe's monitor
154 PRPackedBool mBlocked;
155 PRUint32 mAvailable;
156 nsCOMPtr<nsIInputStreamCallback> mCallback;
157 PRUint32 mCallbackFlags;
158};
159
160//-----------------------------------------------------------------------------
161
162// the output end of a pipe (allocated as a member of the pipe).
163class nsPipeOutputStream : public nsIAsyncOutputStream
164 , public nsISeekableStream
165{
166public:
167 // since this class will be allocated as a member of the pipe, we do not
168 // need our own ref count. instead, we share the lifetime (the ref count)
169 // of the entire pipe. this macro is just convenience since it does not
170 // declare a mRefCount variable; however, don't let the name fool you...
171 // we are not inheriting from nsPipe ;-)
172 NS_DECL_ISUPPORTS_INHERITED
173
174 NS_DECL_NSIOUTPUTSTREAM
175 NS_DECL_NSIASYNCOUTPUTSTREAM
176 NS_DECL_NSISEEKABLESTREAM
177
178 nsPipeOutputStream(nsPipe *pipe)
179 : mPipe(pipe)
180 , mWriterRefCnt(0)
181 , mLogicalOffset(0)
182 , mBlocking(PR_TRUE)
183 , mBlocked(PR_FALSE)
184 , mWritable(PR_TRUE)
185 , mCallbackFlags(0)
186 { }
187
188 void SetNonBlocking(PRBool aNonBlocking) { mBlocking = !aNonBlocking; }
189 void SetWritable(PRBool writable) { mWritable = writable; }
190
191 // synchronously wait for the pipe to become writable.
192 nsresult Wait();
193
194 // these functions return true to indicate that the pipe's monitor should
195 // be notified, to wake up a blocked writer if any.
196 PRBool OnOutputWritable(nsPipeEvents &);
197 PRBool OnOutputException(nsresult, nsPipeEvents &);
198
199private:
200 nsPipe *mPipe;
201
202 // separate refcnt so that we know when to close the producer
203 nsrefcnt mWriterRefCnt;
204 nsInt64 mLogicalOffset;
205 PRPackedBool mBlocking;
206
207 // these variables can only be accessed while inside the pipe's monitor
208 PRPackedBool mBlocked;
209 PRPackedBool mWritable;
210 nsCOMPtr<nsIOutputStreamCallback> mCallback;
211 PRUint32 mCallbackFlags;
212};
213
214//-----------------------------------------------------------------------------
215
216class nsPipe : public nsIPipe
217{
218public:
219 friend class nsPipeInputStream;
220 friend class nsPipeOutputStream;
221
222 NS_DECL_ISUPPORTS
223 NS_DECL_NSIPIPE
224
225 // nsPipe methods:
226 nsPipe();
227
228private:
229 ~nsPipe();
230
231public:
232 //
233 // methods below may only be called while inside the pipe's monitor
234 //
235
236 void PeekSegment(PRUint32 n, char *&cursor, char *&limit);
237
238 //
239 // methods below may be called while outside the pipe's monitor
240 //
241
242 nsresult GetReadSegment(const char *&segment, PRUint32 &segmentLen);
243 void AdvanceReadCursor(PRUint32 count);
244
245 nsresult GetWriteSegment(char *&segment, PRUint32 &segmentLen);
246 void AdvanceWriteCursor(PRUint32 count);
247
248 void OnPipeException(nsresult reason, PRBool outputOnly = PR_FALSE);
249
250protected:
251 // We can't inherit from both nsIInputStream and nsIOutputStream
252 // because they collide on their Close method. Consequently we nest their
253 // implementations to avoid the extra object allocation.
254 nsPipeInputStream mInput;
255 nsPipeOutputStream mOutput;
256
257 PRMonitor* mMonitor;
258 nsSegmentedBuffer mBuffer;
259
260 char* mReadCursor;
261 char* mReadLimit;
262
263 PRInt32 mWriteSegment;
264 char* mWriteCursor;
265 char* mWriteLimit;
266
267 nsresult mStatus;
268};
269
270//
271// NOTES on buffer architecture:
272//
273// +-----------------+ - - mBuffer.GetSegment(0)
274// | |
275// + - - - - - - - - + - - mReadCursor
276// |/////////////////|
277// |/////////////////|
278// |/////////////////|
279// |/////////////////|
280// +-----------------+ - - mReadLimit
281// |
282// +-----------------+
283// |/////////////////|
284// |/////////////////|
285// |/////////////////|
286// |/////////////////|
287// |/////////////////|
288// |/////////////////|
289// +-----------------+
290// |
291// +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
292// |/////////////////|
293// |/////////////////|
294// |/////////////////|
295// + - - - - - - - - + - - mWriteCursor
296// | |
297// | |
298// +-----------------+ - - mWriteLimit
299//
300// (shaded region contains data)
301//
302// NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
303// small allocations (e.g., 64 byte allocations). this means that buffers may
304// be allocated back-to-back. in the diagram above, for example, mReadLimit
305// would actually be pointing at the beginning of the next segment. when
306// making changes to this file, please keep this fact in mind.
307//
308
309//-----------------------------------------------------------------------------
310// nsPipe methods:
311//-----------------------------------------------------------------------------
312
313nsPipe::nsPipe()
314 : mInput(this)
315 , mOutput(this)
316 , mMonitor(nsnull)
317 , mReadCursor(nsnull)
318 , mReadLimit(nsnull)
319 , mWriteSegment(-1)
320 , mWriteCursor(nsnull)
321 , mWriteLimit(nsnull)
322 , mStatus(NS_OK)
323{
324}
325
326nsPipe::~nsPipe()
327{
328 if (mMonitor)
329 PR_DestroyMonitor(mMonitor);
330}
331
332NS_IMPL_THREADSAFE_ISUPPORTS1(nsPipe, nsIPipe)
333
334NS_IMETHODIMP
335nsPipe::Init(PRBool nonBlockingIn,
336 PRBool nonBlockingOut,
337 PRUint32 segmentSize,
338 PRUint32 segmentCount,
339 nsIMemory *segmentAlloc)
340{
341 mMonitor = PR_NewMonitor();
342 if (!mMonitor)
343 return NS_ERROR_OUT_OF_MEMORY;
344
345 if (segmentSize == 0)
346 segmentSize = DEFAULT_SEGMENT_SIZE;
347 if (segmentCount == 0)
348 segmentCount = DEFAULT_SEGMENT_COUNT;
349
350 // protect against overflow
351 PRUint32 maxCount = PRUint32(-1) / segmentSize;
352 if (segmentCount > maxCount)
353 segmentCount = maxCount;
354
355 nsresult rv = mBuffer.Init(segmentSize, segmentSize * segmentCount, segmentAlloc);
356 if (NS_FAILED(rv))
357 return rv;
358
359 mInput.SetNonBlocking(nonBlockingIn);
360 mOutput.SetNonBlocking(nonBlockingOut);
361 return NS_OK;
362}
363
364NS_IMETHODIMP
365nsPipe::GetInputStream(nsIAsyncInputStream **aInputStream)
366{
367 NS_ADDREF(*aInputStream = &mInput);
368 return NS_OK;
369}
370
371NS_IMETHODIMP
372nsPipe::GetOutputStream(nsIAsyncOutputStream **aOutputStream)
373{
374 NS_ADDREF(*aOutputStream = &mOutput);
375 return NS_OK;
376}
377
378void
379nsPipe::PeekSegment(PRUint32 index, char *&cursor, char *&limit)
380{
381 if (index == 0) {
382 NS_ASSERTION(!mReadCursor || mBuffer.GetSegmentCount(), "unexpected state");
383 cursor = mReadCursor;
384 limit = mReadLimit;
385 }
386 else {
387 PRUint32 numSegments = mBuffer.GetSegmentCount();
388 if (index >= numSegments)
389 cursor = limit = nsnull;
390 else {
391 cursor = mBuffer.GetSegment(index);
392 if (mWriteSegment == (PRInt32) index)
393 limit = mWriteCursor;
394 else
395 limit = cursor + mBuffer.GetSegmentSize();
396 }
397 }
398}
399
400nsresult
401nsPipe::GetReadSegment(const char *&segment, PRUint32 &segmentLen)
402{
403 nsAutoMonitor mon(mMonitor);
404
405 if (mReadCursor == mReadLimit)
406 return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
407
408 segment = mReadCursor;
409 segmentLen = mReadLimit - mReadCursor;
410 return NS_OK;
411}
412
413void
414nsPipe::AdvanceReadCursor(PRUint32 bytesRead)
415{
416 NS_ASSERTION(bytesRead, "dont call if no bytes read");
417
418 nsPipeEvents events;
419 {
420 nsAutoMonitor mon(mMonitor);
421
422 LOG(("III advancing read cursor by %u\n", bytesRead));
423 NS_ASSERTION(bytesRead <= mBuffer.GetSegmentSize(), "read too much");
424
425 mReadCursor += bytesRead;
426 NS_ASSERTION(mReadCursor <= mReadLimit, "read cursor exceeds limit");
427
428 mInput.ReduceAvailable(bytesRead);
429
430 if (mReadCursor == mReadLimit) {
431 // we've reached the limit of how much we can read from this segment.
432 // if at the end of this segment, then we must discard this segment.
433
434 // if still writing in this segment then bail because we're not done
435 // with the segment and have to wait for now...
436 if (mWriteSegment == 0 && mWriteLimit > mWriteCursor) {
437 NS_ASSERTION(mReadLimit == mWriteCursor, "unexpected state");
438 return;
439 }
440
441 // shift write segment index (-1 indicates an empty buffer).
442 --mWriteSegment;
443
444 // done with this segment
445 mBuffer.DeleteFirstSegment();
446 LOG(("III deleting first segment\n"));
447
448 if (mWriteSegment == -1) {
449 // buffer is completely empty
450 mReadCursor = nsnull;
451 mReadLimit = nsnull;
452 mWriteCursor = nsnull;
453 mWriteLimit = nsnull;
454 }
455 else {
456 // advance read cursor and limit to next buffer segment
457 mReadCursor = mBuffer.GetSegment(0);
458 if (mWriteSegment == 0)
459 mReadLimit = mWriteCursor;
460 else
461 mReadLimit = mReadCursor + mBuffer.GetSegmentSize();
462 }
463
464 // we've free'd up a segment, so notify output stream that pipe has
465 // room for a new segment.
466 if (mOutput.OnOutputWritable(events))
467 mon.Notify();
468 }
469 }
470}
471
472nsresult
473nsPipe::GetWriteSegment(char *&segment, PRUint32 &segmentLen)
474{
475 nsAutoMonitor mon(mMonitor);
476
477 if (NS_FAILED(mStatus))
478 return mStatus;
479
480 // write cursor and limit may both be null indicating an empty buffer.
481 if (mWriteCursor == mWriteLimit) {
482 char *seg = mBuffer.AppendNewSegment();
483 // pipe is full
484 if (seg == nsnull)
485 return NS_BASE_STREAM_WOULD_BLOCK;
486 LOG(("OOO appended new segment\n"));
487 mWriteCursor = seg;
488 mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
489 ++mWriteSegment;
490 }
491
492 // make sure read cursor is initialized
493 if (mReadCursor == nsnull) {
494 NS_ASSERTION(mWriteSegment == 0, "unexpected null read cursor");
495 mReadCursor = mReadLimit = mWriteCursor;
496 }
497
498 // check to see if we can roll-back our read and write cursors to the
499 // beginning of the current/first segment. this is purely an optimization.
500 if (mReadCursor == mWriteCursor && mWriteSegment == 0) {
501 char *head = mBuffer.GetSegment(0);
502 LOG(("OOO rolling back write cursor %u bytes\n", mWriteCursor - head));
503 mWriteCursor = mReadCursor = mReadLimit = head;
504 }
505
506 segment = mWriteCursor;
507 segmentLen = mWriteLimit - mWriteCursor;
508 return NS_OK;
509}
510
511void
512nsPipe::AdvanceWriteCursor(PRUint32 bytesWritten)
513{
514 NS_ASSERTION(bytesWritten, "dont call if no bytes written");
515
516 nsPipeEvents events;
517 {
518 nsAutoMonitor mon(mMonitor);
519
520 LOG(("OOO advancing write cursor by %u\n", bytesWritten));
521
522 char *newWriteCursor = mWriteCursor + bytesWritten;
523 NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
524
525 // update read limit if reading in the same segment
526 if (mWriteSegment == 0 && mReadLimit == mWriteCursor)
527 mReadLimit = newWriteCursor;
528
529 mWriteCursor = newWriteCursor;
530
531 NS_ASSERTION(mReadCursor != mWriteCursor, "read cursor is bad");
532
533 // update the writable flag on the output stream
534 if (mWriteCursor == mWriteLimit) {
535 if (mBuffer.GetSize() >= mBuffer.GetMaxSize())
536 mOutput.SetWritable(PR_FALSE);
537 }
538
539 // notify input stream that pipe now contains additional data
540 if (mInput.OnInputReadable(bytesWritten, events))
541 mon.Notify();
542 }
543}
544
545void
546nsPipe::OnPipeException(nsresult reason, PRBool outputOnly)
547{
548 LOG(("PPP nsPipe::OnPipeException [reason=%x output-only=%d]\n",
549 reason, outputOnly));
550
551 nsPipeEvents events;
552 {
553 nsAutoMonitor mon(mMonitor);
554
555 // if we've already hit an exception, then ignore this one.
556 if (NS_FAILED(mStatus))
557 return;
558
559 mStatus = reason;
560
561 // an output-only exception applies to the input end if the pipe has
562 // zero bytes available.
563 if (outputOnly && !mInput.Available())
564 outputOnly = PR_FALSE;
565
566 if (!outputOnly)
567 if (mInput.OnInputException(reason, events))
568 mon.Notify();
569
570 if (mOutput.OnOutputException(reason, events))
571 mon.Notify();
572 }
573}
574
575//-----------------------------------------------------------------------------
576// nsPipeEvents methods:
577//-----------------------------------------------------------------------------
578
579nsPipeEvents::~nsPipeEvents()
580{
581 // dispatch any pending events
582
583 if (mInputCallback) {
584 mInputCallback->OnInputStreamReady(mInputStream);
585 mInputCallback = 0;
586 mInputStream = 0;
587 }
588 if (mOutputCallback) {
589 mOutputCallback->OnOutputStreamReady(mOutputStream);
590 mOutputCallback = 0;
591 mOutputStream = 0;
592 }
593}
594
595//-----------------------------------------------------------------------------
596// nsPipeInputStream methods:
597//-----------------------------------------------------------------------------
598
599nsresult
600nsPipeInputStream::Wait()
601{
602 NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
603
604 nsAutoMonitor mon(mPipe->mMonitor);
605
606 while (NS_SUCCEEDED(mPipe->mStatus) && (mAvailable == 0)) {
607 LOG(("III pipe input: waiting for data\n"));
608
609 mBlocked = PR_TRUE;
610 mon.Wait();
611 mBlocked = PR_FALSE;
612
613 LOG(("III pipe input: woke up [pipe-status=%x available=%u]\n",
614 mPipe->mStatus, mAvailable));
615 }
616
617 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
618}
619
620PRBool
621nsPipeInputStream::OnInputReadable(PRUint32 bytesWritten, nsPipeEvents &events)
622{
623 PRBool result = PR_FALSE;
624
625 mAvailable += bytesWritten;
626
627 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
628 events.NotifyInputReady(this, mCallback);
629 mCallback = 0;
630 mCallbackFlags = 0;
631 }
632 else if (mBlocked)
633 result = PR_TRUE;
634
635 return result;
636}
637
638PRBool
639nsPipeInputStream::OnInputException(nsresult reason, nsPipeEvents &events)
640{
641 LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
642 this, reason));
643
644 PRBool result = PR_FALSE;
645
646 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
647
648 // force count of available bytes to zero.
649 mAvailable = 0;
650
651 if (mCallback) {
652 events.NotifyInputReady(this, mCallback);
653 mCallback = 0;
654 mCallbackFlags = 0;
655 }
656 else if (mBlocked)
657 result = PR_TRUE;
658
659 return result;
660}
661
662NS_IMETHODIMP_(nsrefcnt)
663nsPipeInputStream::AddRef(void)
664{
665 ++mReaderRefCnt;
666 return mPipe->AddRef();
667}
668
669NS_IMETHODIMP_(nsrefcnt)
670nsPipeInputStream::Release(void)
671{
672 if (--mReaderRefCnt == 0)
673 Close();
674 return mPipe->Release();
675}
676
677NS_IMPL_QUERY_INTERFACE4(nsPipeInputStream,
678 nsIInputStream,
679 nsIAsyncInputStream,
680 nsISeekableStream,
681 nsISearchableInputStream)
682
683NS_IMETHODIMP
684nsPipeInputStream::CloseWithStatus(nsresult reason)
685{
686 LOG(("III CloseWithStatus [this=%x reason=%x]\n", this, reason));
687
688 if (NS_SUCCEEDED(reason))
689 reason = NS_BASE_STREAM_CLOSED;
690
691 mPipe->OnPipeException(reason);
692 return NS_OK;
693}
694
695NS_IMETHODIMP
696nsPipeInputStream::Close()
697{
698 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
699}
700
701NS_IMETHODIMP
702nsPipeInputStream::Available(PRUint32 *result)
703{
704 nsAutoMonitor mon(mPipe->mMonitor);
705
706 // return error if pipe closed
707 if (!mAvailable && NS_FAILED(mPipe->mStatus))
708 return mPipe->mStatus;
709
710 *result = mAvailable;
711 return NS_OK;
712}
713
714NS_IMETHODIMP
715nsPipeInputStream::ReadSegments(nsWriteSegmentFun writer,
716 void *closure,
717 PRUint32 count,
718 PRUint32 *readCount)
719{
720 LOG(("III ReadSegments [this=%x count=%u]\n", this, count));
721
722 nsresult rv = NS_OK;
723
724 const char *segment;
725 PRUint32 segmentLen;
726
727 *readCount = 0;
728 while (count) {
729 rv = mPipe->GetReadSegment(segment, segmentLen);
730 if (NS_FAILED(rv)) {
731 // ignore this error if we've already read something.
732 if (*readCount > 0) {
733 rv = NS_OK;
734 break;
735 }
736 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
737 // pipe is empty
738 if (!mBlocking)
739 break;
740 // wait for some data to be written to the pipe
741 rv = Wait();
742 if (NS_SUCCEEDED(rv))
743 continue;
744 }
745 // ignore this error, just return.
746 if (rv == NS_BASE_STREAM_CLOSED) {
747 rv = NS_OK;
748 break;
749 }
750 mPipe->OnPipeException(rv);
751 break;
752 }
753
754 // read no more than count
755 if (segmentLen > count)
756 segmentLen = count;
757
758 PRUint32 writeCount, originalLen = segmentLen;
759 while (segmentLen) {
760 writeCount = 0;
761
762 rv = writer(this, closure, segment, *readCount, segmentLen, &writeCount);
763
764 if (NS_FAILED(rv) || writeCount == 0) {
765 count = 0;
766 // any errors returned from the writer end here: do not
767 // propogate to the caller of ReadSegments.
768 rv = NS_OK;
769 break;
770 }
771
772 NS_ASSERTION(writeCount <= segmentLen, "wrote more than expected");
773 segment += writeCount;
774 segmentLen -= writeCount;
775 count -= writeCount;
776 *readCount += writeCount;
777 mLogicalOffset += writeCount;
778 }
779
780 if (segmentLen < originalLen)
781 mPipe->AdvanceReadCursor(originalLen - segmentLen);
782 }
783
784 return rv;
785}
786
787static NS_METHOD
788nsWriteToRawBuffer(nsIInputStream* inStr,
789 void *closure,
790 const char *fromRawSegment,
791 PRUint32 offset,
792 PRUint32 count,
793 PRUint32 *writeCount)
794{
795 char *toBuf = (char*)closure;
796 memcpy(&toBuf[offset], fromRawSegment, count);
797 *writeCount = count;
798 return NS_OK;
799}
800
801NS_IMETHODIMP
802nsPipeInputStream::Read(char* toBuf, PRUint32 bufLen, PRUint32 *readCount)
803{
804 return ReadSegments(nsWriteToRawBuffer, toBuf, bufLen, readCount);
805}
806
807NS_IMETHODIMP
808nsPipeInputStream::IsNonBlocking(PRBool *aNonBlocking)
809{
810 *aNonBlocking = !mBlocking;
811 return NS_OK;
812}
813
814NS_IMETHODIMP
815nsPipeInputStream::AsyncWait(nsIInputStreamCallback *callback,
816 PRUint32 flags,
817 PRUint32 requestedCount,
818 nsIEventTarget *target)
819{
820 LOG(("III AsyncWait [this=%x]\n", this));
821
822 nsPipeEvents pipeEvents;
823 {
824 nsAutoMonitor mon(mPipe->mMonitor);
825
826 // replace a pending callback
827 mCallback = 0;
828 mCallbackFlags = 0;
829
830 nsCOMPtr<nsIInputStreamCallback> proxy;
831 if (target) {
832 nsresult rv = NS_NewInputStreamReadyEvent(getter_AddRefs(proxy),
833 callback, target);
834 if (NS_FAILED(rv)) return rv;
835 callback = proxy;
836 }
837
838 if (NS_FAILED(mPipe->mStatus) ||
839 (mAvailable && !(flags & WAIT_CLOSURE_ONLY))) {
840 // stream is already closed or readable; post event.
841 pipeEvents.NotifyInputReady(this, callback);
842 }
843 else {
844 // queue up callback object to be notified when data becomes available
845 mCallback = callback;
846 mCallbackFlags = flags;
847 }
848 }
849 return NS_OK;
850}
851
852NS_IMETHODIMP
853nsPipeInputStream::Seek(PRInt32 whence, PRInt64 offset)
854{
855 NS_NOTREACHED("nsPipeInputStream::Seek");
856 return NS_ERROR_NOT_IMPLEMENTED;
857}
858
859NS_IMETHODIMP
860nsPipeInputStream::Tell(PRInt64 *offset)
861{
862 *offset = mLogicalOffset;
863 return NS_OK;
864}
865
866NS_IMETHODIMP
867nsPipeInputStream::SetEOF()
868{
869 NS_NOTREACHED("nsPipeInputStream::SetEOF");
870 return NS_ERROR_NOT_IMPLEMENTED;
871}
872
873#define COMPARE(s1, s2, i) \
874 (ignoreCase \
875 ? nsCRT::strncasecmp((const char *)s1, (const char *)s2, (PRUint32)i) \
876 : nsCRT::strncmp((const char *)s1, (const char *)s2, (PRUint32)i))
877
878NS_IMETHODIMP
879nsPipeInputStream::Search(const char *forString,
880 PRBool ignoreCase,
881 PRBool *found,
882 PRUint32 *offsetSearchedTo)
883{
884 LOG(("III Search [for=%s ic=%u]\n", forString, ignoreCase));
885
886 nsAutoMonitor mon(mPipe->mMonitor);
887
888 char *cursor1, *limit1;
889 PRUint32 index = 0, offset = 0;
890 PRUint32 strLen = strlen(forString);
891
892 mPipe->PeekSegment(0, cursor1, limit1);
893 if (cursor1 == limit1) {
894 *found = PR_FALSE;
895 *offsetSearchedTo = 0;
896 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
897 return NS_OK;
898 }
899
900 while (PR_TRUE) {
901 PRUint32 i, len1 = limit1 - cursor1;
902
903 // check if the string is in the buffer segment
904 for (i = 0; i < len1 - strLen + 1; i++) {
905 if (COMPARE(&cursor1[i], forString, strLen) == 0) {
906 *found = PR_TRUE;
907 *offsetSearchedTo = offset + i;
908 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
909 return NS_OK;
910 }
911 }
912
913 // get the next segment
914 char *cursor2, *limit2;
915 PRUint32 len2;
916
917 index++;
918 offset += len1;
919
920 mPipe->PeekSegment(index, cursor2, limit2);
921 if (cursor2 == limit2) {
922 *found = PR_FALSE;
923 *offsetSearchedTo = offset - strLen + 1;
924 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
925 return NS_OK;
926 }
927 len2 = limit2 - cursor2;
928
929 // check if the string is straddling the next buffer segment
930 PRUint32 lim = PR_MIN(strLen, len2 + 1);
931 for (i = 0; i < lim; ++i) {
932 PRUint32 strPart1Len = strLen - i - 1;
933 PRUint32 strPart2Len = strLen - strPart1Len;
934 const char* strPart2 = &forString[strLen - strPart2Len];
935 PRUint32 bufSeg1Offset = len1 - strPart1Len;
936 if (COMPARE(&cursor1[bufSeg1Offset], forString, strPart1Len) == 0 &&
937 COMPARE(cursor2, strPart2, strPart2Len) == 0) {
938 *found = PR_TRUE;
939 *offsetSearchedTo = offset - strPart1Len;
940 LOG((" result [found=%u offset=%u]\n", *found, *offsetSearchedTo));
941 return NS_OK;
942 }
943 }
944
945 // finally continue with the next buffer
946 cursor1 = cursor2;
947 limit1 = limit2;
948 }
949
950 NS_NOTREACHED("can't get here");
951 return NS_ERROR_UNEXPECTED; // keep compiler happy
952}
953
954//-----------------------------------------------------------------------------
955// nsPipeOutputStream methods:
956//-----------------------------------------------------------------------------
957
958nsresult
959nsPipeOutputStream::Wait()
960{
961 NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream");
962
963 nsAutoMonitor mon(mPipe->mMonitor);
964
965 if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
966 LOG(("OOO pipe output: waiting for space\n"));
967 mBlocked = PR_TRUE;
968 mon.Wait();
969 mBlocked = PR_FALSE;
970 LOG(("OOO pipe output: woke up [pipe-status=%x writable=%u]\n",
971 mPipe->mStatus, mWritable == PR_TRUE));
972 }
973
974 return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
975}
976
977PRBool
978nsPipeOutputStream::OnOutputWritable(nsPipeEvents &events)
979{
980 PRBool result = PR_FALSE;
981
982 mWritable = PR_TRUE;
983
984 if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
985 events.NotifyOutputReady(this, mCallback);
986 mCallback = 0;
987 mCallbackFlags = 0;
988 }
989 else if (mBlocked)
990 result = PR_TRUE;
991
992 return result;
993}
994
995PRBool
996nsPipeOutputStream::OnOutputException(nsresult reason, nsPipeEvents &events)
997{
998 LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
999 this, reason));
1000
1001 nsresult result = PR_FALSE;
1002
1003 NS_ASSERTION(NS_FAILED(reason), "huh? successful exception");
1004 mWritable = PR_FALSE;
1005
1006 if (mCallback) {
1007 events.NotifyOutputReady(this, mCallback);
1008 mCallback = 0;
1009 mCallbackFlags = 0;
1010 }
1011 else if (mBlocked)
1012 result = PR_TRUE;
1013
1014 return result;
1015}
1016
1017
1018NS_IMETHODIMP_(nsrefcnt)
1019nsPipeOutputStream::AddRef()
1020{
1021 mWriterRefCnt++;
1022 return mPipe->AddRef();
1023}
1024
1025NS_IMETHODIMP_(nsrefcnt)
1026nsPipeOutputStream::Release()
1027{
1028 if (--mWriterRefCnt == 0)
1029 Close();
1030 return mPipe->Release();
1031}
1032
1033NS_IMPL_QUERY_INTERFACE2(nsPipeOutputStream,
1034 nsIOutputStream,
1035 nsIAsyncOutputStream)
1036
1037NS_IMETHODIMP
1038nsPipeOutputStream::CloseWithStatus(nsresult reason)
1039{
1040 LOG(("OOO CloseWithStatus [this=%x reason=%x]\n", this, reason));
1041
1042 if (NS_SUCCEEDED(reason))
1043 reason = NS_BASE_STREAM_CLOSED;
1044
1045 // input stream may remain open
1046 mPipe->OnPipeException(reason, PR_TRUE);
1047 return NS_OK;
1048}
1049
1050NS_IMETHODIMP
1051nsPipeOutputStream::Close()
1052{
1053 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
1054}
1055
1056NS_IMETHODIMP
1057nsPipeOutputStream::WriteSegments(nsReadSegmentFun reader,
1058 void* closure,
1059 PRUint32 count,
1060 PRUint32 *writeCount)
1061{
1062 LOG(("OOO WriteSegments [this=%x count=%u]\n", this, count));
1063
1064 nsresult rv = NS_OK;
1065
1066 char *segment;
1067 PRUint32 segmentLen;
1068
1069 *writeCount = 0;
1070 while (count) {
1071 rv = mPipe->GetWriteSegment(segment, segmentLen);
1072 if (NS_FAILED(rv)) {
1073 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1074 // pipe is full
1075 if (!mBlocking) {
1076 // ignore this error if we've already written something
1077 if (*writeCount > 0)
1078 rv = NS_OK;
1079 break;
1080 }
1081 // wait for the pipe to have an empty segment.
1082 rv = Wait();
1083 if (NS_SUCCEEDED(rv))
1084 continue;
1085 }
1086 mPipe->OnPipeException(rv);
1087 break;
1088 }
1089
1090 // write no more than count
1091 if (segmentLen > count)
1092 segmentLen = count;
1093
1094 PRUint32 readCount, originalLen = segmentLen;
1095 while (segmentLen) {
1096 readCount = 0;
1097
1098 rv = reader(this, closure, segment, *writeCount, segmentLen, &readCount);
1099
1100 if (NS_FAILED(rv) || readCount == 0) {
1101 count = 0;
1102 // any errors returned from the reader end here: do not
1103 // propogate to the caller of WriteSegments.
1104 rv = NS_OK;
1105 break;
1106 }
1107
1108 NS_ASSERTION(readCount <= segmentLen, "read more than expected");
1109 segment += readCount;
1110 segmentLen -= readCount;
1111 count -= readCount;
1112 *writeCount += readCount;
1113 mLogicalOffset += readCount;
1114 }
1115
1116 if (segmentLen < originalLen)
1117 mPipe->AdvanceWriteCursor(originalLen - segmentLen);
1118 }
1119
1120 return rv;
1121}
1122
1123static NS_METHOD
1124nsReadFromRawBuffer(nsIOutputStream* outStr,
1125 void* closure,
1126 char* toRawSegment,
1127 PRUint32 offset,
1128 PRUint32 count,
1129 PRUint32 *readCount)
1130{
1131 const char* fromBuf = (const char*)closure;
1132 memcpy(toRawSegment, &fromBuf[offset], count);
1133 *readCount = count;
1134 return NS_OK;
1135}
1136
1137NS_IMETHODIMP
1138nsPipeOutputStream::Write(const char* fromBuf,
1139 PRUint32 bufLen,
1140 PRUint32 *writeCount)
1141{
1142 return WriteSegments(nsReadFromRawBuffer, (void*)fromBuf, bufLen, writeCount);
1143}
1144
1145NS_IMETHODIMP
1146nsPipeOutputStream::Flush(void)
1147{
1148 // nothing to do
1149 return NS_OK;
1150}
1151
1152static NS_METHOD
1153nsReadFromInputStream(nsIOutputStream* outStr,
1154 void* closure,
1155 char* toRawSegment,
1156 PRUint32 offset,
1157 PRUint32 count,
1158 PRUint32 *readCount)
1159{
1160 nsIInputStream* fromStream = (nsIInputStream*)closure;
1161 return fromStream->Read(toRawSegment, count, readCount);
1162}
1163
1164NS_IMETHODIMP
1165nsPipeOutputStream::WriteFrom(nsIInputStream* fromStream,
1166 PRUint32 count,
1167 PRUint32 *writeCount)
1168{
1169 return WriteSegments(nsReadFromInputStream, fromStream, count, writeCount);
1170}
1171
1172NS_IMETHODIMP
1173nsPipeOutputStream::IsNonBlocking(PRBool *aNonBlocking)
1174{
1175 *aNonBlocking = !mBlocking;
1176 return NS_OK;
1177}
1178
1179NS_IMETHODIMP
1180nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback *callback,
1181 PRUint32 flags,
1182 PRUint32 requestedCount,
1183 nsIEventTarget *target)
1184{
1185 LOG(("OOO AsyncWait [this=%x]\n", this));
1186
1187 nsPipeEvents pipeEvents;
1188 {
1189 nsAutoMonitor mon(mPipe->mMonitor);
1190
1191 // replace a pending callback
1192 mCallback = 0;
1193 mCallbackFlags = 0;
1194
1195 nsCOMPtr<nsIOutputStreamCallback> proxy;
1196 if (target) {
1197 nsresult rv = NS_NewOutputStreamReadyEvent(getter_AddRefs(proxy),
1198 callback, target);
1199 if (NS_FAILED(rv)) return rv;
1200 callback = proxy;
1201 }
1202
1203 if (NS_FAILED(mPipe->mStatus) ||
1204 (mWritable && !(flags & WAIT_CLOSURE_ONLY))) {
1205 // stream is already closed or writable; post event.
1206 pipeEvents.NotifyOutputReady(this, callback);
1207 }
1208 else {
1209 // queue up callback object to be notified when data becomes available
1210 mCallback = callback;
1211 mCallbackFlags = flags;
1212 }
1213 }
1214 return NS_OK;
1215}
1216
1217NS_IMETHODIMP
1218nsPipeOutputStream::Seek(PRInt32 whence, PRInt64 offset)
1219{
1220 NS_NOTREACHED("nsPipeOutputStream::Seek");
1221 return NS_ERROR_NOT_IMPLEMENTED;
1222}
1223
1224NS_IMETHODIMP
1225nsPipeOutputStream::Tell(PRInt64 *offset)
1226{
1227 *offset = mLogicalOffset;
1228 return NS_OK;
1229}
1230
1231NS_IMETHODIMP
1232nsPipeOutputStream::SetEOF()
1233{
1234 NS_NOTREACHED("nsPipeOutputStream::SetEOF");
1235 return NS_ERROR_NOT_IMPLEMENTED;
1236}
1237
1238////////////////////////////////////////////////////////////////////////////////
1239
1240NS_COM nsresult
1241NS_NewPipe2(nsIAsyncInputStream **pipeIn,
1242 nsIAsyncOutputStream **pipeOut,
1243 PRBool nonBlockingInput,
1244 PRBool nonBlockingOutput,
1245 PRUint32 segmentSize,
1246 PRUint32 segmentCount,
1247 nsIMemory *segmentAlloc)
1248{
1249 nsresult rv;
1250
1251#if defined(PR_LOGGING)
1252 if (!gPipeLog)
1253 gPipeLog = PR_NewLogModule("nsPipe");
1254#endif
1255
1256 nsPipe *pipe = new nsPipe();
1257 if (!pipe)
1258 return NS_ERROR_OUT_OF_MEMORY;
1259
1260 rv = pipe->Init(nonBlockingInput,
1261 nonBlockingOutput,
1262 segmentSize,
1263 segmentCount,
1264 segmentAlloc);
1265 if (NS_FAILED(rv)) {
1266 NS_ADDREF(pipe);
1267 NS_RELEASE(pipe);
1268 return rv;
1269 }
1270
1271 pipe->GetInputStream(pipeIn);
1272 pipe->GetOutputStream(pipeOut);
1273 return NS_OK;
1274}
1275
1276////////////////////////////////////////////////////////////////////////////////
Note: See TracBrowser for help on using the repository browser.

© 2023 Oracle
ContactPrivacy policyTerms of Use