Bug 1754004 - Part 14: Fix potential skipping of async streams in nsMultiplexInputStream, r=asuth

Before this change, we could run into issues when an async stream
accepts new data and queues a runnable to notify the stream listener,
but before that stream listener's runnable is run, the target thread
reads data from the async stream, emptying it of any available data.
This would mean that the stream appears empty in
nsMultiplexInputStream::OnInputStreamReady, and the rest of the stream
is skipped.

This patch changes the logic in OnInputStreamReady to re-call AsyncWait
in those situations and avoid skipping over the empty stream, preventing
data loss.

This situation came up when running some HTTP tests under rr with the
socket process enabled, as the upload stream can both be read due to an
OnInputStreamReady callback and due to other calls in the HTTP
machinery, leading to this potential race. With the socket process
enabled, it is possible for the upload stream to be an async pipe, which
doesn't happen in the parent process due to stream normalization.

This change makes an assumption about the behaviour of streams
implementing `nsIAsyncInputStream` which I believe is correct right now,
and has been documented. I originally had other patches which modified
`Available()`'s definition to add an extra outparameter or added extra
methods, but this seemed much simpler and accurate based on my memory of
having read the Available() implementations for async streams in-tree.

A gtest was added for the failing situation using both nsPipe and DataPipe.

Differential Revision: https://phabricator.services.mozilla.com/D144451
This commit is contained in:
Nika Layzell 2022-05-13 14:16:14 +00:00
Родитель 6556d0e4eb
Коммит ae579d351c
3 изменённых файлов: 135 добавлений и 3 удалений

Просмотреть файл

@ -80,6 +80,10 @@ interface nsIInputStream : nsISupports
* this method returns 0 bytes available. (Note: some nsIInputStream
* implementations automatically close when eof is reached; some do not).
*
* NOTE: Streams implementing nsIAsyncInputStream must automatically close
* when eof is reached, as otherwise it is impossible to distinguish between
* a stream waiting for more data and a stream at EOF using Available().
*
* @return number of bytes currently available in the stream.
*
* @throws NS_BASE_STREAM_CLOSED if the stream is closed normally.

Просмотреть файл

@ -905,9 +905,15 @@ nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
if (NS_SUCCEEDED(mStatus)) {
uint64_t avail = 0;
nsresult rv = aStream->Available(&avail);
if (rv == NS_BASE_STREAM_CLOSED || avail == 0) {
// This stream is closed or empty, let's move to the following one.
++mCurrentStream;
if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
// This stream is closed or empty, let's move to the following one,
// otherwise we need to re-wait on the current stream, as it currently
// has no data available.
// Unlike streams not implementing nsIAsyncInputStream, async streams
// cannot use `Available() == 0` to indicate EOF.
if (NS_FAILED(rv)) {
++mCurrentStream;
}
MutexAutoUnlock unlock(mLock);
return AsyncWaitInternal();
}

Просмотреть файл

@ -5,11 +5,14 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "gtest/gtest.h"
#include "mozilla/ipc/DataPipe.h"
#include "mozilla/SpinEventLoopUntil.h"
#include "nsIAsyncInputStream.h"
#include "nsComponentManagerUtils.h"
#include "nsIAsyncOutputStream.h"
#include "nsIInputStream.h"
#include "nsIMultiplexInputStream.h"
#include "nsIPipe.h"
#include "nsISeekableStream.h"
#include "nsStreamUtils.h"
#include "nsThreadUtils.h"
@ -820,3 +823,122 @@ TEST(MultiplexInputStream, LengthInputStream)
ASSERT_FALSE(callback1->Called());
ASSERT_TRUE(callback2->Called());
}
void TestMultiplexStreamReadWhileWaiting(nsIAsyncInputStream* pipeIn,
nsIAsyncOutputStream* pipeOut) {
// We had an issue where a stream which was read while a message was in-flight
// to report the stream was ready, meaning that the stream reported 0 bytes
// available when checked in the MultiplexInputStream's callback, and was
// skipped over.
nsCOMPtr<nsIThread> mainThread = NS_GetCurrentThread();
nsCOMPtr<nsIMultiplexInputStream> multiplexStream =
do_CreateInstance("@mozilla.org/io/multiplex-input-stream;1");
ASSERT_TRUE(NS_SUCCEEDED(multiplexStream->AppendStream(pipeIn)));
nsCOMPtr<nsIInputStream> stringStream;
ASSERT_TRUE(NS_SUCCEEDED(
NS_NewCStringInputStream(getter_AddRefs(stringStream), "xxxx\0"_ns)));
ASSERT_TRUE(NS_SUCCEEDED(multiplexStream->AppendStream(stringStream)));
nsCOMPtr<nsIAsyncInputStream> asyncMultiplex =
do_QueryInterface(multiplexStream);
ASSERT_TRUE(asyncMultiplex);
RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->AsyncWait(cb, 0, 0, mainThread)));
EXPECT_FALSE(cb->Called());
NS_ProcessPendingEvents(mainThread);
EXPECT_FALSE(cb->Called());
uint64_t available;
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
EXPECT_EQ(available, 0u);
// Write some data to the pipe, which should wake up the async wait message to
// be delivered.
char toWrite[] = "1234";
uint32_t written;
ASSERT_TRUE(NS_SUCCEEDED(pipeOut->Write(toWrite, sizeof(toWrite), &written)));
EXPECT_EQ(written, sizeof(toWrite));
EXPECT_FALSE(cb->Called());
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
EXPECT_EQ(available, sizeof(toWrite));
// Read that data from the stream
char toRead[sizeof(toWrite)];
uint32_t read;
ASSERT_TRUE(
NS_SUCCEEDED(asyncMultiplex->Read(toRead, sizeof(toRead), &read)));
EXPECT_EQ(read, sizeof(toRead));
EXPECT_STREQ(toRead, toWrite);
EXPECT_FALSE(cb->Called());
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
EXPECT_EQ(available, 0u);
// The multiplex stream will have detected the read and prevented the callback
// from having been called yet.
NS_ProcessPendingEvents(mainThread);
EXPECT_FALSE(cb->Called());
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
EXPECT_EQ(available, 0u);
// Write more data and close, then make sure we can read everything else in
// the stream.
char toWrite2[] = "56789";
ASSERT_TRUE(
NS_SUCCEEDED(pipeOut->Write(toWrite2, sizeof(toWrite2), &written)));
EXPECT_EQ(written, sizeof(toWrite2));
EXPECT_FALSE(cb->Called());
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
EXPECT_EQ(available, sizeof(toWrite2));
ASSERT_TRUE(NS_SUCCEEDED(pipeOut->Close()));
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
// XXX: Theoretically if the multiplex stream could detect it, we could report
// `sizeof(toWrite2) + 4` because the stream is complete, but there's no way
// for the multiplex stream to know.
EXPECT_EQ(available, sizeof(toWrite2));
NS_ProcessPendingEvents(mainThread);
EXPECT_TRUE(cb->Called());
// Read that final bit of data and make sure we read it.
char toRead2[sizeof(toWrite2)];
ASSERT_TRUE(
NS_SUCCEEDED(asyncMultiplex->Read(toRead2, sizeof(toRead2), &read)));
EXPECT_EQ(read, sizeof(toRead2));
EXPECT_STREQ(toRead2, toWrite2);
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
EXPECT_EQ(available, 5u);
// Read the extra data as well.
char extraRead[5];
ASSERT_TRUE(
NS_SUCCEEDED(asyncMultiplex->Read(extraRead, sizeof(extraRead), &read)));
EXPECT_EQ(read, sizeof(extraRead));
EXPECT_STREQ(extraRead, "xxxx");
ASSERT_TRUE(NS_SUCCEEDED(asyncMultiplex->Available(&available)));
EXPECT_EQ(available, 0u);
}
TEST(MultiplexInputStream, ReadWhileWaiting_nsPipe)
{
nsCOMPtr<nsIAsyncInputStream> pipeIn;
nsCOMPtr<nsIAsyncOutputStream> pipeOut;
ASSERT_TRUE(NS_SUCCEEDED(NS_NewPipe2(getter_AddRefs(pipeIn),
getter_AddRefs(pipeOut), true, true)));
TestMultiplexStreamReadWhileWaiting(pipeIn, pipeOut);
}
TEST(MultiplexInputStream, ReadWhileWaiting_DataPipe)
{
RefPtr<mozilla::ipc::DataPipeReceiver> pipeIn;
RefPtr<mozilla::ipc::DataPipeSender> pipeOut;
ASSERT_TRUE(NS_SUCCEEDED(mozilla::ipc::NewDataPipe(
mozilla::ipc::kDefaultDataPipeCapacity, getter_AddRefs(pipeOut),
getter_AddRefs(pipeIn))));
TestMultiplexStreamReadWhileWaiting(pipeIn, pipeOut);
}