diff --git a/ipc/glue/DataPipe.cpp b/ipc/glue/DataPipe.cpp
new file mode 100644
index 000000000000..3cd05269cb00
--- /dev/null
+++ b/ipc/glue/DataPipe.cpp
@@ -0,0 +1,705 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "DataPipe.h"
+#include "mozilla/AlreadyAddRefed.h"
+#include "mozilla/Assertions.h"
+#include "mozilla/CheckedInt.h"
+#include "mozilla/ErrorNames.h"
+#include "mozilla/Logging.h"
+#include "mozilla/ipc/InputStreamParams.h"
+#include "nsIAsyncInputStream.h"
+#include "nsStreamUtils.h"
+#include "nsThreadUtils.h"
+
+namespace mozilla {
+namespace ipc {
+
+LazyLogModule gDataPipeLog("DataPipe");
+
+namespace data_pipe_detail {
+
+// Helper for queueing up actions to be run once the mutex has been unlocked.
+// Actions will be run in-order.
+class DataPipeAutoLock {
+ public:
+  explicit DataPipeAutoLock(Mutex& aMutex) : mMutex(aMutex) { mMutex.Lock(); }
+  DataPipeAutoLock(const DataPipeAutoLock&) = delete;
+  DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete;
+
+  template <typename F>
+  void AddUnlockAction(F aAction) {
+    mActions.AppendElement(MakeUnique<Action<F>>(std::move(aAction)));
+  }
+
+  ~DataPipeAutoLock() {
+    mMutex.Unlock();
+    for (auto& action : mActions) {
+      action->Run();
+    }
+  }
+
+ private:
+  // NOTE: This would be better served by something like llvm's
+  // `UniqueFunction`, but this works as well. We can't use `std::function`,
+  // as the actions may not be copy-constructible.
+  struct IAction {
+    virtual void Run() = 0;
+    virtual ~IAction() = default;
+  };
+  template <typename F>
+  struct Action : IAction {
+    explicit Action(F&& aAction) : mAction(std::move(aAction)) {}
+    void Run() override { mAction(); }
+    F mAction;
+  };
+
+  Mutex& mMutex;
+  AutoTArray<UniquePtr<IAction>, 4> mActions;
+};
+
+static void DoNotifyOnUnlock(DataPipeAutoLock& aLock,
+                             already_AddRefed<nsIRunnable> aCallback,
+                             already_AddRefed<nsIEventTarget> aTarget) {
+  nsCOMPtr<nsIRunnable> callback{std::move(aCallback)};
+  nsCOMPtr<nsIEventTarget> target{std::move(aTarget)};
+  if (callback) {
+    aLock.AddUnlockAction(
+        [callback = std::move(callback), target = std::move(target)]() mutable {
+          if (target) {
+            target->Dispatch(callback.forget());
+          } else {
+            NS_DispatchBackgroundTask(callback.forget());
+          }
+        });
+  }
+}
+
+class DataPipeLink : public NodeController::PortObserver {
+ public:
+  DataPipeLink(bool aReceiverSide, std::shared_ptr<Mutex> aMutex,
+               ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity,
+               nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable)
+      : mMutex(std::move(aMutex)),
+        mPort(std::move(aPort)),
+        mShmem(aShmem),
+        mCapacity(aCapacity),
+        mReceiverSide(aReceiverSide),
+        mPeerStatus(aPeerStatus),
+        mOffset(aOffset),
+        mAvailable(aAvailable) {}
+
+  void Init() {
+    if (NS_SUCCEEDED(mPeerStatus)) {
+      MOZ_ASSERT(mPort.IsValid());
+      mPort.Controller()->SetPortObserver(mPort.Port(), this);
+      OnPortStatusChanged();
+    }
+  }
+
+  void OnPortStatusChanged() final;
+
+  // Add a task to notify the callback after `aLock` is unlocked.
+  //
+  // This method is safe to call multiple times, as after the first time it is
+  // called, `mCallback` will be cleared.
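+  //
+  // The queued notification runs the callback on `mCallbackTarget` when an
+  // event target was provided to `AsyncWait`, and dispatches it as a
+  // background task otherwise (see `DoNotifyOnUnlock` above).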
+  void NotifyOnUnlock(DataPipeAutoLock& aLock) {
+    DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget());
+  }
+
+  void SendBytesConsumedOnUnlock(DataPipeAutoLock& aLock, uint32_t aBytes) {
+    MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
+            ("SendOnUnlock CONSUMED(%u) %s", aBytes, Describe(aLock).get()));
+    if (NS_FAILED(mPeerStatus)) {
+      return;
+    }
+
+    // `mPort` may be destroyed by `SetPeerError` after the DataPipe is
+    // unlocked but before we send the message. The strong controller and port
+    // references will allow us to try to send the message anyway, and it will
+    // be safely dropped if the port has already been closed. CONSUMED messages
+    // are safe to deliver out-of-order, so we don't need to worry about
+    // ordering here.
+    aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()},
+                           port = mPort.Port(), aBytes]() mutable {
+      auto message = MakeUnique<IPC::Message>(
+          MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE);
+      WriteParam(message.get(), aBytes);
+      controller->SendUserMessage(port, std::move(message));
+    });
+  }
+
+  void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus,
+                    bool aSendClosed = false) {
+    MOZ_LOG(gDataPipeLog, LogLevel::Debug,
+            ("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus),
+             aSendClosed ? ", send" : "", Describe(aLock).get()));
+    // The pipe was closed or errored. Clear the observer reference back
+    // to this type from the port layer, and ensure we notify waiters.
+    MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus));
+    mPeerStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
+    aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] {
+      if (aSendClosed) {
+        auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE,
+                                                DATA_PIPE_CLOSED_MESSAGE_TYPE);
+        WriteParam(message.get(), aStatus);
+        port.Controller()->SendUserMessage(port.Port(), std::move(message));
+      }
+      // The `ScopedPort` being destroyed with this action will close it,
+      // clearing the observer reference from the ports layer.
+    });
+    NotifyOnUnlock(aLock);
+  }
+
+  nsCString Describe(DataPipeAutoLock& aLock) const {
+    return nsPrintfCString(
+        "[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]",
+        mReceiverSide ? "Receiver" : "Sender", this, mCapacity,
+        GetStaticErrorName(mPeerStatus), mOffset, mAvailable,
+        mCallback ? (mCallbackClosureOnly ? "clo" : "yes") : "no");
+  }
+
+  // This mutex is shared with the `DataPipeBase` which owns this
+  // `DataPipeLink`.
+ std::shared_ptr mMutex; + + ScopedPort mPort; + const RefPtr mShmem; + const uint32_t mCapacity; + const bool mReceiverSide; + + bool mProcessingSegment = false; + + nsresult mPeerStatus = NS_OK; + uint32_t mOffset = 0; + uint32_t mAvailable = 0; + + bool mCallbackClosureOnly = false; + nsCOMPtr mCallback; + nsCOMPtr mCallbackTarget; +}; + +void DataPipeLink::OnPortStatusChanged() { + DataPipeAutoLock lock(*mMutex); + + while (NS_SUCCEEDED(mPeerStatus)) { + UniquePtr message; + if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) { + SetPeerError(lock, NS_BASE_STREAM_CLOSED); + return; + } + if (!message) { + return; // no more messages + } + + PickleIterator iter(*message); + switch (message->type()) { + case DATA_PIPE_CLOSED_MESSAGE_TYPE: { + nsresult status = NS_OK; + if (!ReadParam(message.get(), &iter, &status)) { + NS_WARNING("Unable to parse nsresult error from peer"); + status = NS_ERROR_UNEXPECTED; + } + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("Got CLOSED(%s) %s", GetStaticErrorName(status), + Describe(lock).get())); + SetPeerError(lock, status); + return; + } + case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: { + uint32_t consumed = 0; + if (!ReadParam(message.get(), &iter, &consumed)) { + NS_WARNING("Unable to parse bytes consumed from peer"); + SetPeerError(lock, NS_ERROR_UNEXPECTED); + return; + } + + MOZ_LOG(gDataPipeLog, LogLevel::Verbose, + ("Got CONSUMED(%u) %s", consumed, Describe(lock).get())); + auto newAvailable = CheckedUint32{mAvailable} + consumed; + if (!newAvailable.isValid() || newAvailable.value() > mCapacity) { + NS_WARNING("Illegal bytes consumed message received from peer"); + SetPeerError(lock, NS_ERROR_UNEXPECTED); + return; + } + mAvailable = newAvailable.value(); + if (!mCallbackClosureOnly) { + NotifyOnUnlock(lock); + } + break; + } + default: { + NS_WARNING("Illegal message type received from peer"); + SetPeerError(lock, NS_ERROR_UNEXPECTED); + return; + } + } + } +} + +DataPipeBase::DataPipeBase(bool aReceiverSide, nsresult aError) + : mMutex(std::make_shared(aReceiverSide ? "DataPipeReceiver" + : "DataPipeSender")), + mStatus(NS_SUCCEEDED(aError) ? NS_BASE_STREAM_CLOSED : aError) {} + +DataPipeBase::DataPipeBase(bool aReceiverSide, ScopedPort aPort, + SharedMemory* aShmem, uint32_t aCapacity, + nsresult aPeerStatus, uint32_t aOffset, + uint32_t aAvailable) + : mMutex(std::make_shared(aReceiverSide ? "DataPipeReceiver" + : "DataPipeSender")), + mStatus(NS_OK), + mLink(new DataPipeLink(aReceiverSide, mMutex, std::move(aPort), aShmem, + aCapacity, aPeerStatus, aOffset, aAvailable)) { + mLink->Init(); +} + +DataPipeBase::~DataPipeBase() { + DataPipeAutoLock lock(*mMutex); + CloseInternal(lock, NS_BASE_STREAM_CLOSED); +} + +void DataPipeBase::CloseInternal(DataPipeAutoLock& aLock, nsresult aStatus) { + if (NS_FAILED(mStatus)) { + return; + } + + MOZ_LOG( + gDataPipeLog, LogLevel::Debug, + ("Closing(%s) %s", GetStaticErrorName(aStatus), Describe(aLock).get())); + + // Set our status to an errored status. + mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus; + RefPtr link = mLink.forget(); + link->NotifyOnUnlock(aLock); + + // If our peer hasn't disappeared yet, clean up our connection to it. 
+ if (NS_SUCCEEDED(link->mPeerStatus)) { + link->SetPeerError(aLock, mStatus, /* aSendClosed */ true); + } +} + +nsresult DataPipeBase::ProcessSegmentsInternal( + uint32_t aCount, ProcessSegmentFun aProcessSegment, + uint32_t* aProcessedCount) { + *aProcessedCount = 0; + + while (*aProcessedCount < aCount) { + DataPipeAutoLock lock(*mMutex); + MOZ_LOG(gDataPipeLog, LogLevel::Verbose, + ("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount, + Describe(lock).get())); + + nsresult status = CheckStatus(lock); + if (NS_FAILED(status)) { + if (*aProcessedCount > 0) { + return NS_OK; + } + return status == NS_BASE_STREAM_CLOSED ? NS_OK : status; + } + + if (!mLink->mAvailable) { + MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(mLink->mPeerStatus), + "CheckStatus will have returned an error"); + return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK; + } + + MOZ_RELEASE_ASSERT(!mLink->mProcessingSegment, + "Only one thread may be processing a segment at a time"); + + // Extract an iterator over the next contiguous region of the shared memory + // buffer which will be used . + char* start = static_cast(mLink->mShmem->memory()) + mLink->mOffset; + char* iter = start; + char* end = start + std::min({aCount, mLink->mAvailable, + mLink->mCapacity - mLink->mOffset}); + + // Record the consumed region from our segment when exiting this scope, + // telling our peer how many bytes were consumed. Hold on to `mLink` to keep + // the shmem mapped and make sure we can clean up even if we're closed while + // processing the shmem region. + RefPtr link = mLink; + link->mProcessingSegment = true; + auto scopeExit = MakeScopeExit([&] { + MOZ_RELEASE_ASSERT(link->mProcessingSegment); + link->mProcessingSegment = false; + uint32_t totalProcessed = iter - start; + if (totalProcessed > 0) { + link->mOffset += totalProcessed; + MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity); + if (link->mOffset == link->mCapacity) { + link->mOffset = 0; + } + link->mAvailable -= totalProcessed; + link->SendBytesConsumedOnUnlock(lock, totalProcessed); + } + MOZ_LOG(gDataPipeLog, LogLevel::Verbose, + ("Processed Segment(%u of %zu) %s", totalProcessed, end - start, + Describe(lock).get())); + }); + + { + MutexAutoUnlock unlock(*mMutex); + while (iter < end) { + uint32_t processed = 0; + Span segment{iter, end}; + nsresult rv = aProcessSegment(segment, *aProcessedCount, &processed); + if (NS_FAILED(rv) || processed == 0) { + return NS_OK; + } + + MOZ_RELEASE_ASSERT(processed <= segment.Length()); + iter += processed; + *aProcessedCount += processed; + } + } + } + return NS_OK; +} + +void DataPipeBase::AsyncWaitInternal(already_AddRefed aCallback, + already_AddRefed aTarget, + bool aClosureOnly) { + RefPtr callback = std::move(aCallback); + RefPtr target = std::move(aTarget); + + DataPipeAutoLock lock(*mMutex); + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("AsyncWait %s %p %s", aClosureOnly ? "(closure)" : "(ready)", + callback.get(), Describe(lock).get())); + + if (NS_FAILED(CheckStatus(lock))) { + MOZ_ASSERT_IF(mLink, !mLink->mCallback); + DoNotifyOnUnlock(lock, callback.forget(), target.forget()); + return; + } + + // NOTE: After this point, `mLink` may have previously had a callback which is + // now being cancelled, make sure we clear `mCallback` even if we're going to + // call `aCallback` immediately. 
  mLink->mCallback = callback.forget();
+  mLink->mCallbackTarget = target.forget();
+  mLink->mCallbackClosureOnly = aClosureOnly;
+  if (!aClosureOnly && mLink->mAvailable) {
+    mLink->NotifyOnUnlock(lock);
+  }
+}
+
+nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) {
+  // If our peer has closed or errored, we may need to close our local side to
+  // reflect the error code our peer provided. If we're a sender, we want to
+  // become closed immediately, whereas if we're a receiver we want to wait
+  // until our available buffer has been exhausted.
+  //
+  // NOTE: There may still be 2-stage writes/reads ongoing at this point, which
+  // will continue due to `mLink` being kept alive by the
+  // `ProcessSegmentsInternal` function.
+  if (NS_SUCCEEDED(mStatus) && NS_FAILED(mLink->mPeerStatus) &&
+      (!mLink->mReceiverSide || !mLink->mAvailable)) {
+    CloseInternal(aLock, mLink->mPeerStatus);
+  }
+  return mStatus;
+}
+
+nsCString DataPipeBase::Describe(DataPipeAutoLock& aLock) {
+  if (mLink) {
+    return mLink->Describe(aLock);
+  }
+  return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus));
+}
+
+template <typename T>
+void DataPipeWrite(IPC::Message* aMsg, T* aParam) {
+  DataPipeAutoLock lock(*aParam->mMutex);
+  MOZ_LOG(gDataPipeLog, LogLevel::Debug,
+          ("IPC Write: %s", aParam->Describe(lock).get()));
+
+  WriteParam(aMsg, aParam->mStatus);
+  if (NS_FAILED(aParam->mStatus)) {
+    return;
+  }
+
+  MOZ_RELEASE_ASSERT(!aParam->mLink->mProcessingSegment,
+                     "cannot transfer while processing a segment");
+
+  // Serialize relevant parameters to our peer.
+  WriteParam(aMsg, std::move(aParam->mLink->mPort));
+  MOZ_ALWAYS_TRUE(aParam->mLink->mShmem->WriteHandle(aMsg));
+  WriteParam(aMsg, aParam->mLink->mCapacity);
+  WriteParam(aMsg, aParam->mLink->mPeerStatus);
+  WriteParam(aMsg, aParam->mLink->mOffset);
+  WriteParam(aMsg, aParam->mLink->mAvailable);
+
+  // Mark our peer as closed so we don't try to send to it when closing.
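+  // The local end is closed with NS_ERROR_NOT_INITIALIZED below as well, so
+  // any later use of the transferred stream reports that error instead of
+  // touching a pipe which now belongs to the other side.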
+ aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED; + aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED); +} + +template +bool DataPipeRead(const IPC::Message* aMsg, PickleIterator* aIter, + RefPtr* aResult) { + nsresult rv = NS_OK; + if (!ReadParam(aMsg, aIter, &rv)) { + NS_WARNING("failed to read status!"); + return false; + } + if (NS_FAILED(rv)) { + *aResult = new T(rv); + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("IPC Read: [status=%s]", GetStaticErrorName(rv))); + return true; + } + + ScopedPort port; + RefPtr shmem = new SharedMemoryBasic(); + uint32_t capacity = 0; + nsresult peerStatus = NS_OK; + uint32_t offset = 0; + uint32_t available = 0; + if (!ReadParam(aMsg, aIter, &port) || !shmem->ReadHandle(aMsg, aIter) || + !ReadParam(aMsg, aIter, &capacity) || + !ReadParam(aMsg, aIter, &peerStatus) || + !ReadParam(aMsg, aIter, &offset) || !ReadParam(aMsg, aIter, &available)) { + NS_WARNING("failed to read fields!"); + return false; + } + if (!capacity || offset >= capacity || available > capacity) { + NS_WARNING("inconsistent state values"); + return false; + } + if (!shmem->Map(SharedMemory::PageAlignedSize(capacity))) { + NS_WARNING("failed to map shared memory"); + return false; + } + + *aResult = + new T(std::move(port), shmem, capacity, peerStatus, offset, available); + if (MOZ_LOG_TEST(gDataPipeLog, LogLevel::Debug)) { + DataPipeAutoLock lock(*(*aResult)->mMutex); + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("IPC Read: %s", (*aResult)->Describe(lock).get())); + } + return true; +} + +} // namespace data_pipe_detail + +//----------------------------------------------------------------------------- +// DataPipeSender +//----------------------------------------------------------------------------- + +NS_IMPL_ISUPPORTS(DataPipeSender, nsIOutputStream, nsIAsyncOutputStream, + DataPipeSender) + +// nsIOutputStream + +NS_IMETHODIMP DataPipeSender::Close() { + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP DataPipeSender::Flush() { return NS_OK; } + +NS_IMETHODIMP DataPipeSender::Write(const char* aBuf, uint32_t aCount, + uint32_t* aWriteCount) { + return WriteSegments(NS_CopyBufferToSegment, (void*)aBuf, aCount, + aWriteCount); +} + +NS_IMETHODIMP DataPipeSender::WriteFrom(nsIInputStream* aFromStream, + uint32_t aCount, + uint32_t* aWriteCount) { + return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount, + aWriteCount); +} + +NS_IMETHODIMP DataPipeSender::WriteSegments(nsReadSegmentFun aReader, + void* aClosure, uint32_t aCount, + uint32_t* aWriteCount) { + auto processSegment = [&](Span aSpan, uint32_t aToOffset, + uint32_t* aReadCount) -> nsresult { + return aReader(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(), + aReadCount); + }; + return ProcessSegmentsInternal(aCount, processSegment, aWriteCount); +} + +NS_IMETHODIMP DataPipeSender::IsNonBlocking(bool* _retval) { + *_retval = true; + return NS_OK; +} + +// nsIAsyncOutputStream + +NS_IMETHODIMP DataPipeSender::CloseWithStatus(nsresult reason) { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + CloseInternal(lock, reason); + return NS_OK; +} + +NS_IMETHODIMP DataPipeSender::AsyncWait(nsIOutputStreamCallback* aCallback, + uint32_t aFlags, + uint32_t aRequestedCount, + nsIEventTarget* aTarget) { + AsyncWaitInternal(NS_NewRunnableFunction( + "DataPipeReceiver::AsyncWait", + [self = RefPtr{this}, callback = RefPtr{aCallback}] { + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("Calling OnOutputStreamReady(%p, %p)", + callback.get(), self.get())); + callback->OnOutputStreamReady(self); + }), + 
do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY); + return NS_OK; +} + +//----------------------------------------------------------------------------- +// DataPipeReceiver +//----------------------------------------------------------------------------- + +NS_IMPL_ISUPPORTS(DataPipeReceiver, nsIInputStream, nsIAsyncInputStream, + nsIIPCSerializableInputStream, DataPipeReceiver) + +// nsIInputStream + +NS_IMETHODIMP DataPipeReceiver::Close() { + return CloseWithStatus(NS_BASE_STREAM_CLOSED); +} + +NS_IMETHODIMP DataPipeReceiver::Available(uint64_t* _retval) { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + nsresult rv = CheckStatus(lock); + if (NS_FAILED(rv)) { + return rv; + } + *_retval = mLink->mAvailable; + return NS_OK; +} + +NS_IMETHODIMP DataPipeReceiver::Read(char* aBuf, uint32_t aCount, + uint32_t* aReadCount) { + return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount); +} + +NS_IMETHODIMP DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter, + void* aClosure, uint32_t aCount, + uint32_t* aReadCount) { + auto processSegment = [&](Span aSpan, uint32_t aToOffset, + uint32_t* aWriteCount) -> nsresult { + return aWriter(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(), + aWriteCount); + }; + return ProcessSegmentsInternal(aCount, processSegment, aReadCount); +} + +NS_IMETHODIMP DataPipeReceiver::IsNonBlocking(bool* _retval) { + *_retval = true; + return NS_OK; +} + +// nsIAsyncInputStream + +NS_IMETHODIMP DataPipeReceiver::CloseWithStatus(nsresult aStatus) { + data_pipe_detail::DataPipeAutoLock lock(*mMutex); + CloseInternal(lock, aStatus); + return NS_OK; +} + +NS_IMETHODIMP DataPipeReceiver::AsyncWait(nsIInputStreamCallback* aCallback, + uint32_t aFlags, + uint32_t aRequestedCount, + nsIEventTarget* aTarget) { + AsyncWaitInternal(NS_NewRunnableFunction( + "DataPipeReceiver::AsyncWait", + [self = RefPtr{this}, callback = RefPtr{aCallback}] { + MOZ_LOG(gDataPipeLog, LogLevel::Debug, + ("Calling OnInputStreamReady(%p, %p)", + callback.get(), self.get())); + callback->OnInputStreamReady(self); + }), + do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY); + return NS_OK; +} + +// nsIIPCSerializableInputStream + +void DataPipeReceiver::Serialize(InputStreamParams& aParams, + FileDescriptorArray& aFileDescriptors, + bool aDelayedStart, uint32_t aMaxSize, + uint32_t* aSizeUsed, + ParentToChildStreamActorManager* aManager) { + *aSizeUsed = 0; + aParams = DataPipeReceiverStreamParams(this); +} + +void DataPipeReceiver::Serialize(InputStreamParams& aParams, + FileDescriptorArray& aFileDescriptors, + bool aDelayedStart, uint32_t aMaxSize, + uint32_t* aSizeUsed, + ChildToParentStreamActorManager* aManager) { + *aSizeUsed = 0; + aParams = DataPipeReceiverStreamParams(this); +} + +bool DataPipeReceiver::Deserialize( + const InputStreamParams& aParams, + const FileDescriptorArray& aFileDescriptors) { + MOZ_CRASH("Handled directly in `DeserializeInputStream`"); +} + +//----------------------------------------------------------------------------- +// NewDataPipe +//----------------------------------------------------------------------------- + +nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender, + DataPipeReceiver** aReceiver) { + if (!aCapacity) { + aCapacity = kDefaultDataPipeCapacity; + } + + RefPtr controller = NodeController::GetSingleton(); + if (!controller) { + return NS_ERROR_ILLEGAL_DURING_SHUTDOWN; + } + + auto [senderPort, receiverPort] = controller->CreatePortPair(); + auto shmem = MakeRefPtr(); + size_t alignedCapacity = 
      SharedMemory::PageAlignedSize(aCapacity);
+  if (!shmem->Create(alignedCapacity) || !shmem->Map(alignedCapacity)) {
+    return NS_ERROR_OUT_OF_MEMORY;
+  }
+
+  RefPtr<DataPipeSender> sender = new DataPipeSender(
+      std::move(senderPort), shmem, aCapacity, NS_OK, 0, aCapacity);
+  RefPtr<DataPipeReceiver> receiver = new DataPipeReceiver(
+      std::move(receiverPort), shmem, aCapacity, NS_OK, 0, 0);
+  sender.forget(aSender);
+  receiver.forget(aReceiver);
+  return NS_OK;
+}
+
+} // namespace ipc
+} // namespace mozilla
+
+void IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Write(
+    Message* aMsg, mozilla::ipc::DataPipeSender* aParam) {
+  mozilla::ipc::data_pipe_detail::DataPipeWrite(aMsg, aParam);
+}
+
+bool IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Read(
+    const Message* aMsg, PickleIterator* aIter,
+    RefPtr<mozilla::ipc::DataPipeSender>* aResult) {
+  return mozilla::ipc::data_pipe_detail::DataPipeRead(aMsg, aIter, aResult);
+}
+
+void IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Write(
+    Message* aMsg, mozilla::ipc::DataPipeReceiver* aParam) {
+  mozilla::ipc::data_pipe_detail::DataPipeWrite(aMsg, aParam);
+}
+
+bool IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Read(
+    const Message* aMsg, PickleIterator* aIter,
+    RefPtr<mozilla::ipc::DataPipeReceiver>* aResult) {
+  return mozilla::ipc::data_pipe_detail::DataPipeRead(aMsg, aIter, aResult);
+}
diff --git a/ipc/glue/DataPipe.h b/ipc/glue/DataPipe.h
new file mode 100644
index 000000000000..fa2c2336b7d6
--- /dev/null
+++ b/ipc/glue/DataPipe.h
@@ -0,0 +1,181 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: set ts=8 sts=2 et sw=2 tw=80: */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+#ifndef mozilla_ipc_DataPipe_h
+#define mozilla_ipc_DataPipe_h
+
+#include "mozilla/ipc/SharedMemoryBasic.h"
+#include "mozilla/ipc/NodeController.h"
+#include "nsIAsyncInputStream.h"
+#include "nsIAsyncOutputStream.h"
+#include "nsIIPCSerializableInputStream.h"
+#include "nsISupports.h"
+
+namespace mozilla {
+namespace ipc {
+
+namespace data_pipe_detail {
+
+class DataPipeAutoLock;
+class DataPipeLink;
+
+class DataPipeBase {
+ public:
+  DataPipeBase(const DataPipeBase&) = delete;
+  DataPipeBase& operator=(const DataPipeBase&) = delete;
+
+ protected:
+  explicit DataPipeBase(bool aReceiverSide, nsresult aError);
+  DataPipeBase(bool aReceiverSide, ScopedPort aPort, SharedMemory* aShmem,
+               uint32_t aCapacity, nsresult aPeerStatus, uint32_t aOffset,
+               uint32_t aAvailable);
+
+  void CloseInternal(DataPipeAutoLock&, nsresult aStatus);
+
+  void AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback,
+                         already_AddRefed<nsIEventTarget> aTarget,
+                         bool aClosureOnly);
+
+  // Like `nsWriteSegmentFun` or `nsReadSegmentFun`.
+  using ProcessSegmentFun =
+      FunctionRef<nsresult(Span<char> aSpan, uint32_t aProcessedThisCall,
+                           uint32_t* aProcessedCount)>;
+  nsresult ProcessSegmentsInternal(uint32_t aCount,
+                                   ProcessSegmentFun aProcessSegment,
+                                   uint32_t* aProcessedCount);
+
+  nsresult CheckStatus(DataPipeAutoLock&);
+
+  nsCString Describe(DataPipeAutoLock&);
+
+  virtual ~DataPipeBase();
+
+  const std::shared_ptr<Mutex> mMutex;
+  nsresult mStatus = NS_OK;
+  RefPtr<DataPipeLink> mLink;
+};
+
+template <typename T>
+void DataPipeWrite(IPC::Message* aMsg, T* aParam);
+
+template <typename T>
+bool DataPipeRead(const IPC::Message* aMsg, PickleIterator* aIter,
+                  RefPtr<T>* aResult);
+
+} // namespace data_pipe_detail
+
+class DataPipeSender;
+class DataPipeReceiver;
+
+#define NS_DATAPIPESENDER_IID                        \
+  {                                                  \
+    0x6698ed77, 0x9fff, 0x425d, {                    \
+      0xb0, 0xa6, 0x1d, 0x30, 0x66, 0xee, 0xb8, 0x16 \
+    }                                                \
+  }
+
+// Helper class for streaming data to another process.
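+// It is the nsIAsyncOutputStream end of the pair created by NewDataPipe(),
+// and may be transferred to another process using the IPC::ParamTraits
+// specializations declared at the end of this header.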
+class DataPipeSender final : public nsIAsyncOutputStream,
+                             public data_pipe_detail::DataPipeBase {
+ public:
+  NS_DECLARE_STATIC_IID_ACCESSOR(NS_DATAPIPESENDER_IID)
+  NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSIOUTPUTSTREAM
+  NS_DECL_NSIASYNCOUTPUTSTREAM
+
+ private:
+  friend nsresult NewDataPipe(uint32_t, DataPipeSender**, DataPipeReceiver**);
+  friend void data_pipe_detail::DataPipeWrite(
+      IPC::Message* aMsg, DataPipeSender* aParam);
+  friend bool data_pipe_detail::DataPipeRead(
+      const IPC::Message* aMsg, PickleIterator* aIter,
+      RefPtr<DataPipeSender>* aResult);
+
+  explicit DataPipeSender(nsresult aError)
+      : data_pipe_detail::DataPipeBase(/* aReceiverSide */ false, aError) {}
+  DataPipeSender(ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity,
+                 nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable)
+      : data_pipe_detail::DataPipeBase(/* aReceiverSide */ false,
+                                       std::move(aPort), aShmem, aCapacity,
+                                       aPeerStatus, aOffset, aAvailable) {}
+
+  ~DataPipeSender() = default;
+};
+
+NS_DEFINE_STATIC_IID_ACCESSOR(DataPipeSender, NS_DATAPIPESENDER_IID)
+
+#define NS_DATAPIPERECEIVER_IID                      \
+  {                                                  \
+    0x0a185f83, 0x499e, 0x450c, {                    \
+      0x95, 0x82, 0x27, 0x67, 0xad, 0x6d, 0x64, 0xb5 \
+    }                                                \
+  }
+
+// Helper class for streaming data from another process.
+class DataPipeReceiver final : public nsIAsyncInputStream,
+                               public nsIIPCSerializableInputStream,
+                               public data_pipe_detail::DataPipeBase {
+ public:
+  NS_DECLARE_STATIC_IID_ACCESSOR(NS_DATAPIPERECEIVER_IID)
+  NS_DECL_THREADSAFE_ISUPPORTS
+  NS_DECL_NSIINPUTSTREAM
+  NS_DECL_NSIASYNCINPUTSTREAM
+  NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
+
+ private:
+  friend nsresult NewDataPipe(uint32_t, DataPipeSender**, DataPipeReceiver**);
+  friend void data_pipe_detail::DataPipeWrite(
+      IPC::Message* aMsg, DataPipeReceiver* aParam);
+  friend bool data_pipe_detail::DataPipeRead(
+      const IPC::Message* aMsg, PickleIterator* aIter,
+      RefPtr<DataPipeReceiver>* aResult);
+
+  explicit DataPipeReceiver(nsresult aError)
+      : data_pipe_detail::DataPipeBase(/* aReceiverSide */ true, aError) {}
+  DataPipeReceiver(ScopedPort aPort, SharedMemory* aShmem, uint32_t aCapacity,
+                   nsresult aPeerStatus, uint32_t aOffset, uint32_t aAvailable)
+      : data_pipe_detail::DataPipeBase(/* aReceiverSide */ true,
+                                       std::move(aPort), aShmem, aCapacity,
+                                       aPeerStatus, aOffset, aAvailable) {}
+
+  ~DataPipeReceiver() = default;
+};
+
+NS_DEFINE_STATIC_IID_ACCESSOR(DataPipeReceiver, NS_DATAPIPERECEIVER_IID)
+
+constexpr uint32_t kDefaultDataPipeCapacity = 64 * 1024;
+
+/**
+ * Create a new DataPipe pair. The sender and receiver ends of the pipe may be
+ * used to transfer data between processes. |aCapacity| is the capacity of the
+ * underlying ring buffer. If `0` is passed, `kDefaultDataPipeCapacity` will be
+ * used.
+ */ +nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender, + DataPipeReceiver** aReceiver); + +} // namespace ipc +} // namespace mozilla + +namespace IPC { + +template <> +struct ParamTraits { + static void Write(Message* aMsg, mozilla::ipc::DataPipeSender* aParam); + static bool Read(const Message* aMsg, PickleIterator* aIter, + RefPtr* aResult); +}; + +template <> +struct ParamTraits { + static void Write(Message* aMsg, mozilla::ipc::DataPipeReceiver* aParam); + static bool Read(const Message* aMsg, PickleIterator* aIter, + RefPtr* aResult); +}; + +} // namespace IPC + +#endif // mozilla_ipc_DataPipe_h diff --git a/ipc/glue/InputStreamParams.ipdlh b/ipc/glue/InputStreamParams.ipdlh index c7208eaf7f86..e8ca6d59d448 100644 --- a/ipc/glue/InputStreamParams.ipdlh +++ b/ipc/glue/InputStreamParams.ipdlh @@ -9,6 +9,7 @@ include protocol PParentToChildStream; include protocol PRemoteLazyInputStream; using struct mozilla::void_t from "mozilla/ipc/IPCCore.h"; +[RefCounted] using class DataPipeReceiver from "mozilla/ipc/DataPipe.h"; namespace mozilla { namespace ipc { @@ -80,6 +81,11 @@ struct IPCRemoteStreamParams int64_t length; }; +struct DataPipeReceiverStreamParams +{ + DataPipeReceiver pipe; +}; + union InputStreamParams { StringInputStreamParams; @@ -92,6 +98,7 @@ union InputStreamParams InputStreamLengthWrapperParams; IPCRemoteStreamParams; EncryptedFileInputStreamParams; + DataPipeReceiverStreamParams; }; struct EncryptedFileInputStreamParams diff --git a/ipc/glue/InputStreamUtils.cpp b/ipc/glue/InputStreamUtils.cpp index ed650b0072a2..1f7e979b6e3a 100644 --- a/ipc/glue/InputStreamUtils.cpp +++ b/ipc/glue/InputStreamUtils.cpp @@ -12,6 +12,7 @@ #include "mozilla/dom/File.h" #include "mozilla/dom/quota/DecryptingInputStream_impl.h" #include "mozilla/dom/quota/IPCStreamCipherStrategy.h" +#include "mozilla/ipc/DataPipe.h" #include "mozilla/ipc/IPCStreamDestination.h" #include "mozilla/ipc/IPCStreamSource.h" #include "mozilla/InputStreamLengthHelper.h" @@ -234,6 +235,9 @@ void InputStreamHelper::PostSerializationActivation(InputStreamParams& aParams, return; } + case InputStreamParams::TDataPipeReceiverStreamParams: + break; + case InputStreamParams::TStringInputStreamParams: break; @@ -318,6 +322,12 @@ already_AddRefed InputStreamHelper::DeserializeInputStream( return destinationStream->TakeReader(); } + if (aParams.type() == InputStreamParams::TDataPipeReceiverStreamParams) { + const DataPipeReceiverStreamParams& pipeParams = + aParams.get_DataPipeReceiverStreamParams(); + return do_AddRef(pipeParams.pipe()); + } + nsCOMPtr serializable; switch (aParams.type()) { diff --git a/ipc/glue/ProtocolUtils.h b/ipc/glue/ProtocolUtils.h index 603b9b7ff91c..b6415d0679ba 100644 --- a/ipc/glue/ProtocolUtils.h +++ b/ipc/glue/ProtocolUtils.h @@ -55,6 +55,10 @@ namespace { // protocol 0. Oops! We can get away with this until protocol 0 // starts approaching its 65,536th message. 
enum { + // Message types used by DataPipe + DATA_PIPE_CLOSED_MESSAGE_TYPE = kuint16max - 18, + DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE = kuint16max - 17, + // Message types used by NodeChannel ACCEPT_INVITE_MESSAGE_TYPE = kuint16max - 16, REQUEST_INTRODUCTION_MESSAGE_TYPE = kuint16max - 15, diff --git a/ipc/glue/moz.build b/ipc/glue/moz.build index adc376b5ce1f..bf5c6f2f96f9 100644 --- a/ipc/glue/moz.build +++ b/ipc/glue/moz.build @@ -23,6 +23,7 @@ EXPORTS.mozilla.ipc += [ "CrashReporterHost.h", "CrossProcessMutex.h", "CrossProcessSemaphore.h", + "DataPipe.h", "Endpoint.h", "EnvironmentMap.h", "FileDescriptor.h", @@ -158,6 +159,7 @@ UNIFIED_SOURCES += [ "BrowserProcessSubThread.cpp", "CrashReporterClient.cpp", "CrashReporterHost.cpp", + "DataPipe.cpp", "Endpoint.cpp", "FileDescriptor.cpp", "FileDescriptorUtils.cpp", diff --git a/ipc/gtest/TestDataPipe.cpp b/ipc/gtest/TestDataPipe.cpp new file mode 100644 index 000000000000..221c011f1a84 --- /dev/null +++ b/ipc/gtest/TestDataPipe.cpp @@ -0,0 +1,248 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=8 sts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "chrome/common/ipc_message.h" +#include "gtest/gtest.h" + +#include "mozilla/ipc/DataPipe.h" +#include "nsIAsyncInputStream.h" +#include "nsIAsyncOutputStream.h" +#include "nsStreamUtils.h" + +namespace mozilla::ipc { + +namespace { + +struct InputStreamCallback : public nsIInputStreamCallback { + public: + NS_DECL_THREADSAFE_ISUPPORTS + + explicit InputStreamCallback( + std::function aFunc = nullptr) + : mFunc(std::move(aFunc)) {} + + NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override { + MOZ_ALWAYS_FALSE(mCalled.exchange(true)); + return mFunc ? mFunc(aStream) : NS_OK; + } + + bool Called() const { return mCalled; } + + private: + virtual ~InputStreamCallback() = default; + + std::atomic mCalled = false; + std::function mFunc; +}; + +NS_IMPL_ISUPPORTS(InputStreamCallback, nsIInputStreamCallback) + +struct OutputStreamCallback : public nsIOutputStreamCallback { + public: + NS_DECL_THREADSAFE_ISUPPORTS + + explicit OutputStreamCallback( + std::function aFunc = nullptr) + : mFunc(std::move(aFunc)) {} + + NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override { + MOZ_ALWAYS_FALSE(mCalled.exchange(true)); + return mFunc ? mFunc(aStream) : NS_OK; + } + + bool Called() const { return mCalled; } + + private: + virtual ~OutputStreamCallback() = default; + + std::atomic mCalled = false; + std::function mFunc; +}; + +NS_IMPL_ISUPPORTS(OutputStreamCallback, nsIOutputStreamCallback) + +// Populate an array with the given number of bytes. Data is lorem ipsum +// random text, but deterministic across multiple calls. +void CreateData(uint32_t aNumBytes, nsCString& aDataOut) { + static const char data[] = + "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec egestas " + "purus eu condimentum iaculis. In accumsan leo eget odio porttitor, non " + "rhoncus nulla vestibulum. Etiam lacinia consectetur nisl nec " + "sollicitudin. Sed fringilla accumsan diam, pulvinar varius massa. Duis " + "mollis dignissim felis, eget tempus nisi tristique ut. Fusce euismod, " + "lectus non lacinia tempor, tellus diam suscipit quam, eget hendrerit " + "lacus nunc fringilla ante. 
Sed ultrices massa vitae risus molestie, ut " + "finibus quam laoreet nullam."; + static const uint32_t dataLength = sizeof(data) - 1; + + aDataOut.SetCapacity(aNumBytes); + + while (aNumBytes > 0) { + uint32_t amount = std::min(dataLength, aNumBytes); + aDataOut.Append(data, amount); + aNumBytes -= amount; + } +} + +// Synchronously consume the given input stream and validate the resulting data +// against the given string of expected values. +void ConsumeAndValidateStream(nsIInputStream* aStream, + const nsACString& aExpectedData) { + nsAutoCString outputData; + nsresult rv = NS_ConsumeStream(aStream, UINT32_MAX, outputData); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + ASSERT_EQ(aExpectedData.Length(), outputData.Length()); + ASSERT_TRUE(aExpectedData.Equals(outputData)); +} + +} // namespace + +TEST(DataPipe, SegmentedReadWrite) +{ + RefPtr reader; + RefPtr writer; + + nsresult rv = + NewDataPipe(1024, getter_AddRefs(writer), getter_AddRefs(reader)); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + + nsCString inputData1; + CreateData(512, inputData1); + + uint32_t numWritten = 0; + rv = writer->Write(inputData1.BeginReading(), inputData1.Length(), + &numWritten); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + EXPECT_EQ(numWritten, 512u); + + uint64_t available = 0; + rv = reader->Available(&available); + EXPECT_EQ(available, 512u); + ConsumeAndValidateStream(reader, inputData1); + + nsCString inputData2; + CreateData(1024, inputData2); + + rv = writer->Write(inputData2.BeginReading(), inputData2.Length(), + &numWritten); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + EXPECT_EQ(numWritten, 1024u); + + rv = reader->Available(&available); + EXPECT_EQ(available, 1024u); + ConsumeAndValidateStream(reader, inputData2); +} + +TEST(DataPipe, Write_AsyncWait) +{ + RefPtr reader; + RefPtr writer; + + const uint32_t segmentSize = 1024; + + nsresult rv = + NewDataPipe(segmentSize, getter_AddRefs(writer), getter_AddRefs(reader)); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + + nsCString inputData; + CreateData(segmentSize, inputData); + + uint32_t numWritten = 0; + rv = writer->Write(inputData.BeginReading(), inputData.Length(), &numWritten); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + EXPECT_EQ(numWritten, segmentSize); + + rv = writer->Write(inputData.BeginReading(), inputData.Length(), &numWritten); + ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); + + RefPtr cb = new OutputStreamCallback(); + + rv = writer->AsyncWait(cb, 0, 0, GetCurrentSerialEventTarget()); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + + NS_ProcessPendingEvents(nullptr); + + ASSERT_FALSE(cb->Called()); + + ConsumeAndValidateStream(reader, inputData); + + ASSERT_FALSE(cb->Called()); + + NS_ProcessPendingEvents(nullptr); + + ASSERT_TRUE(cb->Called()); +} + +TEST(DataPipe, Read_AsyncWait) +{ + RefPtr reader; + RefPtr writer; + + const uint32_t segmentSize = 1024; + + nsresult rv = + NewDataPipe(segmentSize, getter_AddRefs(writer), getter_AddRefs(reader)); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + + nsCString inputData; + CreateData(segmentSize, inputData); + + RefPtr cb = new InputStreamCallback(); + + rv = reader->AsyncWait(cb, 0, 0, GetCurrentSerialEventTarget()); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + + NS_ProcessPendingEvents(nullptr); + + ASSERT_FALSE(cb->Called()); + + uint32_t numWritten = 0; + rv = writer->Write(inputData.BeginReading(), inputData.Length(), &numWritten); + ASSERT_TRUE(NS_SUCCEEDED(rv)); + + ASSERT_FALSE(cb->Called()); + + NS_ProcessPendingEvents(nullptr); + + ASSERT_TRUE(cb->Called()); + + ConsumeAndValidateStream(reader, inputData); +} + +TEST(DataPipe, SerializeReader) +{ + RefPtr reader; + 
  RefPtr<DataPipeSender> writer;
+  nsresult rv =
+      NewDataPipe(1024, getter_AddRefs(writer), getter_AddRefs(reader));
+  ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+  IPC::Message msg(MSG_ROUTING_NONE, 0);
+  WriteParam(&msg, reader);
+
+  uint64_t available = 0;
+  rv = reader->Available(&available);
+  ASSERT_TRUE(NS_FAILED(rv));
+
+  nsCString inputData;
+  CreateData(512, inputData);
+
+  uint32_t numWritten = 0;
+  rv = writer->Write(inputData.BeginReading(), inputData.Length(),
+                     &numWritten);
+  ASSERT_TRUE(NS_SUCCEEDED(rv));
+
+  RefPtr<DataPipeReceiver> reader2;
+  PickleIterator iter(msg);
+  ASSERT_TRUE(ReadParam(&msg, &iter, &reader2));
+  ASSERT_TRUE(reader2);
+
+  rv = reader2->Available(&available);
+  ASSERT_TRUE(NS_SUCCEEDED(rv));
+  ASSERT_EQ(available, 512u);
+  ConsumeAndValidateStream(reader2, inputData);
+}
+
+} // namespace mozilla::ipc
diff --git a/ipc/gtest/moz.build b/ipc/gtest/moz.build
index f1f7fb595ebf..903f021b274e 100644
--- a/ipc/gtest/moz.build
+++ b/ipc/gtest/moz.build
@@ -7,6 +7,7 @@ Library("ipctest")
 
 SOURCES += [
+    "TestDataPipe.cpp",
     "TestLogging.cpp",
     "TestSharedMemory.cpp",
 ]
 
diff --git a/ipc/ipdl/ipdl.py b/ipc/ipdl/ipdl.py
index 741e8c202c34..07e043343ee6 100644
--- a/ipc/ipdl/ipdl.py
+++ b/ipc/ipdl/ipdl.py
@@ -271,6 +271,10 @@ for protocol in sorted(allmessages.keys()):
 
 print(
     """
+    case DATA_PIPE_CLOSED_MESSAGE_TYPE:
+      return "DATA_PIPE_CLOSED_MESSAGE";
+    case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE:
+      return "DATA_PIPE_BYTES_CONSUMED_MESSAGE";
    case ACCEPT_INVITE_MESSAGE_TYPE:
      return "ACCEPT_INVITE_MESSAGE";
    case REQUEST_INTRODUCTION_MESSAGE_TYPE:
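
For reference, a minimal consumer-side sketch of the API this patch introduces, mirroring the usage exercised in TestDataPipe.cpp above. The function name and the 4096-byte capacity are illustrative only and not part of the patch.

    // Illustrative sketch only (not part of the patch); assumes the DataPipe
    // API exactly as introduced above.
    #include "mozilla/ipc/DataPipe.h"
    #include "nsStreamUtils.h"
    #include "nsString.h"

    using namespace mozilla::ipc;

    static nsresult ExampleRoundTrip() {
      RefPtr<DataPipeSender> sender;
      RefPtr<DataPipeReceiver> receiver;
      // Passing 0 as the capacity would select kDefaultDataPipeCapacity (64 KiB).
      nsresult rv =
          NewDataPipe(4096, getter_AddRefs(sender), getter_AddRefs(receiver));
      if (NS_FAILED(rv)) {
        return rv;
      }

      // Writes succeed until the ring buffer fills up, after which they return
      // NS_BASE_STREAM_WOULD_BLOCK; AsyncWait() can be used to learn when space
      // (or, on the receiver, data) becomes available again.
      uint32_t written = 0;
      rv = sender->Write("hello", 5, &written);
      if (NS_FAILED(rv)) {
        return rv;
      }

      // The receiver is a regular nsIAsyncInputStream, so the usual stream
      // utilities apply; it can also be sent to another process via IPDL.
      nsAutoCString data;
      return NS_ConsumeStream(receiver, UINT32_MAX, data);
    }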