Bug 1346446 - Add a DataPipe shared memory pipe type, r=ipc-reviewers,handyman

This is a high-level async pipe which can be cheaply transferred between
processes and uses a shared memory ring buffer as its implementation. This can
be used to efficiently stream non-message oriented data between processes and
is not bound to any particular protocol or thread.

Differential Revision: https://phabricator.services.mozilla.com/D135161
This commit is contained in:
Nika Layzell 2022-02-14 23:59:35 +00:00
Родитель de1e97c891
Коммит 40b4794120
9 изменённых файлов: 1162 добавлений и 0 удалений

705
ipc/glue/DataPipe.cpp Normal file
Просмотреть файл

@ -0,0 +1,705 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "DataPipe.h"
#include "mozilla/AlreadyAddRefed.h"
#include "mozilla/Assertions.h"
#include "mozilla/CheckedInt.h"
#include "mozilla/ErrorNames.h"
#include "mozilla/Logging.h"
#include "mozilla/ipc/InputStreamParams.h"
#include "nsIAsyncInputStream.h"
#include "nsStreamUtils.h"
#include "nsThreadUtils.h"
namespace mozilla {
namespace ipc {
LazyLogModule gDataPipeLog("DataPipe");
namespace data_pipe_detail {
// Helper for queueing up actions to be run once the mutex has been unlocked.
// Actions will be run in-order.
class DataPipeAutoLock {
public:
explicit DataPipeAutoLock(Mutex& aMutex) : mMutex(aMutex) { mMutex.Lock(); }
DataPipeAutoLock(const DataPipeAutoLock&) = delete;
DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete;
template <typename F>
void AddUnlockAction(F aAction) {
mActions.AppendElement(MakeUnique<Action<F>>(std::move(aAction)));
}
~DataPipeAutoLock() {
mMutex.Unlock();
for (auto& action : mActions) {
action->Run();
}
}
private:
// NOTE: This would be better served by something like llvm's
// `UniqueFunction`, but this works as well. We can't use `std::function`, as
// the actions may not be copy constructable.
struct IAction {
virtual void Run() = 0;
virtual ~IAction() = default;
};
template <typename F>
struct Action : IAction {
explicit Action(F&& aAction) : mAction(std::move(aAction)) {}
void Run() override { mAction(); }
F mAction;
};
Mutex& mMutex;
AutoTArray<UniquePtr<IAction>, 4> mActions;
};
static void DoNotifyOnUnlock(DataPipeAutoLock& aLock,
already_AddRefed<nsIRunnable> aCallback,
already_AddRefed<nsIEventTarget> aTarget) {
nsCOMPtr<nsIRunnable> callback{std::move(aCallback)};
nsCOMPtr<nsIEventTarget> target{std::move(aTarget)};
if (callback) {
aLock.AddUnlockAction(
[callback = std::move(callback), target = std::move(target)]() mutable {
if (target) {
target->Dispatch(callback.forget());
} else {
NS_DispatchBackgroundTask(callback.forget());
}
});
}
}
class DataPipeLink : public NodeController::PortObserver {
public:
DataPipeLink(bool aReceiverSide, std::shared_ptr<Mutex> aMutex,
ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity,
nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable)
: mMutex(std::move(aMutex)),
mPort(std::move(aPort)),
mShmem(aShmem),
mCapacity(aCapacity),
mReceiverSide(aReceiverSide),
mPeerStatus(aPeerStatus),
mOffset(aOffset),
mAvailable(aAvailable) {}
void Init() {
if (NS_SUCCEEDED(mPeerStatus)) {
MOZ_ASSERT(mPort.IsValid());
mPort.Controller()->SetPortObserver(mPort.Port(), this);
OnPortStatusChanged();
}
}
void OnPortStatusChanged() final;
// Add a task to notify the callback after `aLock` is unlocked.
//
// This method is safe to call multiple times, as after the first time it is
// called, `mCallback` will be cleared.
void NotifyOnUnlock(DataPipeAutoLock& aLock) {
DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget());
}
void SendBytesConsumedOnUnlock(DataPipeAutoLock& aLock, uint32_t aBytes) {
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("SendOnUnlock CONSUMED(%u) %s", aBytes, Describe(aLock).get()));
if (NS_FAILED(mPeerStatus)) {
return;
}
// `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked
// but before we send the message. The strong controller and port references
// will allow us to try to send the message anyway, and it will be safely
// dropped if the port has already been closed. CONSUMED messages are safe
// to deliver out-of-order, so we don't need to worry about ordering here.
aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()},
port = mPort.Port(), aBytes]() mutable {
auto message = MakeUnique<IPC::Message>(
MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE);
WriteParam(message.get(), aBytes);
controller->SendUserMessage(port, std::move(message));
});
}
void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus,
bool aSendClosed = false) {
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus),
aSendClosed ? ", send" : "", Describe(aLock).get()));
// The pipe was closed or errored. Clear the observer reference back
// to this type from the port layer, and ensure we notify waiters.
MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus));
mPeerStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] {
if (aSendClosed) {
auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE,
DATA_PIPE_CLOSED_MESSAGE_TYPE);
WriteParam(message.get(), aStatus);
port.Controller()->SendUserMessage(port.Port(), std::move(message));
}
// The `ScopedPort` being destroyed with this action will close it,
// clearing the observer reference from the ports layer.
});
NotifyOnUnlock(aLock);
}
nsCString Describe(DataPipeAutoLock& aLock) const {
return nsPrintfCString(
"[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]",
mReceiverSide ? "Receiver" : "Sender", this, mCapacity,
GetStaticErrorName(mPeerStatus), mOffset, mAvailable,
mCallback ? (mCallbackClosureOnly ? "clo" : "yes") : "no");
}
// This mutex is shared with the `DataPipeBase` which owns this
// `DataPipeLink`.
std::shared_ptr<Mutex> mMutex;
ScopedPort mPort;
const RefPtr<SharedMemory> mShmem;
const uint32_t mCapacity;
const bool mReceiverSide;
bool mProcessingSegment = false;
nsresult mPeerStatus = NS_OK;
uint32_t mOffset = 0;
uint32_t mAvailable = 0;
bool mCallbackClosureOnly = false;
nsCOMPtr<nsIRunnable> mCallback;
nsCOMPtr<nsIEventTarget> mCallbackTarget;
};
void DataPipeLink::OnPortStatusChanged() {
DataPipeAutoLock lock(*mMutex);
while (NS_SUCCEEDED(mPeerStatus)) {
UniquePtr<IPC::Message> message;
if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) {
SetPeerError(lock, NS_BASE_STREAM_CLOSED);
return;
}
if (!message) {
return; // no more messages
}
PickleIterator iter(*message);
switch (message->type()) {
case DATA_PIPE_CLOSED_MESSAGE_TYPE: {
nsresult status = NS_OK;
if (!ReadParam(message.get(), &iter, &status)) {
NS_WARNING("Unable to parse nsresult error from peer");
status = NS_ERROR_UNEXPECTED;
}
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("Got CLOSED(%s) %s", GetStaticErrorName(status),
Describe(lock).get()));
SetPeerError(lock, status);
return;
}
case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: {
uint32_t consumed = 0;
if (!ReadParam(message.get(), &iter, &consumed)) {
NS_WARNING("Unable to parse bytes consumed from peer");
SetPeerError(lock, NS_ERROR_UNEXPECTED);
return;
}
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("Got CONSUMED(%u) %s", consumed, Describe(lock).get()));
auto newAvailable = CheckedUint32{mAvailable} + consumed;
if (!newAvailable.isValid() || newAvailable.value() > mCapacity) {
NS_WARNING("Illegal bytes consumed message received from peer");
SetPeerError(lock, NS_ERROR_UNEXPECTED);
return;
}
mAvailable = newAvailable.value();
if (!mCallbackClosureOnly) {
NotifyOnUnlock(lock);
}
break;
}
default: {
NS_WARNING("Illegal message type received from peer");
SetPeerError(lock, NS_ERROR_UNEXPECTED);
return;
}
}
}
}
DataPipeBase::DataPipeBase(bool aReceiverSide, nsresult aError)
: mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver"
: "DataPipeSender")),
mStatus(NS_SUCCEEDED(aError) ? NS_BASE_STREAM_CLOSED : aError) {}
DataPipeBase::DataPipeBase(bool aReceiverSide, ScopedPort aPort,
SharedMemory* aShmem, uint32_t aCapacity,
nsresult aPeerStatus, uint32_t aOffset,
uint32_t aAvailable)
: mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver"
: "DataPipeSender")),
mStatus(NS_OK),
mLink(new DataPipeLink(aReceiverSide, mMutex, std::move(aPort), aShmem,
aCapacity, aPeerStatus, aOffset, aAvailable)) {
mLink->Init();
}
DataPipeBase::~DataPipeBase() {
DataPipeAutoLock lock(*mMutex);
CloseInternal(lock, NS_BASE_STREAM_CLOSED);
}
void DataPipeBase::CloseInternal(DataPipeAutoLock& aLock, nsresult aStatus) {
if (NS_FAILED(mStatus)) {
return;
}
MOZ_LOG(
gDataPipeLog, LogLevel::Debug,
("Closing(%s) %s", GetStaticErrorName(aStatus), Describe(aLock).get()));
// Set our status to an errored status.
mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
RefPtr<DataPipeLink> link = mLink.forget();
link->NotifyOnUnlock(aLock);
// If our peer hasn't disappeared yet, clean up our connection to it.
if (NS_SUCCEEDED(link->mPeerStatus)) {
link->SetPeerError(aLock, mStatus, /* aSendClosed */ true);
}
}
nsresult DataPipeBase::ProcessSegmentsInternal(
uint32_t aCount, ProcessSegmentFun aProcessSegment,
uint32_t* aProcessedCount) {
*aProcessedCount = 0;
while (*aProcessedCount < aCount) {
DataPipeAutoLock lock(*mMutex);
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount,
Describe(lock).get()));
nsresult status = CheckStatus(lock);
if (NS_FAILED(status)) {
if (*aProcessedCount > 0) {
return NS_OK;
}
return status == NS_BASE_STREAM_CLOSED ? NS_OK : status;
}
if (!mLink->mAvailable) {
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mLink->mPeerStatus),
"CheckStatus will have returned an error");
return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK;
}
MOZ_RELEASE_ASSERT(!mLink->mProcessingSegment,
"Only one thread may be processing a segment at a time");
// Extract an iterator over the next contiguous region of the shared memory
// buffer which will be used .
char* start = static_cast<char*>(mLink->mShmem->memory()) + mLink->mOffset;
char* iter = start;
char* end = start + std::min({aCount, mLink->mAvailable,
mLink->mCapacity - mLink->mOffset});
// Record the consumed region from our segment when exiting this scope,
// telling our peer how many bytes were consumed. Hold on to `mLink` to keep
// the shmem mapped and make sure we can clean up even if we're closed while
// processing the shmem region.
RefPtr<DataPipeLink> link = mLink;
link->mProcessingSegment = true;
auto scopeExit = MakeScopeExit([&] {
MOZ_RELEASE_ASSERT(link->mProcessingSegment);
link->mProcessingSegment = false;
uint32_t totalProcessed = iter - start;
if (totalProcessed > 0) {
link->mOffset += totalProcessed;
MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity);
if (link->mOffset == link->mCapacity) {
link->mOffset = 0;
}
link->mAvailable -= totalProcessed;
link->SendBytesConsumedOnUnlock(lock, totalProcessed);
}
MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
("Processed Segment(%u of %zu) %s", totalProcessed, end - start,
Describe(lock).get()));
});
{
MutexAutoUnlock unlock(*mMutex);
while (iter < end) {
uint32_t processed = 0;
Span segment{iter, end};
nsresult rv = aProcessSegment(segment, *aProcessedCount, &processed);
if (NS_FAILED(rv) || processed == 0) {
return NS_OK;
}
MOZ_RELEASE_ASSERT(processed <= segment.Length());
iter += processed;
*aProcessedCount += processed;
}
}
}
return NS_OK;
}
void DataPipeBase::AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback,
already_AddRefed<nsIEventTarget> aTarget,
bool aClosureOnly) {
RefPtr<nsIRunnable> callback = std::move(aCallback);
RefPtr<nsIEventTarget> target = std::move(aTarget);
DataPipeAutoLock lock(*mMutex);
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("AsyncWait %s %p %s", aClosureOnly ? "(closure)" : "(ready)",
callback.get(), Describe(lock).get()));
if (NS_FAILED(CheckStatus(lock))) {
MOZ_ASSERT_IF(mLink, !mLink->mCallback);
DoNotifyOnUnlock(lock, callback.forget(), target.forget());
return;
}
// NOTE: After this point, `mLink` may have previously had a callback which is
// now being cancelled, make sure we clear `mCallback` even if we're going to
// call `aCallback` immediately.
mLink->mCallback = callback.forget();
mLink->mCallbackTarget = target.forget();
mLink->mCallbackClosureOnly = aClosureOnly;
if (!aClosureOnly && mLink->mAvailable) {
mLink->NotifyOnUnlock(lock);
}
}
nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) {
// If our peer has closed or errored, we may need to close our local side to
// reflect the error code our peer provided. If we're a sender, we want to
// become closed immediately, whereas if we're a receiver we want to wait
// until our available buffer has been exhausted.
//
// NOTE: There may still be 2-stage writes/reads ongoing at this point, which
// will continue due to `mLink` being kept alive by the
// `ProcessSegmentsInternal` function.
if (NS_SUCCEEDED(mStatus) && NS_FAILED(mLink->mPeerStatus) &&
(!mLink->mReceiverSide || !mLink->mAvailable)) {
CloseInternal(aLock, mLink->mPeerStatus);
}
return mStatus;
}
nsCString DataPipeBase::Describe(DataPipeAutoLock& aLock) {
if (mLink) {
return mLink->Describe(aLock);
}
return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus));
}
template <typename T>
void DataPipeWrite(IPC::Message* aMsg, T* aParam) {
DataPipeAutoLock lock(*aParam->mMutex);
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("IPC Write: %s", aParam->Describe(lock).get()));
WriteParam(aMsg, aParam->mStatus);
if (NS_FAILED(aParam->mStatus)) {
return;
}
MOZ_RELEASE_ASSERT(!aParam->mLink->mProcessingSegment,
"cannot transfer while processing a segment");
// Serialize relevant parameters to our peer.
WriteParam(aMsg, std::move(aParam->mLink->mPort));
MOZ_ALWAYS_TRUE(aParam->mLink->mShmem->WriteHandle(aMsg));
WriteParam(aMsg, aParam->mLink->mCapacity);
WriteParam(aMsg, aParam->mLink->mPeerStatus);
WriteParam(aMsg, aParam->mLink->mOffset);
WriteParam(aMsg, aParam->mLink->mAvailable);
// Mark our peer as closed so we don't try to send to it when closing.
aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED;
aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED);
}
template <typename T>
bool DataPipeRead(const IPC::Message* aMsg, PickleIterator* aIter,
RefPtr<T>* aResult) {
nsresult rv = NS_OK;
if (!ReadParam(aMsg, aIter, &rv)) {
NS_WARNING("failed to read status!");
return false;
}
if (NS_FAILED(rv)) {
*aResult = new T(rv);
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("IPC Read: [status=%s]", GetStaticErrorName(rv)));
return true;
}
ScopedPort port;
RefPtr shmem = new SharedMemoryBasic();
uint32_t capacity = 0;
nsresult peerStatus = NS_OK;
uint32_t offset = 0;
uint32_t available = 0;
if (!ReadParam(aMsg, aIter, &port) || !shmem->ReadHandle(aMsg, aIter) ||
!ReadParam(aMsg, aIter, &capacity) ||
!ReadParam(aMsg, aIter, &peerStatus) ||
!ReadParam(aMsg, aIter, &offset) || !ReadParam(aMsg, aIter, &available)) {
NS_WARNING("failed to read fields!");
return false;
}
if (!capacity || offset >= capacity || available > capacity) {
NS_WARNING("inconsistent state values");
return false;
}
if (!shmem->Map(SharedMemory::PageAlignedSize(capacity))) {
NS_WARNING("failed to map shared memory");
return false;
}
*aResult =
new T(std::move(port), shmem, capacity, peerStatus, offset, available);
if (MOZ_LOG_TEST(gDataPipeLog, LogLevel::Debug)) {
DataPipeAutoLock lock(*(*aResult)->mMutex);
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("IPC Read: %s", (*aResult)->Describe(lock).get()));
}
return true;
}
} // namespace data_pipe_detail
//-----------------------------------------------------------------------------
// DataPipeSender
//-----------------------------------------------------------------------------
NS_IMPL_ISUPPORTS(DataPipeSender, nsIOutputStream, nsIAsyncOutputStream,
DataPipeSender)
// nsIOutputStream
NS_IMETHODIMP DataPipeSender::Close() {
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
NS_IMETHODIMP DataPipeSender::Flush() { return NS_OK; }
NS_IMETHODIMP DataPipeSender::Write(const char* aBuf, uint32_t aCount,
uint32_t* aWriteCount) {
return WriteSegments(NS_CopyBufferToSegment, (void*)aBuf, aCount,
aWriteCount);
}
NS_IMETHODIMP DataPipeSender::WriteFrom(nsIInputStream* aFromStream,
uint32_t aCount,
uint32_t* aWriteCount) {
return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount,
aWriteCount);
}
NS_IMETHODIMP DataPipeSender::WriteSegments(nsReadSegmentFun aReader,
void* aClosure, uint32_t aCount,
uint32_t* aWriteCount) {
auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset,
uint32_t* aReadCount) -> nsresult {
return aReader(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(),
aReadCount);
};
return ProcessSegmentsInternal(aCount, processSegment, aWriteCount);
}
NS_IMETHODIMP DataPipeSender::IsNonBlocking(bool* _retval) {
*_retval = true;
return NS_OK;
}
// nsIAsyncOutputStream
NS_IMETHODIMP DataPipeSender::CloseWithStatus(nsresult reason) {
data_pipe_detail::DataPipeAutoLock lock(*mMutex);
CloseInternal(lock, reason);
return NS_OK;
}
NS_IMETHODIMP DataPipeSender::AsyncWait(nsIOutputStreamCallback* aCallback,
uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget* aTarget) {
AsyncWaitInternal(NS_NewRunnableFunction(
"DataPipeReceiver::AsyncWait",
[self = RefPtr{this}, callback = RefPtr{aCallback}] {
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("Calling OnOutputStreamReady(%p, %p)",
callback.get(), self.get()));
callback->OnOutputStreamReady(self);
}),
do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY);
return NS_OK;
}
//-----------------------------------------------------------------------------
// DataPipeReceiver
//-----------------------------------------------------------------------------
NS_IMPL_ISUPPORTS(DataPipeReceiver, nsIInputStream, nsIAsyncInputStream,
nsIIPCSerializableInputStream, DataPipeReceiver)
// nsIInputStream
NS_IMETHODIMP DataPipeReceiver::Close() {
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
NS_IMETHODIMP DataPipeReceiver::Available(uint64_t* _retval) {
data_pipe_detail::DataPipeAutoLock lock(*mMutex);
nsresult rv = CheckStatus(lock);
if (NS_FAILED(rv)) {
return rv;
}
*_retval = mLink->mAvailable;
return NS_OK;
}
NS_IMETHODIMP DataPipeReceiver::Read(char* aBuf, uint32_t aCount,
uint32_t* aReadCount) {
return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount);
}
NS_IMETHODIMP DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter,
void* aClosure, uint32_t aCount,
uint32_t* aReadCount) {
auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset,
uint32_t* aWriteCount) -> nsresult {
return aWriter(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(),
aWriteCount);
};
return ProcessSegmentsInternal(aCount, processSegment, aReadCount);
}
NS_IMETHODIMP DataPipeReceiver::IsNonBlocking(bool* _retval) {
*_retval = true;
return NS_OK;
}
// nsIAsyncInputStream
NS_IMETHODIMP DataPipeReceiver::CloseWithStatus(nsresult aStatus) {
data_pipe_detail::DataPipeAutoLock lock(*mMutex);
CloseInternal(lock, aStatus);
return NS_OK;
}
NS_IMETHODIMP DataPipeReceiver::AsyncWait(nsIInputStreamCallback* aCallback,
uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget* aTarget) {
AsyncWaitInternal(NS_NewRunnableFunction(
"DataPipeReceiver::AsyncWait",
[self = RefPtr{this}, callback = RefPtr{aCallback}] {
MOZ_LOG(gDataPipeLog, LogLevel::Debug,
("Calling OnInputStreamReady(%p, %p)",
callback.get(), self.get()));
callback->OnInputStreamReady(self);
}),
do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY);
return NS_OK;
}
// nsIIPCSerializableInputStream
void DataPipeReceiver::Serialize(InputStreamParams& aParams,
FileDescriptorArray& aFileDescriptors,
bool aDelayedStart, uint32_t aMaxSize,
uint32_t* aSizeUsed,
ParentToChildStreamActorManager* aManager) {
*aSizeUsed = 0;
aParams = DataPipeReceiverStreamParams(this);
}
void DataPipeReceiver::Serialize(InputStreamParams& aParams,
FileDescriptorArray& aFileDescriptors,
bool aDelayedStart, uint32_t aMaxSize,
uint32_t* aSizeUsed,
ChildToParentStreamActorManager* aManager) {
*aSizeUsed = 0;
aParams = DataPipeReceiverStreamParams(this);
}
bool DataPipeReceiver::Deserialize(
const InputStreamParams& aParams,
const FileDescriptorArray& aFileDescriptors) {
MOZ_CRASH("Handled directly in `DeserializeInputStream`");
}
//-----------------------------------------------------------------------------
// NewDataPipe
//-----------------------------------------------------------------------------
nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender,
DataPipeReceiver** aReceiver) {
if (!aCapacity) {
aCapacity = kDefaultDataPipeCapacity;
}
RefPtr<NodeController> controller = NodeController::GetSingleton();
if (!controller) {
return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
}
auto [senderPort, receiverPort] = controller->CreatePortPair();
auto shmem = MakeRefPtr<SharedMemoryBasic>();
size_t alignedCapacity = SharedMemory::PageAlignedSize(aCapacity);
if (!shmem->Create(alignedCapacity) || !shmem->Map(alignedCapacity)) {
return NS_ERROR_OUT_OF_MEMORY;
}
RefPtr sender = new DataPipeSender(std::move(senderPort), shmem, aCapacity,
NS_OK, 0, aCapacity);
RefPtr receiver = new DataPipeReceiver(std::move(receiverPort), shmem,
aCapacity, NS_OK, 0, 0);
sender.forget(aSender);
receiver.forget(aReceiver);
return NS_OK;
}
} // namespace ipc
} // namespace mozilla
void IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Write(
Message* aMsg, mozilla::ipc::DataPipeSender* aParam) {
mozilla::ipc::data_pipe_detail::DataPipeWrite(aMsg, aParam);
}
bool IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Read(
const Message* aMsg, PickleIterator* aIter,
RefPtr<mozilla::ipc::DataPipeSender>* aResult) {
return mozilla::ipc::data_pipe_detail::DataPipeRead(aMsg, aIter, aResult);
}
void IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Write(
Message* aMsg, mozilla::ipc::DataPipeReceiver* aParam) {
mozilla::ipc::data_pipe_detail::DataPipeWrite(aMsg, aParam);
}
bool IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Read(
const Message* aMsg, PickleIterator* aIter,
RefPtr<mozilla::ipc::DataPipeReceiver>* aResult) {
return mozilla::ipc::data_pipe_detail::DataPipeRead(aMsg, aIter, aResult);
}

