Bug 1811958: Implement IncomingUnidirectionalStreams and ReadableStreams supplied by it r=saschanaz,webidl,smaug

Differential Revision: https://phabricator.services.mozilla.com/D167619
This commit is contained in:
Randell Jesup 2023-02-23 17:12:27 +00:00
Родитель 05a8191c9c
Коммит 9135a7e404
15 изменённых файлов: 524 добавлений и 157 удалений

Просмотреть файл

@ -218,6 +218,9 @@ class WritableStream : public nsISupports, public nsWrapperCache {
already_AddRefed<WritableStreamDefaultWriter> GetWriter(ErrorResult& aRv);
protected:
nsCOMPtr<nsIGlobalObject> mGlobal;
// Internal Slots:
private:
bool mBackpressure = false;
@ -238,7 +241,6 @@ class WritableStream : public nsISupports, public nsWrapperCache {
RefPtr<WritableStreamDefaultWriter> mWriter;
nsTArray<RefPtr<Promise>> mWriteRequests;
nsCOMPtr<nsIGlobalObject> mGlobal;
HoldDropJSObjectsCaller mHoldDropCaller;
};

Просмотреть файл

@ -3,6 +3,14 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/* https://w3c.github.io/webtransport/#send-stream */
[Exposed=(Window,Worker), SecureContext, Pref="network.webtransport.enabled"]
interface WebTransportSendStream : WritableStream {
Promise<WebTransportSendStreamStats> getStats();
};
/* https://w3c.github.io/webtransport/#send-stream-stats */
dictionary WebTransportSendStreamStats {
@ -12,6 +20,13 @@ dictionary WebTransportSendStreamStats {
unsigned long long bytesAcknowledged;
};
/* https://w3c.github.io/webtransport/#receive-stream */
[Exposed=(Window,Worker), SecureContext, Pref="network.webtransport.enabled"]
interface WebTransportReceiveStream : ReadableStream {
Promise<WebTransportReceiveStreamStats> getStats();
};
/* https://w3c.github.io/webtransport/#receive-stream-stats */
dictionary WebTransportReceiveStreamStats {
@ -20,26 +35,10 @@ dictionary WebTransportReceiveStreamStats {
unsigned long long bytesRead;
};
/* https://w3c.github.io/webtransport/#receive-stream */
/* https://w3c.github.io/webtransport/#send-stream */
/*
[Exposed=(Window,Worker), SecureContext, Pref="network.webtransport.enabled"]
interface WebTransportSendStream : WritableStream {
Promise<WebTransportSendStreamStats> getStats();
};
[Exposed=(Window,Worker), SecureContext, Pref="network.webtransport.enabled"]
interface WebTransportReceiveStream : ReadableStream {
Promise<WebTransportReceiveStreamStats> getStats();
};
*/
/* https://w3c.github.io/webtransport/#bidirectional-stream */
[Exposed=(Window,Worker), SecureContext, Pref="network.webtransport.enabled"]
interface WebTransportBidirectionalStream {
// XXX spec says these should be WebTransportReceiveStream and WebTransportSendStream
readonly attribute ReadableStream readable;
readonly attribute WritableStream writable;
readonly attribute WebTransportReceiveStream readable;
readonly attribute WebTransportSendStream writable;
};

Просмотреть файл

@ -9,6 +9,7 @@
#include "mozilla/RefPtr.h"
#include "nsUTF8Utils.h"
#include "nsIURL.h"
#include "nsIWebTransportStream.h"
#include "mozilla/Assertions.h"
#include "mozilla/dom/DOMExceptionBinding.h"
#include "mozilla/dom/Promise.h"
@ -17,7 +18,6 @@
#include "mozilla/dom/ReadableStreamDefaultController.h"
#include "mozilla/dom/WebTransportDatagramDuplexStream.h"
#include "mozilla/dom/WebTransportError.h"
#include "mozilla/dom/WebTransportStreams.h"
#include "mozilla/dom/WebTransportLog.h"
#include "mozilla/dom/WritableStream.h"
#include "mozilla/ipc/BackgroundChild.h"
@ -26,11 +26,36 @@
namespace mozilla::dom {
NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE(WebTransport, mGlobal,
mIncomingUnidirectionalStreams,
mIncomingBidirectionalStreams,
mSendStreams, mReceiveStreams, mDatagrams,
mReady, mClosed)
NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE_CLASS(WebTransport)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(WebTransport)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mGlobal)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mIncomingUnidirectionalStreams)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mIncomingBidirectionalStreams)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mIncomingUnidirectionalAlgorithm)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mIncomingBidirectionalAlgorithm)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mSendStreams)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReceiveStreams)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mDatagrams)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReady)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mClosed)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(WebTransport)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mGlobal)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mIncomingUnidirectionalStreams)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mIncomingBidirectionalStreams)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mIncomingUnidirectionalAlgorithm)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mIncomingBidirectionalAlgorithm)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mSendStreams)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mReceiveStreams)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mDatagrams)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mReady)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mClosed)
if (tmp->mChild) {
tmp->mChild->Shutdown(false);
tmp->mChild = nullptr;
}
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
NS_IMPL_CYCLE_COLLECTING_ADDREF(WebTransport)
NS_IMPL_CYCLE_COLLECTING_RELEASE(WebTransport)
@ -64,16 +89,28 @@ WebTransport::~WebTransport() {
void WebTransport::NewBidirectionalStream(
const RefPtr<mozilla::ipc::DataPipeReceiver>& aIncoming,
const RefPtr<mozilla::ipc::DataPipeSender>& aOutgoing) {
LOG_VERBOSE(("NewUnidirectionalStream()"));
// XXX
}
void WebTransport::NewUnidirectionalStream(
const RefPtr<mozilla::ipc::DataPipeReceiver>& aStream) {
LOG_VERBOSE(("NewUnidirectionalStream()"));
// Create a Unidirectional stream and push it into the
// IncomingUnidirectionalStreams stream. Must be added to the ReceiveStreams
// array
// RefPtr<ReadableStream> stream = CreateReadableByteStream(cx, global,
// algorithm, aRV);
mUnidirectionalStreams.Push(aStream);
// Notify something to wake up readers of IncomingReceiveStreams
// The callback is always set/used from the same thread (MainThread or a
// Worker thread).
if (mIncomingUnidirectionalAlgorithm) {
RefPtr<WebTransportIncomingStreamsAlgorithms> callback =
mIncomingUnidirectionalAlgorithm;
callback->NotifyIncomingStream();
}
}
// WebIDL Boilerplate
@ -92,6 +129,7 @@ already_AddRefed<WebTransport> WebTransport::Constructor(
const GlobalObject& aGlobal, const nsAString& aURL,
const WebTransportOptions& aOptions, ErrorResult& aError) {
LOG(("Creating WebTransport for %s", NS_ConvertUTF16toUTF8(aURL).get()));
// https://w3c.github.io/webtransport/#webtransport-constructor
nsCOMPtr<nsIGlobalObject> global = do_QueryInterface(aGlobal.GetAsSupports());
RefPtr<WebTransport> result = new WebTransport(global);
@ -100,6 +138,7 @@ already_AddRefed<WebTransport> WebTransport::Constructor(
return nullptr;
}
// Step 25 Return transport
return result.forget();
}
@ -215,18 +254,14 @@ void WebTransport::Init(const GlobalObject& aGlobal, const nsAString& aURL,
// We set the global from the aGlobalObject parameter of the constructor, so
// it must still be set here.
const nsCOMPtr<nsIGlobalObject> global(mGlobal);
// Used to implement the "wait until" aspects of the pull algorithm
mIncomingBidirectionalPromise = Promise::Create(mGlobal, aError);
if (NS_WARN_IF(aError.Failed())) {
return;
}
mIncomingBidirectionalAlgorithm = new WebTransportIncomingStreamsAlgorithms(
WebTransportIncomingStreamsAlgorithms::StreamType::Bidirectional, this);
RefPtr<WebTransportIncomingStreamsAlgorithms> algorithm =
new WebTransportIncomingStreamsAlgorithms(mIncomingBidirectionalPromise,
false, this);
mIncomingBidirectionalAlgorithm;
mIncomingBidirectionalStreams = ReadableStream::CreateNative(
cx, global, *algorithm, Some(0.0), nullptr, aError); // XXX
cx, global, *algorithm, Some(0.0), nullptr, aError);
if (aError.Failed()) {
return;
}
@ -236,15 +271,10 @@ void WebTransport::Init(const GlobalObject& aGlobal, const nsAString& aURL,
// pullAlgorithm set to pullUnidirectionalStreamAlgorithm, and highWaterMark
// set to 0.
// Used to implement the "wait until" aspects of the pull algorithm
mIncomingUnidirectionalPromise = Promise::Create(mGlobal, aError);
if (NS_WARN_IF(aError.Failed())) {
return;
}
algorithm = new WebTransportIncomingStreamsAlgorithms(
mIncomingUnidirectionalPromise, true, this);
mIncomingUnidirectionalAlgorithm = new WebTransportIncomingStreamsAlgorithms(
WebTransportIncomingStreamsAlgorithms::StreamType::Unidirectional, this);
algorithm = mIncomingUnidirectionalAlgorithm;
mIncomingUnidirectionalStreams = ReadableStream::CreateNative(
cx, global, *algorithm, Some(0.0), nullptr, aError);
if (aError.Failed()) {
@ -274,13 +304,14 @@ void WebTransport::Init(const GlobalObject& aGlobal, const nsAString& aURL,
nsresult rv = aResult.IsReject()
? NS_ERROR_FAILURE
: Get<0>(aResult.ResolveValue());
LOG(("isreject: %d nsresult 0x%x", aResult.IsReject(),
(uint32_t)rv));
if (NS_FAILED(rv)) {
self->RejectWaitingConnection(rv);
} else {
// This will process anything waiting for the connection to
// complete;
// Step 25 Return transport
self->ResolveWaitingConnection(
static_cast<WebTransportReliabilityMode>(
Get<1>(aResult.ResolveValue())),
@ -293,28 +324,46 @@ void WebTransport::ResolveWaitingConnection(
WebTransportReliabilityMode aReliability, WebTransportChild* aChild) {
LOG(("Resolved Connection %p, reliability = %u", this,
(unsigned)aReliability));
MOZ_ASSERT(mState == WebTransportState::CONNECTING);
// https://w3c.github.io/webtransport/#webtransport-constructor
// Step 17 of initialize WebTransport over HTTP
// Step 17.1 If transport.[[State]] is not "connecting":
if (mState != WebTransportState::CONNECTING) {
// Step 17.1.1: In parallel, terminate session.
// Step 17.1.2: abort these steps
return;
}
mChild = aChild;
// Step 17.2: Set transport.[[State]] to "connected".
mState = WebTransportState::CONNECTED;
// Step 17.3: Set transport.[[Session]] to session.
// Step 17.4: Set transports [[Reliability]] to "supports-unreliable".
mReliability = aReliability;
mReady->MaybeResolve(true);
// Step 17.5: Resolve transport.[[Ready]] with undefined.
mReady->MaybeResolveWithUndefined();
}
void WebTransport::RejectWaitingConnection(nsresult aRv) {
LOG(("Reject Connection %p", this));
MOZ_ASSERT(mState == WebTransportState::CONNECTING);
mState = WebTransportState::FAILED;
LOG(("Rejected connection %x", (uint32_t)aRv));
LOG(("Rejected connection %p %x", this, (uint32_t)aRv));
// https://w3c.github.io/webtransport/#webtransport-constructor
// (initialize WebTransport over HTTP)
// https://w3c.github.io/webtransport/#webtransport-internal-slots
// "Reliability returns "pending" until a connection is established" so
// we leave it pending
mReady->MaybeReject(aRv);
// This will abort any pulls for IncomingBidirectional/UnidirectionalStreams
mIncomingBidirectionalPromise->MaybeResolveWithUndefined();
mIncomingUnidirectionalPromise->MaybeResolveWithUndefined();
// Step 14: If the previous step fails, abort the remaining steps and
// queue a network task with transport to run these steps:
// Step 14.1: If transport.[[State]] is "closed" or "failed", then abort
// these steps.
if (mState == WebTransportState::CLOSED ||
mState == WebTransportState::FAILED) {
return;
}
// Step 14.2: Let error be the result of creating a WebTransportError with
// "session".
RefPtr<WebTransportError> error = new WebTransportError(
"WebTransport session rejected"_ns, WebTransportErrorSource::Session);
// Step 14.3: Cleanup transport with error.
ErrorResult errorresult;
Cleanup(error, nullptr, errorresult);
// We never set mChild, so we aren't holding a reference that blocks GC
// (spec 5.8)
@ -374,7 +423,6 @@ void WebTransport::Close(const WebTransportCloseInfo& aOptions,
mState == WebTransportState::FAILED) {
return;
}
MOZ_ASSERT(mChild);
// Step 3: If transport.[[State]] is "connecting":
if (mState == WebTransportState::CONNECTING) {
// Step 3.1: Let error be the result of creating a WebTransportError with
@ -385,9 +433,11 @@ void WebTransport::Close(const WebTransportCloseInfo& aOptions,
// Step 3.2: Cleanup transport with error.
Cleanup(error, nullptr, aRv);
// Step 3.3: Abort these steps.
MOZ_ASSERT(!mChild);
return;
}
LOG(("Sending Close"));
MOZ_ASSERT(mChild);
// Step 4: Let session be transport.[[Session]].
// Step 5: Let code be closeInfo.closeCode.
// Step 6: "Let reasonString be the maximal code unit prefix of
@ -422,6 +472,7 @@ void WebTransport::Close(const WebTransportCloseInfo& aOptions,
// in our destructor.
// This also causes IPC to drop the reference to us, allowing us to be
// GC'd (spec 5.8)
// Cleanup() clears the algorithm member vars
mChild->Shutdown();
mChild = nullptr;
}
@ -470,9 +521,9 @@ void WebTransport::Cleanup(WebTransportError* aError,
// transport.[[IncomingUnidirectionalStreams]].
// Step 7: Set transport.[[SendStreams]] to an empty set.
// Step 8: Set transport.[[ReceiveStreams]] to an empty set.
nsTArray<RefPtr<WritableStream>> sendStreams;
nsTArray<RefPtr<WebTransportSendStream>> sendStreams;
sendStreams.SwapElements(mSendStreams);
nsTArray<RefPtr<ReadableStream>> receiveStreams;
nsTArray<RefPtr<WebTransportReceiveStream>> receiveStreams;
receiveStreams.SwapElements(mReceiveStreams);
// Step 9: If closeInfo is given, then set transport.[[State]] to "closed".
@ -526,9 +577,9 @@ void WebTransport::Cleanup(WebTransportError* aError,
// 13.4: Error incomingUnidirectionalStreams with error
mIncomingUnidirectionalStreams->ErrorNative(cx, errorValue, IgnoreErrors());
}
// abort any pending pulls from Incoming*Streams (not in spec)
mIncomingUnidirectionalPromise->MaybeResolveWithUndefined();
mIncomingBidirectionalPromise->MaybeResolveWithUndefined();
// Let go of the algorithms
mIncomingBidirectionalAlgorithm = nullptr;
mIncomingUnidirectionalAlgorithm = nullptr;
}
} // namespace mozilla::dom

Просмотреть файл

@ -8,11 +8,15 @@
#define DOM_WEBTRANSPORT_API_WEBTRANSPORT__H_
#include "nsCOMPtr.h"
#include "nsDeque.h"
#include "nsISupports.h"
#include "nsWrapperCache.h"
#include "mozilla/dom/Promise.h"
#include "mozilla/dom/WebTransportBinding.h"
#include "mozilla/dom/WebTransportChild.h"
#include "mozilla/dom/WebTransportSendStream.h"
#include "mozilla/dom/WebTransportReceiveStream.h"
#include "mozilla/dom/WebTransportStreams.h"
#include "mozilla/ipc/DataPipe.h"
namespace mozilla::dom {
@ -25,6 +29,9 @@ class WritableStream;
class WebTransport final : public nsISupports, public nsWrapperCache {
friend class WebTransportIncomingStreamsAlgorithms;
// For mSendStreams/mReceiveStreams
friend class WebTransportSendStream;
friend class WebTransportReceiveStream;
public:
explicit WebTransport(nsIGlobalObject* aGlobal);
@ -107,14 +114,22 @@ class WebTransport final : public nsISupports, public nsWrapperCache {
// each sendStream in sendStreams, error sendStream with error."
// XXX Use nsTArray.h for now, but switch to OrderHashSet/Table for release to
// improve remove performance (if needed)
nsTArray<RefPtr<WritableStream>> mSendStreams;
nsTArray<RefPtr<ReadableStream>> mReceiveStreams;
nsTArray<RefPtr<WebTransportSendStream>> mSendStreams;
nsTArray<RefPtr<WebTransportReceiveStream>> mReceiveStreams;
WebTransportState mState;
RefPtr<Promise> mReady;
RefPtr<Promise> mIncomingUnidirectionalPromise;
RefPtr<Promise> mIncomingBidirectionalPromise;
// XXX may not need to be a RefPtr, since we own it through the Streams
RefPtr<WebTransportIncomingStreamsAlgorithms> mIncomingBidirectionalAlgorithm;
RefPtr<WebTransportIncomingStreamsAlgorithms>
mIncomingUnidirectionalAlgorithm;
WebTransportReliabilityMode mReliability;
// Incoming streams get queued here
nsRefPtrDeque<mozilla::ipc::DataPipeReceiver> mUnidirectionalStreams;
nsDeque<Tuple<RefPtr<mozilla::ipc::DataPipeReceiver>,
RefPtr<mozilla::ipc::DataPipeSender>>>
mBidirectionalStreams;
// These are created in the constructor
RefPtr<ReadableStream> mIncomingUnidirectionalStreams;
RefPtr<ReadableStream> mIncomingBidirectionalStreams;

Просмотреть файл

@ -35,11 +35,8 @@ class WebTransportBidirectionalStream final : public nsISupports,
JS::Handle<JSObject*> aGivenProto) override;
// WebIDL Interface
// XXX spec says these should be WebTransportReceiveStream and
// WebTransportSendStream
// XXX Not implemented
already_AddRefed<ReadableStream> Readable() { return nullptr; }
already_AddRefed<WritableStream> Writable() { return nullptr; }
already_AddRefed<WebTransportReceiveStream> Readable() { return nullptr; }
already_AddRefed<WebTransportSendStream> Writable() { return nullptr; }
private:
~WebTransportBidirectionalStream() = default;

Просмотреть файл

@ -0,0 +1,61 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/dom/WebTransportReceiveStream.h"
#include "mozilla/dom/ReadableByteStreamController.h"
#include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
#include "mozilla/dom/ReadableStream.h"
#include "mozilla/dom/WebTransport.h"
#include "mozilla/dom/WebTransportSendReceiveStreamBinding.h"
#include "mozilla/ipc/DataPipe.h"
using namespace mozilla::ipc;
namespace mozilla::dom {
WebTransportReceiveStream::WebTransportReceiveStream(nsIGlobalObject* aGlobal)
: ReadableStream(aGlobal) {}
// WebIDL Boilerplate
JSObject* WebTransportReceiveStream::WrapObject(
JSContext* aCx, JS::Handle<JSObject*> aGivenProto) {
return WebTransportReceiveStream_Binding::Wrap(aCx, this, aGivenProto);
}
already_AddRefed<WebTransportReceiveStream> WebTransportReceiveStream::Create(
WebTransport* aWebTransport, nsIGlobalObject* aGlobal,
DataPipeReceiver* receiver, ErrorResult& aRv) {
// https://w3c.github.io/webtransport/#webtransportreceivestream-create
AutoJSAPI jsapi;
if (!jsapi.Init(aGlobal)) {
return nullptr;
}
JSContext* cx = jsapi.cx();
auto stream = MakeRefPtr<WebTransportReceiveStream>(aGlobal);
nsCOMPtr<nsIAsyncInputStream> inputStream = receiver;
auto algorithms = MakeRefPtr<InputToReadableStreamAlgorithms>(
inputStream, (ReadableStream*)stream);
stream->SetUpByteNative(cx, *algorithms, Some(0.0), aRv);
if (aRv.Failed()) {
return nullptr;
}
// Add to ReceiveStreams
aWebTransport->mReceiveStreams.AppendElement(stream);
return stream.forget();
}
already_AddRefed<Promise> WebTransportReceiveStream::GetStats() {
RefPtr<Promise> promise = Promise::CreateInfallible(ReadableStream::mGlobal);
promise->MaybeRejectWithNotSupportedError("GetStats isn't supported yet");
return promise.forget();
}
} // namespace mozilla::dom

Просмотреть файл

@ -4,36 +4,40 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef DOM_WEBTRANSPORT_API_WEBTRANSPORTSENDSTREAM__H_
#define DOM_WEBTRANSPORT_API_WEBTRANSPORTSENDSTREAM__H_
#ifndef DOM_WEBTRANSPORT_API_WEBTRANSPORTRECEIVESTREAM__H_
#define DOM_WEBTRANSPORT_API_WEBTRANSPORTRECEIVESTREAM__H_
#include "mozilla/dom/ReadableStream.h"
#if WEBTRANSPORT_STREAM_IMPLEMENTED
namespace mozilla::ipc {
class DataPipeReceiver;
}
namespace mozilla::dom {
class WebTransport;
class WebTransportReceiveStream final : public ReadableStream {
protected:
WebTransportReceiveStream();
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(WebTransportReceiveStream,
ReadableStream)
NS_INLINE_DECL_REFCOUNTING_INHERITED(WebTransportReceiveStream,
ReadableStream)
explicit WebTransportReceiveStream(nsIGlobalObject* aGlobal);
MOZ_CAN_RUN_SCRIPT_BOUNDARY static already_AddRefed<WebTransportReceiveStream>
Create(WebTransport* aWebTransport, nsIGlobalObject* aGlobal,
mozilla::ipc::DataPipeReceiver* receiver, ErrorResult& aRv);
// WebIDL Boilerplate
JSObject* WrapObject(JSContext* aCx,
JS::Handle<JSObject*> aGivenProto) override;
// WebIDL Interface
already_AddRefed<Promise> GetStats();
};
#else
namespace mozilla::dom {
class WebTransportReceiveStream final : public nsISupports {
protected:
WebTransportReceiveStream();
public:
NS_DECL_ISUPPORTS
already_AddRefed<Promise> GetStats();
private:
~WebTransportReceiveStream() override = default;
};
#endif
}
#endif

Просмотреть файл

@ -0,0 +1,72 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mozilla/dom/WebTransportSendStream.h"
#include "mozilla/dom/UnderlyingSinkCallbackHelpers.h"
#include "mozilla/dom/WritableStream.h"
#include "mozilla/dom/WebTransport.h"
#include "mozilla/dom/WebTransportSendReceiveStreamBinding.h"
#include "mozilla/ipc/DataPipe.h"
using namespace mozilla::ipc;
namespace mozilla::dom {
WebTransportSendStream::WebTransportSendStream(nsIGlobalObject* aGlobal)
: WritableStream(aGlobal,
WritableStream::HoldDropJSObjectsCaller::Implicit) {}
JSObject* WebTransportSendStream::WrapObject(
JSContext* aCx, JS::Handle<JSObject*> aGivenProto) {
return WebTransportSendStream_Binding::Wrap(aCx, this, aGivenProto);
}
// NOTE: this does not yet implement SendOrder; see bug 1816925
/* static */
already_AddRefed<WebTransportSendStream> WebTransportSendStream::Create(
WebTransport* aWebTransport, nsIGlobalObject* aGlobal,
DataPipeSender* sender, ErrorResult& aRv) {
// https://w3c.github.io/webtransport/#webtransportsendstream-create
AutoJSAPI jsapi;
if (!jsapi.Init(aGlobal)) {
return nullptr;
}
JSContext* cx = jsapi.cx();
auto stream = MakeRefPtr<WebTransportSendStream>(aGlobal);
nsCOMPtr<nsIAsyncOutputStream> outputStream = sender;
auto algorithms = MakeRefPtr<WritableStreamToOutput>(
stream->GetParentObject(), outputStream);
// Steps 2-5
RefPtr<QueuingStrategySize> writableSizeAlgorithm;
stream->SetUpNative(cx, *algorithms, Nothing(), writableSizeAlgorithm, aRv);
// Step 6: Add the following steps to streams [[controller]]'s [[signal]].
// Step 6.1: If stream.[[PendingOperation]] is null, then abort these steps.
// Step 6.2: Let reason be streams [[controller]]'s [[signal]]'s abort
// reason. Step 6.3: Let abortPromise be the result of aborting stream with
// reason. Step 6.4: Upon fulfillment of abortPromise, reject promise with
// reason. Step 6.5: Let pendingOperation be stream.[[PendingOperation]].
// Step 6.6: Set stream.[[PendingOperation]] to null.
// Step 6.7: Resolve pendingOperation with promise.
// XXX TODO
// Step 7: Append stream to SendStreams
aWebTransport->mSendStreams.AppendElement(stream);
// Step 8: return stream
return stream.forget();
}
already_AddRefed<Promise> WebTransportSendStream::GetStats() {
RefPtr<Promise> promise = Promise::CreateInfallible(WritableStream::mGlobal);
promise->MaybeRejectWithNotSupportedError("GetStats isn't supported yet");
return promise.forget();
}
} // namespace mozilla::dom

Просмотреть файл

@ -9,31 +9,34 @@
#include "mozilla/dom/WritableStream.h"
#if WEBTRANSPORT_STREAM_IMPLEMENTED
namespace mozilla::ipc {
class DataPipeSender;
}
namespace mozilla::dom {
class WebTransport;
class WebTransportSendStream final : public WritableStream {
protected:
WebTransportSendStream();
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(WebTransportSendStream,
WritableStream)
NS_INLINE_DECL_REFCOUNTING_INHERITED(WebTransportSendStream, WritableStream)
explicit WebTransportSendStream(nsIGlobalObject* aGlobal);
MOZ_CAN_RUN_SCRIPT_BOUNDARY static already_AddRefed<WebTransportSendStream>
Create(WebTransport* aWebTransport, nsIGlobalObject* aGlobal,
mozilla::ipc::DataPipeSender* sender, ErrorResult& aRv);
// WebIDL Boilerplate
JSObject* WrapObject(JSContext* aCx,
JS::Handle<JSObject*> aGivenProto) override;
// WebIDL Interface
already_AddRefed<Promise> GetStats();
};
#else
namespace mozilla::dom {
class WebTransportSendStream final : public nsISupports {
protected:
WebTransportSendStream();
public:
NS_DECL_ISUPPORTS
already_AddRefed<Promise> GetStats();
private:
~WebTransportSendStream() override = default;
};
#endif
}
#endif

Просмотреть файл

@ -6,14 +6,21 @@
#include "mozilla/dom/WebTransportStreams.h"
#include "mozilla/dom/WebTransportLog.h"
#include "mozilla/dom/Promise-inl.h"
#include "mozilla/dom/WebTransport.h"
#include "mozilla/dom/WebTransportBidirectionalStream.h"
#include "mozilla/dom/WebTransportReceiveStream.h"
#include "mozilla/dom/WebTransportSendStream.h"
#include "mozilla/Result.h"
using namespace mozilla::ipc;
namespace mozilla::dom {
NS_IMPL_CYCLE_COLLECTION_INHERITED(WebTransportIncomingStreamsAlgorithms,
UnderlyingSourceAlgorithmsWrapper, mStream)
UnderlyingSourceAlgorithmsWrapper,
mTransport, mCallback)
NS_IMPL_ADDREF_INHERITED(WebTransportIncomingStreamsAlgorithms,
UnderlyingSourceAlgorithmsWrapper)
NS_IMPL_RELEASE_INHERITED(WebTransportIncomingStreamsAlgorithms,
@ -21,6 +28,10 @@ NS_IMPL_RELEASE_INHERITED(WebTransportIncomingStreamsAlgorithms,
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WebTransportIncomingStreamsAlgorithms)
NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceAlgorithmsWrapper)
WebTransportIncomingStreamsAlgorithms::WebTransportIncomingStreamsAlgorithms(
StreamType aUnidirectional, WebTransport* aTransport)
: mUnidirectional(aUnidirectional), mTransport(aTransport) {}
WebTransportIncomingStreamsAlgorithms::
~WebTransportIncomingStreamsAlgorithms() = default;
@ -29,42 +40,134 @@ WebTransportIncomingStreamsAlgorithms::PullCallbackImpl(
JSContext* aCx, ReadableStreamController& aController, ErrorResult& aRv) {
// https://w3c.github.io/webtransport/#pullbidirectionalstream and
// https://w3c.github.io/webtransport/#pullunidirectionalstream
// pullUnidirectionalStream Step 1: If WebTransport state is CONNECTING,
// return a promise and resolve or reject on state change
// We don't explicitly check Ready here, since we'll reject
// Step 1: If transport.[[State]] is "connecting", then return the result
// of performing the following steps upon fulfillment of
// transport.[[Ready]]:
// We don't explicitly check mState here, since we'll reject
// mIncomingStreamPromise if we go to FAILED or CLOSED
//
// Step 2
RefPtr<Promise> promise = Promise::Create(mStream->GetParentObject(), aRv);
// Step 2: Let session be transport.[[Session]].
// Step 3: Let p be a new promise.
RefPtr<Promise> promise =
Promise::CreateInfallible(mTransport->GetParentObject());
RefPtr<WebTransportIncomingStreamsAlgorithms> self(this);
// The real work of PullCallback()
// Wait until there's an incoming stream (or rejection)
// Step 5
Result<RefPtr<Promise>, nsresult> returnResult =
mIncomingStreamPromise->ThenWithCycleCollectedArgs(
[](JSContext* aCx, JS::Handle<JS::Value>, ErrorResult& aRv,
const RefPtr<WebTransportIncomingStreamsAlgorithms>& self,
RefPtr<Promise> newPromise) {
Unused << self->mUnidirectional;
// XXX Use self->mUnidirectional here
// Step 6 Get new transport stream
// Step 7.1 create stream using transport stream
// Step 7.2 Enqueue
// Add to ReceiveStreams
// mStream->mReceiveStreams.AppendElement(stream);
// Step 7.3
newPromise->MaybeResolveWithUndefined();
return newPromise.forget();
},
self, promise);
if (returnResult.isErr()) {
// XXX Reject?
aRv.Throw(returnResult.unwrapErr());
return nullptr;
// Step 5: Wait until there is an available incoming unidirectional stream.
if (mTransport->mUnidirectionalStreams.GetSize() == 0) {
// We need to wait.
// Per
// https://streams.spec.whatwg.org/#readablestreamdefaultcontroller-pulling
// we can't be called again until the promise is resolved
MOZ_ASSERT(!mCallback);
mCallback = promise;
LOG(("Incoming%sDirectionalStreams Pull waiting for a stream",
mUnidirectional == StreamType::Unidirectional ? "Uni" : "Bi"));
Result<RefPtr<Promise>, nsresult> returnResult =
promise->ThenWithCycleCollectedArgs(
[](JSContext* aCx, JS::Handle<JS::Value>, ErrorResult& aRv,
RefPtr<WebTransportIncomingStreamsAlgorithms> self,
RefPtr<Promise> aPromise) -> already_AddRefed<Promise> {
self->BuildStream(aCx, aRv);
return nullptr;
},
self, promise);
if (returnResult.isErr()) {
// XXX Reject?
aRv.Throw(returnResult.unwrapErr());
return nullptr;
}
// Step 4: Return p and run the remaining steps in parallel.
return returnResult.unwrap().forget();
}
self->BuildStream(aCx, aRv);
// Step 4: Return p and run the remaining steps in parallel.
return promise.forget();
}
void WebTransportIncomingStreamsAlgorithms::BuildStream(JSContext* aCx,
ErrorResult& aRv) {
// https://w3c.github.io/webtransport/#pullbidirectionalstream and
// https://w3c.github.io/webtransport/#pullunidirectionalstream
LOG(("Incoming%sDirectionalStreams Pull building a stream",
mUnidirectional == StreamType::Unidirectional ? "Uni" : "Bi"));
if (mUnidirectional == StreamType::Unidirectional) {
// Step 6: Let internalStream be the result of receiving an incoming
// unidirectional stream.
RefPtr<DataPipeReceiver> pipe = mTransport->mUnidirectionalStreams.Pop();
// Step 7.1: Let stream be the result of creating a
// WebTransportReceiveStream with internalStream and transport
RefPtr<WebTransportReceiveStream> readableStream =
WebTransportReceiveStream::Create(mTransport, mTransport->mGlobal, pipe,
aRv);
if (MOZ_UNLIKELY(!readableStream)) {
aRv.ThrowUnknownError("Internal error");
return;
}
// Step 7.2 Enqueue stream to transport.[[IncomingUnidirectionalStreams]].
JS::Rooted<JS::Value> jsStream(aCx);
if (MOZ_UNLIKELY(!ToJSValue(aCx, readableStream, &jsStream))) {
aRv.ThrowUnknownError("Internal error");
return;
}
// EnqueueNative is CAN_RUN_SCRIPT
RefPtr<ReadableStream> incomingStream =
mTransport->mIncomingUnidirectionalStreams;
incomingStream->EnqueueNative(aCx, jsStream, aRv);
if (MOZ_UNLIKELY(aRv.Failed())) {
aRv.ThrowUnknownError("Internal error");
return;
}
} else {
// Step 6: Let internalStream be the result of receiving a bidirectional
// stream
UniquePtr<Tuple<RefPtr<DataPipeReceiver>, RefPtr<DataPipeSender>>> pipes(
mTransport->mBidirectionalStreams.Pop());
// Step 7.1: Let stream be the result of creating a
// WebTransportBidirectionalStream with internalStream and transport
// Step 7.2 Enqueue stream to transport.[[IncomingBidirectionalStreams]].
// Add to ReceiveStreams
// mTransport->mReceiveStreams.AppendElement(stream);
}
// Step 7.3: Resolve p with undefined.
}
void WebTransportIncomingStreamsAlgorithms::NotifyIncomingStream() {
if (mUnidirectional == StreamType::Unidirectional) {
LOG(("NotifyIncomingStream: %zu Unidirectional ",
mTransport->mUnidirectionalStreams.GetSize()));
auto number = mTransport->mUnidirectionalStreams.GetSize();
MOZ_ASSERT(number > 0);
#endif
RefPtr<Promise> promise = mCallback.forget();
if (promise) {
promise->MaybeResolveWithUndefined();
}
} else {
LOG(("NotifyIncomingStream: %zu Bidirectional ",
mTransport->mBidirectionalStreams.GetSize()));
auto number = mTransport->mBidirectionalStreams.GetSize();
MOZ_ASSERT(number > 0);
#endif
RefPtr<Promise> promise = mCallback.forget();
if (promise) {
promise->MaybeResolveWithUndefined();
}
}
}
void WebTransportIncomingStreamsAlgorithms::NotifyRejectAll() {
// cancel all pulls
LOG(("Cancel all WebTransport Pulls"));
// Ensure we clear the callback before resolving/rejecting it
if (RefPtr<Promise> promise = mCallback.forget()) {
promise->MaybeReject(NS_ERROR_FAILURE);
}
// Step 4
return returnResult.unwrap().forget();
}
} // namespace mozilla::dom

Просмотреть файл

@ -8,19 +8,18 @@
#define DOM_WEBTRANSPORT_API_WEBTRANSPORTSTREAMS__H_
#include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
#include "mozilla/dom/WritableStream.h"
namespace mozilla::dom {
class WebTransport;
class WebTransportIncomingStreamsAlgorithms
: public UnderlyingSourceAlgorithmsWrapper {
public:
WebTransportIncomingStreamsAlgorithms(Promise* aIncomingStreamPromise,
bool aUnidirectional,
WebTransport* aStream)
: mIncomingStreamPromise(aIncomingStreamPromise),
mUnidirectional(aUnidirectional),
mStream(aStream) {}
enum class StreamType : uint8_t { Unidirectional, Bidirectional };
WebTransportIncomingStreamsAlgorithms(StreamType aUnidirectional,
WebTransport* aTransport);
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(
@ -30,13 +29,21 @@ class WebTransportIncomingStreamsAlgorithms
JSContext* aCx, ReadableStreamController& aController,
ErrorResult& aRv) override;
// We call EnqueueNative, which is MOZ_CAN_RUN_SCRIPT but won't in this case
MOZ_CAN_RUN_SCRIPT_BOUNDARY void BuildStream(JSContext* aCx,
ErrorResult& aRv);
void NotifyIncomingStream();
void NotifyRejectAll();
protected:
~WebTransportIncomingStreamsAlgorithms() override;
private:
RefPtr<Promise> mIncomingStreamPromise;
const bool mUnidirectional;
RefPtr<WebTransport> mStream;
const StreamType mUnidirectional;
RefPtr<WebTransport> mTransport;
RefPtr<Promise> mCallback;
};
} // namespace mozilla::dom

Просмотреть файл

@ -19,6 +19,8 @@ UNIFIED_SOURCES += [
"WebTransportBidirectionalStream.cpp",
"WebTransportDatagramDuplexStream.cpp",
"WebTransportError.cpp",
"WebTransportReceiveStream.cpp",
"WebTransportSendStream.cpp",
"WebTransportStreams.cpp",
]

Просмотреть файл

@ -14,12 +14,15 @@
#include "nsIOService.h"
#include "nsIPrincipal.h"
#include "nsIWebTransport.h"
#include "nsStreamUtils.h"
#include "nsIWebTransportStream.h"
using IPCResult = mozilla::ipc::IPCResult;
namespace mozilla::dom {
NS_IMPL_ISUPPORTS(WebTransportParent, WebTransportSessionEventListener);
using IPCResult = mozilla::ipc::IPCResult;
using CreateWebTransportPromise =
MozPromise<WebTransportReliabilityMode, nsresult, true>;
WebTransportParent::~WebTransportParent() {
@ -131,8 +134,8 @@ void WebTransportParent::ActorDestroy(ActorDestroyReason aWhy) {
// We may not receive this response if the child side is destroyed without
// `Close` or `Shutdown` being explicitly called.
mozilla::ipc::IPCResult WebTransportParent::RecvClose(
const uint32_t& aCode, const nsACString& aReason) {
IPCResult WebTransportParent::RecvClose(const uint32_t& aCode,
const nsACString& aReason) {
LOG(("Close received, code = %u, reason = %s", aCode,
PromiseFlatCString(aReason).get()));
MOZ_ASSERT(!mClosed);
@ -142,6 +145,16 @@ mozilla::ipc::IPCResult WebTransportParent::RecvClose(
return IPC_OK();
}
IPCResult WebTransportParent::RecvCreateUnidirectionalStream(
CreateUnidirectionalStreamResolver&& aResolver) {
return IPC_OK();
}
IPCResult WebTransportParent::RecvCreateBidirectionalStream(
CreateBidirectionalStreamResolver&& aResolver) {
return IPC_OK();
}
// We recieve this notification from the WebTransportSessionProxy if session was
// successfully created at the end of
// WebTransportSessionProxy::OnStopRequest
@ -231,8 +244,30 @@ WebTransportParent::OnIncomingStreamAvailableInternal(
NS_IMETHODIMP
WebTransportParent::OnIncomingUnidirectionalStreamAvailable(
nsIWebTransportReceiveStream* aStream) {
// XXX implement once DOM WebAPI supports creation of streams
Unused << aStream;
LOG(("IncomingUnidirectonalStream available"));
// We should be on the Socket Thread
RefPtr<DataPipeSender> sender;
RefPtr<DataPipeReceiver> receiver;
nsresult rv = NewDataPipe(mozilla::ipc::kDefaultDataPipeCapacity,
getter_AddRefs(sender), getter_AddRefs(receiver));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
nsCOMPtr<nsIEventTarget> target = GetCurrentSerialEventTarget();
nsCOMPtr<nsIAsyncInputStream> stream = do_QueryInterface(aStream);
rv = NS_AsyncCopy(stream, sender, target, NS_ASYNCCOPY_VIA_READSEGMENTS,
mozilla::ipc::kDefaultDataPipeCapacity, nullptr, nullptr);
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
LOG(("Sending UnidirectionalStream pipe to content"));
// pass the DataPipeReceiver to the content process
if (!SendIncomingUnidirectionalStream(receiver)) {
return NS_ERROR_FAILURE;
}
return NS_OK;
}
@ -240,6 +275,7 @@ NS_IMETHODIMP
WebTransportParent::OnIncomingBidirectionalStreamAvailable(
nsIWebTransportBidirectionalStream* aStream) {
// XXX implement once DOM WebAPI supports creation of streams
LOG(("Sending BidirectionalStream pipe to content"));
Unused << aStream;
return NS_OK;
}

Просмотреть файл

@ -11,6 +11,7 @@
#include "mozilla/dom/FlippedOnce.h"
#include "mozilla/dom/PWebTransportParent.h"
#include "mozilla/ipc/Endpoint.h"
#include "mozilla/ipc/PBackgroundSharedTypes.h"
#include "nsISupports.h"
#include "nsIPrincipal.h"
#include "nsIWebTransport.h"
@ -21,6 +22,8 @@ enum class WebTransportReliabilityMode : uint8_t;
class WebTransportParent : public PWebTransportParent,
public WebTransportSessionEventListener {
using IPCResult = mozilla::ipc::IPCResult;
public:
WebTransportParent() = default;
@ -34,8 +37,12 @@ class WebTransportParent : public PWebTransportParent,
Endpoint<PWebTransportParent>&& aParentEndpoint,
std::function<void(Tuple<const nsresult&, const uint8_t&>)>&& aResolver);
mozilla::ipc::IPCResult RecvClose(const uint32_t& aCode,
const nsACString& aReason);
IPCResult RecvClose(const uint32_t& aCode, const nsACString& aReason);
IPCResult RecvCreateUnidirectionalStream(
CreateUnidirectionalStreamResolver&& aResolver);
IPCResult RecvCreateBidirectionalStream(
CreateBidirectionalStreamResolver&& aResolver);
void ActorDestroy(ActorDestroyReason aWhy) override;

Просмотреть файл

@ -9,6 +9,10 @@ include PBackgroundSharedTypes;
namespace mozilla {
namespace dom {
struct BidirectionalStream {
DataPipeReceiver incoming;
DataPipeSender outStream;
};
async protocol PWebTransport
{
@ -17,6 +21,10 @@ async protocol PWebTransport
* TODO: documentation
*/
async Close(uint32_t code, nsCString reason);
async CreateUnidirectionalStream()
returns(DataPipeSender sender);
async CreateBidirectionalStream()
returns(BidirectionalStream pipes);
child: