зеркало из https://github.com/mozilla/gecko-dev.git
1056 строки
33 KiB
C++
1056 строки
33 KiB
C++
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include <algorithm>
|
|
#include "gtest/gtest.h"
|
|
#include "Helpers.h"
|
|
#include "mozilla/ReentrantMonitor.h"
|
|
#include "mozilla/Printf.h"
|
|
#include "nsCOMPtr.h"
|
|
#include "nsCRT.h"
|
|
#include "nsIAsyncInputStream.h"
|
|
#include "nsIAsyncOutputStream.h"
|
|
#include "nsIBufferedStreams.h"
|
|
#include "nsIClassInfo.h"
|
|
#include "nsICloneableInputStream.h"
|
|
#include "nsIInputStream.h"
|
|
#include "nsIOutputStream.h"
|
|
#include "nsIPipe.h"
|
|
#include "nsITellableStream.h"
|
|
#include "nsIThread.h"
|
|
#include "nsIRunnable.h"
|
|
#include "nsStreamUtils.h"
|
|
#include "nsString.h"
|
|
#include "nsThreadUtils.h"
|
|
#include "prinrval.h"
|
|
|
|
using namespace mozilla;
|
|
|
|
#define ITERATIONS 33333
|
|
char kTestPattern[] = "My hovercraft is full of eels.\n";
|
|
|
|
bool gTrace = false;
|
|
|
|
static nsresult WriteAll(nsIOutputStream* os, const char* buf, uint32_t bufLen,
|
|
uint32_t* lenWritten) {
|
|
const char* p = buf;
|
|
*lenWritten = 0;
|
|
while (bufLen) {
|
|
uint32_t n;
|
|
nsresult rv = os->Write(p, bufLen, &n);
|
|
if (NS_FAILED(rv)) return rv;
|
|
p += n;
|
|
bufLen -= n;
|
|
*lenWritten += n;
|
|
}
|
|
return NS_OK;
|
|
}
|
|
|
|
class nsReceiver final : public nsIRunnable {
|
|
public:
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
|
|
NS_IMETHOD Run() override {
|
|
nsresult rv;
|
|
char buf[101];
|
|
uint32_t count;
|
|
PRIntervalTime start = PR_IntervalNow();
|
|
while (true) {
|
|
rv = mIn->Read(buf, 100, &count);
|
|
if (NS_FAILED(rv)) {
|
|
printf("read failed\n");
|
|
break;
|
|
}
|
|
if (count == 0) {
|
|
// printf("EOF count = %d\n", mCount);
|
|
break;
|
|
}
|
|
|
|
if (gTrace) {
|
|
buf[count] = '\0';
|
|
printf("read: %s\n", buf);
|
|
}
|
|
mCount += count;
|
|
}
|
|
PRIntervalTime end = PR_IntervalNow();
|
|
printf("read %d bytes, time = %dms\n", mCount,
|
|
PR_IntervalToMilliseconds(end - start));
|
|
return rv;
|
|
}
|
|
|
|
explicit nsReceiver(nsIInputStream* in) : mIn(in), mCount(0) {}
|
|
|
|
uint32_t GetBytesRead() { return mCount; }
|
|
|
|
private:
|
|
~nsReceiver() {}
|
|
|
|
protected:
|
|
nsCOMPtr<nsIInputStream> mIn;
|
|
uint32_t mCount;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(nsReceiver, nsIRunnable)
|
|
|
|
static nsresult TestPipe(nsIInputStream* in, nsIOutputStream* out) {
|
|
RefPtr<nsReceiver> receiver = new nsReceiver(in);
|
|
if (!receiver) return NS_ERROR_OUT_OF_MEMORY;
|
|
|
|
nsresult rv;
|
|
|
|
nsCOMPtr<nsIThread> thread;
|
|
rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver);
|
|
if (NS_FAILED(rv)) return rv;
|
|
|
|
uint32_t total = 0;
|
|
PRIntervalTime start = PR_IntervalNow();
|
|
for (uint32_t i = 0; i < ITERATIONS; i++) {
|
|
uint32_t writeCount;
|
|
SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
|
|
uint32_t len = strlen(buf.get());
|
|
rv = WriteAll(out, buf.get(), len, &writeCount);
|
|
if (gTrace) {
|
|
printf("wrote: ");
|
|
for (uint32_t j = 0; j < writeCount; j++) {
|
|
putc(buf.get()[j], stdout);
|
|
}
|
|
printf("\n");
|
|
}
|
|
if (NS_FAILED(rv)) return rv;
|
|
total += writeCount;
|
|
}
|
|
rv = out->Close();
|
|
if (NS_FAILED(rv)) return rv;
|
|
|
|
PRIntervalTime end = PR_IntervalNow();
|
|
|
|
thread->Shutdown();
|
|
|
|
printf("wrote %d bytes, time = %dms\n", total,
|
|
PR_IntervalToMilliseconds(end - start));
|
|
EXPECT_EQ(receiver->GetBytesRead(), total);
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
class nsShortReader final : public nsIRunnable {
|
|
public:
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
|
|
NS_IMETHOD Run() override {
|
|
nsresult rv;
|
|
char buf[101];
|
|
uint32_t count;
|
|
uint32_t total = 0;
|
|
while (true) {
|
|
// if (gTrace)
|
|
// printf("calling Read\n");
|
|
rv = mIn->Read(buf, 100, &count);
|
|
if (NS_FAILED(rv)) {
|
|
printf("read failed\n");
|
|
break;
|
|
}
|
|
if (count == 0) {
|
|
break;
|
|
}
|
|
|
|
if (gTrace) {
|
|
// For next |printf()| call and possible others elsewhere.
|
|
buf[count] = '\0';
|
|
|
|
printf("read %d bytes: %s\n", count, buf);
|
|
}
|
|
|
|
Received(count);
|
|
total += count;
|
|
}
|
|
printf("read %d bytes\n", total);
|
|
return rv;
|
|
}
|
|
|
|
explicit nsShortReader(nsIInputStream* in) : mIn(in), mReceived(0) {
|
|
mMon = new ReentrantMonitor("nsShortReader");
|
|
}
|
|
|
|
void Received(uint32_t count) {
|
|
ReentrantMonitorAutoEnter mon(*mMon);
|
|
mReceived += count;
|
|
mon.Notify();
|
|
}
|
|
|
|
uint32_t WaitForReceipt(const uint32_t aWriteCount) {
|
|
ReentrantMonitorAutoEnter mon(*mMon);
|
|
uint32_t result = mReceived;
|
|
|
|
while (result < aWriteCount) {
|
|
mon.Wait();
|
|
|
|
EXPECT_TRUE(mReceived > result);
|
|
result = mReceived;
|
|
}
|
|
|
|
mReceived = 0;
|
|
return result;
|
|
}
|
|
|
|
private:
|
|
~nsShortReader() {}
|
|
|
|
protected:
|
|
nsCOMPtr<nsIInputStream> mIn;
|
|
uint32_t mReceived;
|
|
ReentrantMonitor* mMon;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(nsShortReader, nsIRunnable)
|
|
|
|
static nsresult TestShortWrites(nsIInputStream* in, nsIOutputStream* out) {
|
|
RefPtr<nsShortReader> receiver = new nsShortReader(in);
|
|
if (!receiver) return NS_ERROR_OUT_OF_MEMORY;
|
|
|
|
nsresult rv;
|
|
|
|
nsCOMPtr<nsIThread> thread;
|
|
rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread), receiver);
|
|
if (NS_FAILED(rv)) return rv;
|
|
|
|
uint32_t total = 0;
|
|
for (uint32_t i = 0; i < ITERATIONS; i++) {
|
|
uint32_t writeCount;
|
|
SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
|
|
uint32_t len = strlen(buf.get());
|
|
len = len * rand() / RAND_MAX;
|
|
len = std::min(1u, len);
|
|
rv = WriteAll(out, buf.get(), len, &writeCount);
|
|
if (NS_FAILED(rv)) return rv;
|
|
EXPECT_EQ(writeCount, len);
|
|
total += writeCount;
|
|
|
|
if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
|
|
// printf("calling Flush\n");
|
|
out->Flush();
|
|
// printf("calling WaitForReceipt\n");
|
|
|
|
#ifdef DEBUG
|
|
const uint32_t received = receiver->WaitForReceipt(writeCount);
|
|
EXPECT_EQ(received, writeCount);
|
|
#endif
|
|
}
|
|
rv = out->Close();
|
|
if (NS_FAILED(rv)) return rv;
|
|
|
|
thread->Shutdown();
|
|
|
|
printf("wrote %d bytes\n", total);
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
class nsPump final : public nsIRunnable {
|
|
public:
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
|
|
NS_IMETHOD Run() override {
|
|
nsresult rv;
|
|
uint32_t count;
|
|
while (true) {
|
|
rv = mOut->WriteFrom(mIn, ~0U, &count);
|
|
if (NS_FAILED(rv)) {
|
|
printf("Write failed\n");
|
|
break;
|
|
}
|
|
if (count == 0) {
|
|
printf("EOF count = %d\n", mCount);
|
|
break;
|
|
}
|
|
|
|
if (gTrace) {
|
|
printf("Wrote: %d\n", count);
|
|
}
|
|
mCount += count;
|
|
}
|
|
mOut->Close();
|
|
return rv;
|
|
}
|
|
|
|
nsPump(nsIInputStream* in, nsIOutputStream* out)
|
|
: mIn(in), mOut(out), mCount(0) {}
|
|
|
|
private:
|
|
~nsPump() {}
|
|
|
|
protected:
|
|
nsCOMPtr<nsIInputStream> mIn;
|
|
nsCOMPtr<nsIOutputStream> mOut;
|
|
uint32_t mCount;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(nsPump, nsIRunnable)
|
|
|
|
TEST(Pipes, ChainedPipes)
|
|
{
|
|
nsresult rv;
|
|
if (gTrace) {
|
|
printf("TestChainedPipes\n");
|
|
}
|
|
|
|
nsCOMPtr<nsIInputStream> in1;
|
|
nsCOMPtr<nsIOutputStream> out1;
|
|
rv = NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);
|
|
if (NS_FAILED(rv)) return;
|
|
|
|
nsCOMPtr<nsIInputStream> in2;
|
|
nsCOMPtr<nsIOutputStream> out2;
|
|
rv = NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);
|
|
if (NS_FAILED(rv)) return;
|
|
|
|
RefPtr<nsPump> pump = new nsPump(in1, out2);
|
|
if (pump == nullptr) return;
|
|
|
|
nsCOMPtr<nsIThread> thread;
|
|
rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump);
|
|
if (NS_FAILED(rv)) return;
|
|
|
|
RefPtr<nsReceiver> receiver = new nsReceiver(in2);
|
|
if (receiver == nullptr) return;
|
|
|
|
nsCOMPtr<nsIThread> receiverThread;
|
|
rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
|
|
receiver);
|
|
if (NS_FAILED(rv)) return;
|
|
|
|
uint32_t total = 0;
|
|
for (uint32_t i = 0; i < ITERATIONS; i++) {
|
|
uint32_t writeCount;
|
|
SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
|
|
uint32_t len = strlen(buf.get());
|
|
len = len * rand() / RAND_MAX;
|
|
len = std::max(1u, len);
|
|
rv = WriteAll(out1, buf.get(), len, &writeCount);
|
|
if (NS_FAILED(rv)) return;
|
|
EXPECT_EQ(writeCount, len);
|
|
total += writeCount;
|
|
|
|
if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
|
|
}
|
|
if (gTrace) {
|
|
printf("wrote total of %d bytes\n", total);
|
|
}
|
|
rv = out1->Close();
|
|
if (NS_FAILED(rv)) return;
|
|
|
|
thread->Shutdown();
|
|
receiverThread->Shutdown();
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
static void RunTests(uint32_t segSize, uint32_t segCount) {
|
|
nsresult rv;
|
|
nsCOMPtr<nsIInputStream> in;
|
|
nsCOMPtr<nsIOutputStream> out;
|
|
uint32_t bufSize = segSize * segCount;
|
|
if (gTrace) {
|
|
printf("Testing New Pipes: segment size %d buffer size %d\n", segSize,
|
|
bufSize);
|
|
printf("Testing long writes...\n");
|
|
}
|
|
rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
|
|
EXPECT_TRUE(NS_SUCCEEDED(rv));
|
|
rv = TestPipe(in, out);
|
|
EXPECT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
if (gTrace) {
|
|
printf("Testing short writes...\n");
|
|
}
|
|
rv = NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
|
|
EXPECT_TRUE(NS_SUCCEEDED(rv));
|
|
rv = TestShortWrites(in, out);
|
|
EXPECT_TRUE(NS_SUCCEEDED(rv));
|
|
}
|
|
|
|
TEST(Pipes, Main)
|
|
{
|
|
RunTests(16, 1);
|
|
RunTests(4096, 16);
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
namespace {
|
|
|
|
static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;
|
|
|
|
// An alternate pipe testing routing that uses NS_ConsumeStream() instead of
|
|
// manual read loop.
|
|
static void TestPipe2(uint32_t aNumBytes,
|
|
uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
|
|
nsCOMPtr<nsIInputStream> reader;
|
|
nsCOMPtr<nsIOutputStream> writer;
|
|
|
|
uint32_t maxSize = std::max(aNumBytes, aSegmentSize);
|
|
|
|
nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
|
|
aSegmentSize, maxSize);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> inputData;
|
|
testing::CreateData(aNumBytes, inputData);
|
|
testing::WriteAllAndClose(writer, inputData);
|
|
testing::ConsumeAndValidateStream(reader, inputData);
|
|
}
|
|
|
|
} // namespace
|
|
|
|
TEST(Pipes, Blocking_32k)
|
|
{ TestPipe2(32 * 1024); }
|
|
|
|
TEST(Pipes, Blocking_64k)
|
|
{ TestPipe2(64 * 1024); }
|
|
|
|
TEST(Pipes, Blocking_128k)
|
|
{ TestPipe2(128 * 1024); }
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
namespace {
|
|
|
|
// Utility routine to validate pipe clone before. There are many knobs.
|
|
//
|
|
// aTotalBytes Total number of bytes to write to the pipe.
|
|
// aNumWrites How many separate write calls should be made. Bytes
|
|
// are evenly distributed over these write calls.
|
|
// aNumInitialClones How many clones of the pipe input stream should be
|
|
// made before writing begins.
|
|
// aNumToCloseAfterWrite How many streams should be closed after each write.
|
|
// One stream is always kept open. This verifies that
|
|
// closing one stream does not effect other open
|
|
// streams.
|
|
// aNumToCloneAfterWrite How many clones to create after each write. Occurs
|
|
// after closing any streams. This tests cloning
|
|
// active streams on a pipe that is being written to.
|
|
// aNumStreamToReadPerWrite How many streams to read fully after each write.
|
|
// This tests reading cloned streams at different rates
|
|
// while the pipe is being written to.
|
|
static void TestPipeClone(uint32_t aTotalBytes, uint32_t aNumWrites,
|
|
uint32_t aNumInitialClones,
|
|
uint32_t aNumToCloseAfterWrite,
|
|
uint32_t aNumToCloneAfterWrite,
|
|
uint32_t aNumStreamsToReadPerWrite,
|
|
uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
|
|
nsCOMPtr<nsIInputStream> reader;
|
|
nsCOMPtr<nsIOutputStream> writer;
|
|
|
|
uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);
|
|
|
|
// Use async input streams so we can NS_ConsumeStream() the current data
|
|
// while the pipe is still being written to.
|
|
nsresult rv =
|
|
NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
|
|
maxSize, true, false); // non-blocking - reader, writer
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
|
|
ASSERT_TRUE(cloneable);
|
|
ASSERT_TRUE(cloneable->GetCloneable());
|
|
|
|
nsTArray<nsCString> outputDataList;
|
|
|
|
nsTArray<nsCOMPtr<nsIInputStream>> streamList;
|
|
|
|
// first stream is our original reader from the pipe
|
|
streamList.AppendElement(reader);
|
|
outputDataList.AppendElement();
|
|
|
|
// Clone the initial input stream the specified number of times
|
|
// before performing any writes.
|
|
for (uint32_t i = 0; i < aNumInitialClones; ++i) {
|
|
nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
|
|
rv = cloneable->Clone(getter_AddRefs(*clone));
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
ASSERT_TRUE(*clone);
|
|
|
|
outputDataList.AppendElement();
|
|
}
|
|
|
|
nsTArray<char> inputData;
|
|
testing::CreateData(aTotalBytes, inputData);
|
|
|
|
const uint32_t bytesPerWrite = ((aTotalBytes - 1) / aNumWrites) + 1;
|
|
uint32_t offset = 0;
|
|
uint32_t remaining = aTotalBytes;
|
|
uint32_t nextStreamToRead = 0;
|
|
|
|
while (remaining) {
|
|
uint32_t numToWrite = std::min(bytesPerWrite, remaining);
|
|
testing::Write(writer, inputData, offset, numToWrite);
|
|
offset += numToWrite;
|
|
remaining -= numToWrite;
|
|
|
|
// Close the specified number of streams. This allows us to
|
|
// test that one closed clone does not break other open clones.
|
|
for (uint32_t i = 0; i < aNumToCloseAfterWrite && streamList.Length() > 1;
|
|
++i) {
|
|
uint32_t lastIndex = streamList.Length() - 1;
|
|
streamList[lastIndex]->Close();
|
|
streamList.RemoveElementAt(lastIndex);
|
|
outputDataList.RemoveElementAt(lastIndex);
|
|
|
|
if (nextStreamToRead >= streamList.Length()) {
|
|
nextStreamToRead = 0;
|
|
}
|
|
}
|
|
|
|
// Create the specified number of clones. This lets us verify
|
|
// that we can create clones in the middle of pipe reading and
|
|
// writing.
|
|
for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
|
|
nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
|
|
rv = cloneable->Clone(getter_AddRefs(*clone));
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
ASSERT_TRUE(*clone);
|
|
|
|
// Initialize the new output data to make whats been read to data for
|
|
// the original stream. First stream is always the original stream.
|
|
nsCString* outputData = outputDataList.AppendElement();
|
|
*outputData = outputDataList[0];
|
|
}
|
|
|
|
// Read the specified number of streams. This lets us verify that we
|
|
// can read from the clones at different rates while the pipe is being
|
|
// written to.
|
|
for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
|
|
nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
|
|
nsCString& outputData = outputDataList[nextStreamToRead];
|
|
|
|
// Can't use ConsumeAndValidateStream() here because we're not
|
|
// guaranteed the exact amount read. It should just be at least
|
|
// as many as numToWrite.
|
|
nsAutoCString tmpOutputData;
|
|
rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
|
|
ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
|
|
ASSERT_GE(tmpOutputData.Length(), numToWrite);
|
|
|
|
outputData += tmpOutputData;
|
|
|
|
nextStreamToRead += 1;
|
|
if (nextStreamToRead >= streamList.Length()) {
|
|
// Note: When we wrap around on the streams being read, its possible
|
|
// we will trigger a segment to be deleted from the pipe. It
|
|
// would be nice to validate this here, but we don't have any
|
|
// QI'able interface that would let us check easily.
|
|
|
|
nextStreamToRead = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
rv = writer->Close();
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());
|
|
|
|
// Finally, read the remaining bytes from each stream. This may be
|
|
// different amounts of data depending on how much reading we did while
|
|
// writing. Verify that the end result matches the input data.
|
|
for (uint32_t i = 0; i < streamList.Length(); ++i) {
|
|
nsCOMPtr<nsIInputStream>& stream = streamList[i];
|
|
nsCString& outputData = outputDataList[i];
|
|
|
|
nsAutoCString tmpOutputData;
|
|
rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
|
|
ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
|
|
stream->Close();
|
|
|
|
// Append to total amount read from the stream
|
|
outputData += tmpOutputData;
|
|
|
|
ASSERT_EQ(inputString.Length(), outputData.Length());
|
|
ASSERT_TRUE(inputString.Equals(outputData));
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
|
|
{
|
|
TestPipeClone(32 * 1024, // total bytes
|
|
16, // num writes
|
|
3, // num initial clones
|
|
0, // num streams to close after each write
|
|
0, // num clones to add after each write
|
|
0); // num streams to read after each write
|
|
}
|
|
|
|
TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
|
|
{
|
|
// Since this reads all streams on every write, it should trigger the
|
|
// pipe cursor roll back optimization. Currently we can only verify
|
|
// this with logging.
|
|
|
|
TestPipeClone(32 * 1024, // total bytes
|
|
16, // num writes
|
|
3, // num initial clones
|
|
0, // num streams to close after each write
|
|
0, // num clones to add after each write
|
|
4); // num streams to read after each write
|
|
}
|
|
|
|
TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
|
|
{
|
|
TestPipeClone(32 * 1024, // total bytes
|
|
16, // num writes
|
|
0, // num initial clones
|
|
0, // num streams to close after each write
|
|
1, // num clones to add after each write
|
|
0); // num streams to read after each write
|
|
}
|
|
|
|
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
|
|
{
|
|
TestPipeClone(32 * 1024, // total bytes
|
|
16, // num writes
|
|
0, // num initial clones
|
|
0, // num streams to close after each write
|
|
1, // num clones to add after each write
|
|
1); // num streams to read after each write
|
|
}
|
|
|
|
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
|
|
{
|
|
// Since this reads streams faster than we clone new ones, it should
|
|
// trigger pipe segment deletion periodically. Currently we can
|
|
// only verify this with logging.
|
|
|
|
TestPipeClone(32 * 1024, // total bytes
|
|
16, // num writes
|
|
1, // num initial clones
|
|
1, // num streams to close after each write
|
|
2, // num clones to add after each write
|
|
3); // num streams to read after each write
|
|
}
|
|
|
|
TEST(Pipes, Write_AsyncWait)
|
|
{
|
|
nsCOMPtr<nsIAsyncInputStream> reader;
|
|
nsCOMPtr<nsIAsyncOutputStream> writer;
|
|
|
|
const uint32_t segmentSize = 1024;
|
|
const uint32_t numSegments = 1;
|
|
|
|
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
|
|
true, true, // non-blocking - reader, writer
|
|
segmentSize, numSegments);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> inputData;
|
|
testing::CreateData(segmentSize, inputData);
|
|
|
|
uint32_t numWritten = 0;
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
|
|
|
|
RefPtr<testing::OutputStreamCallback> cb =
|
|
new testing::OutputStreamCallback();
|
|
|
|
rv = writer->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
testing::ConsumeAndValidateStream(reader, inputData);
|
|
|
|
ASSERT_TRUE(cb->Called());
|
|
}
|
|
|
|
TEST(Pipes, Write_AsyncWait_Clone)
|
|
{
|
|
nsCOMPtr<nsIAsyncInputStream> reader;
|
|
nsCOMPtr<nsIAsyncOutputStream> writer;
|
|
|
|
const uint32_t segmentSize = 1024;
|
|
const uint32_t numSegments = 1;
|
|
|
|
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
|
|
true, true, // non-blocking - reader, writer
|
|
segmentSize, numSegments);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsCOMPtr<nsIInputStream> clone;
|
|
rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> inputData;
|
|
testing::CreateData(segmentSize, inputData);
|
|
|
|
uint32_t numWritten = 0;
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
// This attempts to write data beyond the original pipe size limit. It
|
|
// should fail since neither side of the clone has been read yet.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
|
|
|
|
RefPtr<testing::OutputStreamCallback> cb =
|
|
new testing::OutputStreamCallback();
|
|
|
|
rv = writer->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
// Consume data on the original stream, but the clone still has not been read.
|
|
testing::ConsumeAndValidateStream(reader, inputData);
|
|
|
|
// A clone that is not being read should not stall the other input stream
|
|
// reader. Therefore the writer callback should trigger when the fastest
|
|
// reader drains the other input stream.
|
|
ASSERT_TRUE(cb->Called());
|
|
|
|
// Attempt to write data. This will buffer data beyond the pipe size limit in
|
|
// order for the clone stream to still work. This is allowed because the
|
|
// other input stream has drained its buffered segments and is ready for more
|
|
// data.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
// Again, this should fail since the origin stream has not been read again.
|
|
// The pipe size should still restrict how far ahead we can buffer even
|
|
// when there is a cloned stream not being read.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_FAILED(rv));
|
|
|
|
cb = new testing::OutputStreamCallback();
|
|
rv = writer->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
// The write should again be blocked since we have written data and the
|
|
// main reader is at its maximum advance buffer.
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
nsTArray<char> expectedCloneData;
|
|
expectedCloneData.AppendElements(inputData);
|
|
expectedCloneData.AppendElements(inputData);
|
|
|
|
// We should now be able to consume the entire backlog of buffered data on
|
|
// the cloned stream.
|
|
testing::ConsumeAndValidateStream(clone, expectedCloneData);
|
|
|
|
// Draining the clone side should also trigger the AsyncWait() writer
|
|
// callback
|
|
ASSERT_TRUE(cb->Called());
|
|
|
|
// Finally, we should be able to consume the remaining data on the original
|
|
// reader.
|
|
testing::ConsumeAndValidateStream(reader, inputData);
|
|
}
|
|
|
|
TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
|
|
{
|
|
nsCOMPtr<nsIAsyncInputStream> reader;
|
|
nsCOMPtr<nsIAsyncOutputStream> writer;
|
|
|
|
const uint32_t segmentSize = 1024;
|
|
const uint32_t numSegments = 1;
|
|
|
|
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
|
|
true, true, // non-blocking - reader, writer
|
|
segmentSize, numSegments);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsCOMPtr<nsIInputStream> clone;
|
|
rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> inputData;
|
|
testing::CreateData(segmentSize, inputData);
|
|
|
|
uint32_t numWritten = 0;
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
// This attempts to write data beyond the original pipe size limit. It
|
|
// should fail since neither side of the clone has been read yet.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
|
|
|
|
RefPtr<testing::OutputStreamCallback> cb =
|
|
new testing::OutputStreamCallback();
|
|
|
|
rv = writer->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
// Consume data on the original stream, but the clone still has not been read.
|
|
testing::ConsumeAndValidateStream(reader, inputData);
|
|
|
|
// A clone that is not being read should not stall the other input stream
|
|
// reader. Therefore the writer callback should trigger when the fastest
|
|
// reader drains the other input stream.
|
|
ASSERT_TRUE(cb->Called());
|
|
|
|
// Attempt to write data. This will buffer data beyond the pipe size limit in
|
|
// order for the clone stream to still work. This is allowed because the
|
|
// other input stream has drained its buffered segments and is ready for more
|
|
// data.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
// Again, this should fail since the origin stream has not been read again.
|
|
// The pipe size should still restrict how far ahead we can buffer even
|
|
// when there is a cloned stream not being read.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_FAILED(rv));
|
|
|
|
cb = new testing::OutputStreamCallback();
|
|
rv = writer->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
// The write should again be blocked since we have written data and the
|
|
// main reader is at its maximum advance buffer.
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
// Close the original reader input stream. This was the fastest reader,
|
|
// so we should have a single stream that is buffered beyond our nominal
|
|
// limit.
|
|
reader->Close();
|
|
|
|
// Because the clone stream is still buffered the writable callback should
|
|
// not be fired.
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
// And we should not be able to perform a write.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_FAILED(rv));
|
|
|
|
// Create another clone stream. Now we have two streams that exceed our
|
|
// maximum size limit
|
|
nsCOMPtr<nsIInputStream> clone2;
|
|
rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> expectedCloneData;
|
|
expectedCloneData.AppendElements(inputData);
|
|
expectedCloneData.AppendElements(inputData);
|
|
|
|
// We should now be able to consume the entire backlog of buffered data on
|
|
// the cloned stream.
|
|
testing::ConsumeAndValidateStream(clone, expectedCloneData);
|
|
|
|
// The pipe should now be writable because we have two open streams, one of
|
|
// which is completely drained.
|
|
ASSERT_TRUE(cb->Called());
|
|
|
|
// Write again to reach our limit again.
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
// The stream is again non-writeable.
|
|
cb = new testing::OutputStreamCallback();
|
|
rv = writer->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
// Close the empty stream. This is different from our previous close since
|
|
// before we were closing a stream with some data still buffered.
|
|
clone->Close();
|
|
|
|
// The pipe should not be writable. The second clone is still fully buffered
|
|
// over our limit.
|
|
ASSERT_FALSE(cb->Called());
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_FAILED(rv));
|
|
|
|
// Finally consume all of the buffered data on the second clone.
|
|
expectedCloneData.AppendElements(inputData);
|
|
testing::ConsumeAndValidateStream(clone2, expectedCloneData);
|
|
|
|
// Draining the final clone should make the pipe writable again.
|
|
ASSERT_TRUE(cb->Called());
|
|
}
|
|
|
|
TEST(Pipes, Read_AsyncWait)
|
|
{
|
|
nsCOMPtr<nsIAsyncInputStream> reader;
|
|
nsCOMPtr<nsIAsyncOutputStream> writer;
|
|
|
|
const uint32_t segmentSize = 1024;
|
|
const uint32_t numSegments = 1;
|
|
|
|
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
|
|
true, true, // non-blocking - reader, writer
|
|
segmentSize, numSegments);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> inputData;
|
|
testing::CreateData(segmentSize, inputData);
|
|
|
|
RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
|
|
|
|
rv = reader->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
uint32_t numWritten = 0;
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_TRUE(cb->Called());
|
|
|
|
testing::ConsumeAndValidateStream(reader, inputData);
|
|
}
|
|
|
|
TEST(Pipes, Read_AsyncWait_Clone)
|
|
{
|
|
nsCOMPtr<nsIAsyncInputStream> reader;
|
|
nsCOMPtr<nsIAsyncOutputStream> writer;
|
|
|
|
const uint32_t segmentSize = 1024;
|
|
const uint32_t numSegments = 1;
|
|
|
|
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
|
|
true, true, // non-blocking - reader, writer
|
|
segmentSize, numSegments);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsCOMPtr<nsIInputStream> clone;
|
|
rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
|
|
ASSERT_TRUE(asyncClone);
|
|
|
|
nsTArray<char> inputData;
|
|
testing::CreateData(segmentSize, inputData);
|
|
|
|
RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
|
|
|
|
RefPtr<testing::InputStreamCallback> cb2 = new testing::InputStreamCallback();
|
|
|
|
rv = reader->AsyncWait(cb, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_FALSE(cb->Called());
|
|
|
|
rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_FALSE(cb2->Called());
|
|
|
|
uint32_t numWritten = 0;
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
ASSERT_TRUE(cb->Called());
|
|
ASSERT_TRUE(cb2->Called());
|
|
|
|
testing::ConsumeAndValidateStream(reader, inputData);
|
|
}
|
|
|
|
namespace {
|
|
|
|
nsresult CloseDuringReadFunc(nsIInputStream* aReader, void* aClosure,
|
|
const char* aFromSegment, uint32_t aToOffset,
|
|
uint32_t aCount, uint32_t* aWriteCountOut) {
|
|
MOZ_RELEASE_ASSERT(aReader);
|
|
MOZ_RELEASE_ASSERT(aClosure);
|
|
MOZ_RELEASE_ASSERT(aFromSegment);
|
|
MOZ_RELEASE_ASSERT(aWriteCountOut);
|
|
MOZ_RELEASE_ASSERT(aToOffset == 0);
|
|
|
|
// This is insanity and you probably should not do this under normal
|
|
// conditions. We want to simulate the case where the pipe is closed
|
|
// (possibly from other end on another thread) simultaneously with the
|
|
// read. This is the easiest way to do trigger this case in a synchronous
|
|
// gtest.
|
|
MOZ_ALWAYS_SUCCEEDS(aReader->Close());
|
|
|
|
nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
|
|
buffer->AppendElements(aFromSegment, aCount);
|
|
|
|
*aWriteCountOut = aCount;
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
void TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize) {
|
|
nsCOMPtr<nsIInputStream> reader;
|
|
nsCOMPtr<nsIOutputStream> writer;
|
|
|
|
const uint32_t maxSize = aSegmentSize;
|
|
|
|
nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
|
|
aSegmentSize, maxSize);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> inputData;
|
|
|
|
testing::CreateData(aDataSize, inputData);
|
|
|
|
uint32_t numWritten = 0;
|
|
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsTArray<char> outputData;
|
|
|
|
uint32_t numRead = 0;
|
|
rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
|
|
inputData.Length(), &numRead);
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
ASSERT_EQ(inputData.Length(), numRead);
|
|
|
|
ASSERT_EQ(inputData, outputData);
|
|
|
|
uint64_t available;
|
|
rv = reader->Available(&available);
|
|
ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
|
|
}
|
|
|
|
} // namespace
|
|
|
|
TEST(Pipes, Close_During_Read_Partial_Segment)
|
|
{ TestCloseDuringRead(1024, 512); }
|
|
|
|
TEST(Pipes, Close_During_Read_Full_Segment)
|
|
{ TestCloseDuringRead(1024, 1024); }
|
|
|
|
TEST(Pipes, Interfaces)
|
|
{
|
|
nsCOMPtr<nsIInputStream> reader;
|
|
nsCOMPtr<nsIOutputStream> writer;
|
|
|
|
nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));
|
|
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
|
|
|
nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
|
|
ASSERT_TRUE(readerType1);
|
|
|
|
nsCOMPtr<nsITellableStream> readerType2 = do_QueryInterface(reader);
|
|
ASSERT_TRUE(readerType2);
|
|
|
|
nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
|
|
ASSERT_TRUE(readerType3);
|
|
|
|
nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
|
|
ASSERT_TRUE(readerType4);
|
|
|
|
nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
|
|
ASSERT_TRUE(readerType5);
|
|
|
|
nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
|
|
ASSERT_TRUE(readerType6);
|
|
}
|