181
ipc/glue/DataPipe.h Normal file
Просмотреть файл

@ -0,0 +1,181 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
#ifndef mozilla_ipc_DataPipe_h
#define mozilla_ipc_DataPipe_h
#include "mozilla/ipc/SharedMemoryBasic.h"
#include "mozilla/ipc/NodeController.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsIIPCSerializableInputStream.h"
#include "nsISupports.h"
namespace mozilla {
namespace ipc {
namespace data_pipe_detail {
class DataPipeAutoLock;
class DataPipeLink;
class DataPipeBase {
public:
DataPipeBase(const DataPipeBase&) = delete;
DataPipeBase& operator=(const DataPipeBase&) = delete;
protected:
explicit DataPipeBase(bool aReceiverSide, nsresult aError);
DataPipeBase(bool aReceiverSide, ScopedPort aPort, SharedMemory* aShmem,
uint32_t aCapacity, nsresult aPeerStatus, uint32_t aOffset,
uint32_t aAvailable);
void CloseInternal(DataPipeAutoLock&, nsresult aStatus);
void AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback,
already_AddRefed<nsIEventTarget> aTarget,
bool aClosureOnly);
// Like `nsWriteSegmentFun` or `nsReadSegmentFun`.
using ProcessSegmentFun =
FunctionRef<nsresult(Span<char> aSpan, uint32_t aProcessedThisCall,
uint32_t* aProcessedCount)>;
nsresult ProcessSegmentsInternal(uint32_t aCount,
ProcessSegmentFun aProcessSegment,
uint32_t* aProcessedCount);
nsresult CheckStatus(DataPipeAutoLock&);
nsCString Describe(DataPipeAutoLock&);
virtual ~DataPipeBase();
const std::shared_ptr<Mutex> mMutex;
nsresult mStatus = NS_OK;
RefPtr<DataPipeLink> mLink;
};
template <typename T>
void DataPipeWrite(IPC::Message* aMsg, T* aParam);
template <typename T>
bool DataPipeRead(const IPC::Message* aMsg, PickleIterator* aIter,
RefPtr<T>* aResult);
} // namespace data_pipe_detail
class DataPipeSender;
class DataPipeReceiver;
#define NS_DATAPIPESENDER_IID \
{ \
0x6698ed77, 0x9fff, 0x425d, { \
0xb0, 0xa6, 0x1d, 0x30, 0x66, 0xee, 0xb8, 0x16 \
} \
}
// Helper class for streaming data to another process.
class DataPipeSender final : public nsIAsyncOutputStream,
public data_pipe_detail::DataPipeBase {
public:
NS_DECLARE_STATIC_IID_ACCESSOR(NS_DATAPIPESENDER_IID)
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIOUTPUTSTREAM
NS_DECL_NSIASYNCOUTPUTSTREAM
private:
friend nsresult NewDataPipe(uint32_t, DataPipeSender**, DataPipeReceiver**);
friend void data_pipe_detail::DataPipeWrite<DataPipeSender>(
IPC::Message* aMsg, DataPipeSender* aParam);
friend bool data_pipe_detail::DataPipeRead<DataPipeSender>(
const IPC::Message* aMsg, PickleIterator* aIter,
RefPtr<DataPipeSender>* aResult);
explicit DataPipeSender(nsresult aError)
: data_pipe_detail::DataPipeBase(/* aReceiverSide */ false, aError) {}
DataPipeSender(ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity,
nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable)
: data_pipe_detail::DataPipeBase(/* aReceiverSide */ false,
std::move(aPort), aShmem, aCapacity,
aPeerStatus, aOffset, aAvailable) {}
~DataPipeSender() = default;
};
NS_DEFINE_STATIC_IID_ACCESSOR(DataPipeSender, NS_DATAPIPESENDER_IID)
#define NS_DATAPIPERECEIVER_IID \
{ \
0x0a185f83, 0x499e, 0x450c, { \
0x95, 0x82, 0x27, 0x67, 0xad, 0x6d, 0x64, 0xb5 \
} \
}
// Helper class for streaming data from another process.
class DataPipeReceiver final : public nsIAsyncInputStream,
public nsIIPCSerializableInputStream,
public data_pipe_detail::DataPipeBase {
public:
NS_DECLARE_STATIC_IID_ACCESSOR(NS_DATAPIPERECEIVER_IID)
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIINPUTSTREAM
NS_DECL_NSIASYNCINPUTSTREAM
NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
private:
friend nsresult NewDataPipe(uint32_t, DataPipeSender**, DataPipeReceiver**);
friend void data_pipe_detail::DataPipeWrite<DataPipeReceiver>(
IPC::Message* aMsg, DataPipeReceiver* aParam);
friend bool data_pipe_detail::DataPipeRead<DataPipeReceiver>(
const IPC::Message* aMsg, PickleIterator* aIter,
RefPtr<DataPipeReceiver>* aResult);
explicit DataPipeReceiver(nsresult aError)
: data_pipe_detail::DataPipeBase(/* aReceiverSide */ true, aError) {}
DataPipeReceiver(ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity,
nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable)
: data_pipe_detail::DataPipeBase(/* aReceiverSide */ true,
std::move(aPort), aShmem, aCapacity,
aPeerStatus, aOffset, aAvailable) {}
~DataPipeReceiver() = default;
};
NS_DEFINE_STATIC_IID_ACCESSOR(DataPipeReceiver, NS_DATAPIPERECEIVER_IID)
constexpr uint32_t kDefaultDataPipeCapacity = 64 * 1024;
/**
* Create a new DataPipe pair. The sender and receiver ends of the pipe may be
* used to transfer data between processes. |aCapacity| is the capacity of the
* underlying ring buffer. If `0` is passed, `kDefaultDataPipeCapacity` will be
* used.
*/
nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender,
DataPipeReceiver** aReceiver);
} // namespace ipc
} // namespace mozilla
namespace IPC {
template <>
struct ParamTraits<mozilla::ipc::DataPipeSender*> {
static void Write(Message* aMsg, mozilla::ipc::DataPipeSender* aParam);
static bool Read(const Message* aMsg, PickleIterator* aIter,
RefPtr<mozilla::ipc::DataPipeSender>* aResult);
};
template <>
struct ParamTraits<mozilla::ipc::DataPipeReceiver*> {
static void Write(Message* aMsg, mozilla::ipc::DataPipeReceiver* aParam);
static bool Read(const Message* aMsg, PickleIterator* aIter,
RefPtr<mozilla::ipc::DataPipeReceiver>* aResult);
};
} // namespace IPC
#endif // mozilla_ipc_DataPipe_h

Просмотреть файл

@ -9,6 +9,7 @@ include protocol PParentToChildStream;
include protocol PRemoteLazyInputStream;
using struct mozilla::void_t from "mozilla/ipc/IPCCore.h";
[RefCounted] using class DataPipeReceiver from "mozilla/ipc/DataPipe.h";
namespace mozilla {
namespace ipc {
@ -80,6 +81,11 @@ struct IPCRemoteStreamParams
int64_t length;
};
struct DataPipeReceiverStreamParams
{
DataPipeReceiver pipe;
};
union InputStreamParams
{
StringInputStreamParams;
@ -92,6 +98,7 @@ union InputStreamParams
InputStreamLengthWrapperParams;
IPCRemoteStreamParams;
EncryptedFileInputStreamParams;
DataPipeReceiverStreamParams;
};
struct EncryptedFileInputStreamParams

Просмотреть файл

@ -12,6 +12,7 @@
#include "mozilla/dom/File.h"
#include "mozilla/dom/quota/DecryptingInputStream_impl.h"
#include "mozilla/dom/quota/IPCStreamCipherStrategy.h"
#include "mozilla/ipc/DataPipe.h"
#include "mozilla/ipc/IPCStreamDestination.h"
#include "mozilla/ipc/IPCStreamSource.h"
#include "mozilla/InputStreamLengthHelper.h"
@ -234,6 +235,9 @@ void InputStreamHelper::PostSerializationActivation(InputStreamParams& aParams,
return;
}
case InputStreamParams::TDataPipeReceiverStreamParams:
break;
case InputStreamParams::TStringInputStreamParams:
break;
@ -318,6 +322,12 @@ already_AddRefed<nsIInputStream> InputStreamHelper::DeserializeInputStream(
return destinationStream->TakeReader();
}
if (aParams.type() == InputStreamParams::TDataPipeReceiverStreamParams) {
const DataPipeReceiverStreamParams& pipeParams =
aParams.get_DataPipeReceiverStreamParams();
return do_AddRef(pipeParams.pipe());
}
nsCOMPtr<nsIIPCSerializableInputStream> serializable;
switch (aParams.type()) {

Просмотреть файл

@ -55,6 +55,10 @@ namespace {
// protocol 0. Oops! We can get away with this until protocol 0
// starts approaching its 65,536th message.
enum {
// Message types used by DataPipe
DATA_PIPE_CLOSED_MESSAGE_TYPE = kuint16max - 18,
DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE = kuint16max - 17,
// Message types used by NodeChannel
ACCEPT_INVITE_MESSAGE_TYPE = kuint16max - 16,
REQUEST_INTRODUCTION_MESSAGE_TYPE = kuint16max - 15,

Просмотреть файл

@ -23,6 +23,7 @@ EXPORTS.mozilla.ipc += [
"CrashReporterHost.h",
"CrossProcessMutex.h",
"CrossProcessSemaphore.h",
"DataPipe.h",
"Endpoint.h",
"EnvironmentMap.h",
"FileDescriptor.h",
@ -158,6 +159,7 @@ UNIFIED_SOURCES += [
"BrowserProcessSubThread.cpp",
"CrashReporterClient.cpp",
"CrashReporterHost.cpp",
"DataPipe.cpp",
"Endpoint.cpp",
"FileDescriptor.cpp",
"FileDescriptorUtils.cpp",

248
ipc/gtest/TestDataPipe.cpp Normal file
Просмотреть файл

@ -0,0 +1,248 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "chrome/common/ipc_message.h"
#include "gtest/gtest.h"
#include "mozilla/ipc/DataPipe.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsStreamUtils.h"
namespace mozilla::ipc {
namespace {
struct InputStreamCallback : public nsIInputStreamCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
explicit InputStreamCallback(
std::function<nsresult(nsIAsyncInputStream*)> aFunc = nullptr)
: mFunc(std::move(aFunc)) {}
NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override {
MOZ_ALWAYS_FALSE(mCalled.exchange(true));
return mFunc ? mFunc(aStream) : NS_OK;
}
bool Called() const { return mCalled; }
private:
virtual ~InputStreamCallback() = default;
std::atomic<bool> mCalled = false;
std::function<nsresult(nsIAsyncInputStream*)> mFunc;
};
NS_IMPL_ISUPPORTS(InputStreamCallback, nsIInputStreamCallback)
struct OutputStreamCallback : public nsIOutputStreamCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
explicit OutputStreamCallback(
std::function<nsresult(nsIAsyncOutputStream*)> aFunc = nullptr)
: mFunc(std::move(aFunc)) {}
NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override {
MOZ_ALWAYS_FALSE(mCalled.exchange(true));
return mFunc ? mFunc(aStream) : NS_OK;
}
bool Called() const { return mCalled; }
private:
virtual ~OutputStreamCallback() = default;
std::atomic<bool> mCalled = false;
std::function<nsresult(nsIAsyncOutputStream*)> mFunc;
};
NS_IMPL_ISUPPORTS(OutputStreamCallback, nsIOutputStreamCallback)
// Populate an array with the given number of bytes. Data is lorem ipsum
// random text, but deterministic across multiple calls.
void CreateData(uint32_t aNumBytes, nsCString& aDataOut) {
static const char data[] =
"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec egestas "
"purus eu condimentum iaculis. In accumsan leo eget odio porttitor, non "
"rhoncus nulla vestibulum. Etiam lacinia consectetur nisl nec "
"sollicitudin. Sed fringilla accumsan diam, pulvinar varius massa. Duis "
"mollis dignissim felis, eget tempus nisi tristique ut. Fusce euismod, "
"lectus non lacinia tempor, tellus diam suscipit quam, eget hendrerit "
"lacus nunc fringilla ante. Sed ultrices massa vitae risus molestie, ut "
"finibus quam laoreet nullam.";
static const uint32_t dataLength = sizeof(data) - 1;
aDataOut.SetCapacity(aNumBytes);
while (aNumBytes > 0) {
uint32_t amount = std::min(dataLength, aNumBytes);
aDataOut.Append(data, amount);
aNumBytes -= amount;
}
}
// Synchronously consume the given input stream and validate the resulting data
// against the given string of expected values.
void ConsumeAndValidateStream(nsIInputStream* aStream,
const nsACString& aExpectedData) {
nsAutoCString outputData;
nsresult rv = NS_ConsumeStream(aStream, UINT32_MAX, outputData);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_EQ(aExpectedData.Length(), outputData.Length());
ASSERT_TRUE(aExpectedData.Equals(outputData));
}
} // namespace
TEST(DataPipe, SegmentedReadWrite)
{
RefPtr<DataPipeReceiver> reader;
RefPtr<DataPipeSender> writer;
nsresult rv =
NewDataPipe(1024, getter_AddRefs(writer), getter_AddRefs(reader));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCString inputData1;
CreateData(512, inputData1);
uint32_t numWritten = 0;
rv = writer->Write(inputData1.BeginReading(), inputData1.Length(),
&numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
EXPECT_EQ(numWritten, 512u);
uint64_t available = 0;
rv = reader->Available(&available);
EXPECT_EQ(available, 512u);
ConsumeAndValidateStream(reader, inputData1);
nsCString inputData2;
CreateData(1024, inputData2);
rv = writer->Write(inputData2.BeginReading(), inputData2.Length(),
&numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
EXPECT_EQ(numWritten, 1024u);
rv = reader->Available(&available);
EXPECT_EQ(available, 1024u);
ConsumeAndValidateStream(reader, inputData2);
}
TEST(DataPipe, Write_AsyncWait)
{
RefPtr<DataPipeReceiver> reader;
RefPtr<DataPipeSender> writer;
const uint32_t segmentSize = 1024;
nsresult rv =
NewDataPipe(segmentSize, getter_AddRefs(writer), getter_AddRefs(reader));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCString inputData;
CreateData(segmentSize, inputData);
uint32_t numWritten = 0;
rv = writer->Write(inputData.BeginReading(), inputData.Length(), &numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
EXPECT_EQ(numWritten, segmentSize);
rv = writer->Write(inputData.BeginReading(), inputData.Length(), &numWritten);
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
RefPtr<OutputStreamCallback> cb = new OutputStreamCallback();
rv = writer->AsyncWait(cb, 0, 0, GetCurrentSerialEventTarget());
ASSERT_TRUE(NS_SUCCEEDED(rv));
NS_ProcessPendingEvents(nullptr);
ASSERT_FALSE(cb->Called());
ConsumeAndValidateStream(reader, inputData);
ASSERT_FALSE(cb->Called());
NS_ProcessPendingEvents(nullptr);
ASSERT_TRUE(cb->Called());
}
TEST(DataPipe, Read_AsyncWait)
{
RefPtr<DataPipeReceiver> reader;
RefPtr<DataPipeSender> writer;
const uint32_t segmentSize = 1024;
nsresult rv =
NewDataPipe(segmentSize, getter_AddRefs(writer), getter_AddRefs(reader));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCString inputData;
CreateData(segmentSize, inputData);
RefPtr<InputStreamCallback> cb = new InputStreamCallback();
rv = reader->AsyncWait(cb, 0, 0, GetCurrentSerialEventTarget());
ASSERT_TRUE(NS_SUCCEEDED(rv));
NS_ProcessPendingEvents(nullptr);
ASSERT_FALSE(cb->Called());
uint32_t numWritten = 0;
rv = writer->Write(inputData.BeginReading(), inputData.Length(), &numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_FALSE(cb->Called());
NS_ProcessPendingEvents(nullptr);
ASSERT_TRUE(cb->Called());
ConsumeAndValidateStream(reader, inputData);
}
TEST(DataPipe, SerializeReader)
{
RefPtr<DataPipeReceiver> reader;
RefPtr<DataPipeSender> writer;
nsresult rv =
NewDataPipe(1024, getter_AddRefs(writer), getter_AddRefs(reader));
ASSERT_TRUE(NS_SUCCEEDED(rv));
IPC::Message msg(MSG_ROUTING_NONE, 0);
WriteParam(&msg, reader);
uint64_t available = 0;
rv = reader->Available(&available);
ASSERT_TRUE(NS_FAILED(rv));
nsCString inputData;
CreateData(512, inputData);
uint32_t numWritten = 0;
rv = writer->Write(inputData.BeginReading(), inputData.Length(), &numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
RefPtr<DataPipeReceiver> reader2;
PickleIterator iter(msg);
ASSERT_TRUE(ReadParam(&msg, &iter, &reader2));
ASSERT_TRUE(reader2);
rv = reader2->Available(&available);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_EQ(available, 512u);
ConsumeAndValidateStream(reader2, inputData);
}
} // namespace mozilla::ipc

Просмотреть файл

@ -7,6 +7,7 @@
Library("ipctest")
SOURCES += [
"TestDataPipe.cpp",
"TestLogging.cpp",
"TestSharedMemory.cpp",
]

Просмотреть файл

@ -271,6 +271,10 @@ for protocol in sorted(allmessages.keys()):
print(
"""
case DATA_PIPE_CLOSED_MESSAGE_TYPE:
return "DATA_PIPE_CLOSED_MESSAGE";
case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE:
return "DATA_PIPE_BYTES_CONSUMED_MESSAGE";
case ACCEPT_INVITE_MESSAGE_TYPE:
return "ACCEPT_INVITE_MESSAGE";
case REQUEST_INTRODUCTION_MESSAGE_TYPE: