Bug 1853444: Set/Get SendOrder for WebTransportSendStreams r=kershaw,necko-reviewers,webidl,smaug

Allow dynamic changes to stream sendOrder to comply with the updated spec.

Differential Revision: https://phabricator.services.mozilla.com/D188377
Randell Jesup 2023-11-09 16:02:46 +00:00
Parent 9ade511ef2
Commit 3e19daaab6
24 changed files with 237 additions and 371 deletions
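As context for the review, a minimal content-side sketch of the behavior this change enables. The endpoint URL is hypothetical and the snippet only exercises the web-facing API added by this patch (it is not part of the patch itself):

// Create a send stream with an initial sendOrder; per the updated spec,
// sendOrder is now a mutable, nullable attribute on WebTransportSendStream.
const wt = new WebTransport("https://example.com/webtransport/echo"); // hypothetical endpoint
await wt.ready;

const writable = await wt.createUnidirectionalStream({ sendOrder: 1 });
console.log(writable.sendOrder); // 1

// Priority can now be changed dynamically after stream creation...
writable.sendOrder = 5;
// ...or cleared, which makes the stream unordered again.
writable.sendOrder = null;

const writer = writable.getWriter();
await writer.write(new Uint8Array([1, 2, 3]));
await writer.close();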

View file

@ -8,6 +8,7 @@
[Exposed=(Window,Worker), SecureContext, Pref="network.webtransport.enabled"]
interface WebTransportSendStream : WritableStream {
attribute long long? sendOrder;
Promise<WebTransportSendStreamStats> getStats();
};

View file

@ -662,7 +662,7 @@ already_AddRefed<Promise> WebTransport::CreateBidirectionalStream(
// pair
mChild->SendCreateBidirectionalStream(
sendOrder,
[self = RefPtr{this}, promise](
[self = RefPtr{this}, sendOrder, promise](
BidirectionalStreamResponse&& aPipes) MOZ_CAN_RUN_SCRIPT_BOUNDARY {
LOG(("CreateBidirectionalStream response"));
if (BidirectionalStreamResponse::Tnsresult == aPipes.type()) {
@ -688,7 +688,7 @@ already_AddRefed<Promise> WebTransport::CreateBidirectionalStream(
WebTransportBidirectionalStream::Create(
self, self->mGlobal, id,
aPipes.get_BidirectionalStream().inStream(),
aPipes.get_BidirectionalStream().outStream(), error);
aPipes.get_BidirectionalStream().outStream(), sendOrder, error);
LOG(("Returning a bidirectionalStream"));
promise->MaybeResolve(newStream);
},
@ -733,7 +733,8 @@ already_AddRefed<Promise> WebTransport::CreateUnidirectionalStream(
// Ask the parent to create the stream and send us the DataPipeSender
mChild->SendCreateUnidirectionalStream(
sendOrder,
[self = RefPtr{this}, promise](UnidirectionalStreamResponse&& aResponse)
[self = RefPtr{this}, sendOrder,
promise](UnidirectionalStreamResponse&& aResponse)
MOZ_CAN_RUN_SCRIPT_BOUNDARY {
LOG(("CreateUnidirectionalStream response"));
if (UnidirectionalStreamResponse::Tnsresult == aResponse.type()) {
@ -764,7 +765,8 @@ already_AddRefed<Promise> WebTransport::CreateUnidirectionalStream(
RefPtr<WebTransportSendStream> writableStream =
WebTransportSendStream::Create(
self, self->mGlobal, id,
aResponse.get_UnidirectionalStream().outStream(), error);
aResponse.get_UnidirectionalStream().outStream(), sendOrder,
error);
if (!writableStream) {
promise->MaybeReject(std::move(error));
return;
@ -870,6 +872,11 @@ void WebTransport::Cleanup(WebTransportError* aError,
NotifyToWindow(false);
}
void WebTransport::SendSetSendOrder(uint64_t aStreamId,
Maybe<int64_t> aSendOrder) {
mChild->SendSetSendOrder(aStreamId, aSendOrder);
}
void WebTransport::NotifyBFCacheOnMainThread(nsPIDOMWindowInner* aInner,
bool aCreated) {
AssertIsOnMainThread();

View file

@ -118,6 +118,8 @@ class WebTransport final : public nsISupports, public nsWrapperCache {
MOZ_CAN_RUN_SCRIPT_BOUNDARY already_AddRefed<ReadableStream>
IncomingUnidirectionalStreams();
void SendSetSendOrder(uint64_t aStreamId, Maybe<int64_t> aSendOrder);
void Shutdown() {}
private:

View file

@ -36,7 +36,8 @@ JSObject* WebTransportBidirectionalStream::WrapObject(
already_AddRefed<WebTransportBidirectionalStream>
WebTransportBidirectionalStream::Create(
WebTransport* aWebTransport, nsIGlobalObject* aGlobal, uint64_t aStreamId,
DataPipeReceiver* receiver, DataPipeSender* sender, ErrorResult& aRv) {
DataPipeReceiver* receiver, DataPipeSender* aSender,
Maybe<int64_t> aSendOrder, ErrorResult& aRv) {
// https://w3c.github.io/webtransport/#pullbidirectionalstream (and
// createBidirectionalStream)
@ -49,8 +50,8 @@ WebTransportBidirectionalStream::Create(
return nullptr;
}
RefPtr<WebTransportSendStream> writableStream =
WebTransportSendStream::Create(aWebTransport, aGlobal, aStreamId, sender,
aRv);
WebTransportSendStream::Create(aWebTransport, aGlobal, aStreamId, aSender,
aSendOrder, aRv);
if (!writableStream) {
return nullptr;
}

View file

@ -35,7 +35,8 @@ class WebTransportBidirectionalStream final : public nsISupports,
static already_AddRefed<WebTransportBidirectionalStream> Create(
WebTransport* aWebTransport, nsIGlobalObject* aGlobal, uint64_t aStreamId,
::mozilla::ipc::DataPipeReceiver* receiver,
::mozilla::ipc::DataPipeSender* sender, ErrorResult& aRv);
::mozilla::ipc::DataPipeSender* aSender, Maybe<int64_t> aSendOrder,
ErrorResult& aRv);
// WebIDL Boilerplate
nsIGlobalObject* GetParentObject() const;

View file

@ -40,7 +40,7 @@ JSObject* WebTransportSendStream::WrapObject(
/* static */
already_AddRefed<WebTransportSendStream> WebTransportSendStream::Create(
WebTransport* aWebTransport, nsIGlobalObject* aGlobal, uint64_t aStreamId,
DataPipeSender* sender, ErrorResult& aRv) {
DataPipeSender* aSender, Maybe<int64_t> aSendOrder, ErrorResult& aRv) {
// https://w3c.github.io/webtransport/#webtransportsendstream-create
AutoJSAPI jsapi;
if (!jsapi.Init(aGlobal)) {
@ -50,10 +50,16 @@ already_AddRefed<WebTransportSendStream> WebTransportSendStream::Create(
auto stream = MakeRefPtr<WebTransportSendStream>(aGlobal, aWebTransport);
nsCOMPtr<nsIAsyncOutputStream> outputStream = sender;
nsCOMPtr<nsIAsyncOutputStream> outputStream = aSender;
auto algorithms = MakeRefPtr<WritableStreamToOutput>(
stream->GetParentObject(), outputStream);
stream->mStreamId = aStreamId;
if (aSendOrder.isSome()) {
stream->mSendOrder.SetValue(aSendOrder.value());
}
// Steps 2-5
RefPtr<QueuingStrategySize> writableSizeAlgorithm;
stream->SetUpNative(cx, *algorithms, Nothing(), writableSizeAlgorithm, aRv);
@ -74,6 +80,12 @@ already_AddRefed<WebTransportSendStream> WebTransportSendStream::Create(
return stream.forget();
}
void WebTransportSendStream::SetSendOrder(Nullable<int64_t> aSendOrder) {
mSendOrder = aSendOrder;
mTransport->SendSetSendOrder(
mStreamId, aSendOrder.IsNull() ? Nothing() : Some(aSendOrder.Value()));
}
already_AddRefed<Promise> WebTransportSendStream::GetStats() {
RefPtr<Promise> promise = Promise::CreateInfallible(WritableStream::mGlobal);
promise->MaybeRejectWithNotSupportedError("GetStats isn't supported yet");

View file

@ -27,13 +27,18 @@ class WebTransportSendStream final : public WritableStream {
static already_AddRefed<WebTransportSendStream> Create(
WebTransport* aWebTransport, nsIGlobalObject* aGlobal, uint64_t aStreamId,
mozilla::ipc::DataPipeSender* sender, ErrorResult& aRv);
mozilla::ipc::DataPipeSender* aSender, Maybe<int64_t> aSendOrder,
ErrorResult& aRv);
// WebIDL Boilerplate
JSObject* WrapObject(JSContext* aCx,
JS::Handle<JSObject*> aGivenProto) override;
// WebIDL Interface
Nullable<int64_t> GetSendOrder() { return mSendOrder; }
void SetSendOrder(Nullable<int64_t> aSendOrder);
already_AddRefed<Promise> GetStats();
private:
@ -44,6 +49,8 @@ class WebTransportSendStream final : public WritableStream {
// CC runs. WebTransport::CleanUp() will destroy all the send and receive
// streams, breaking the cycle.
RefPtr<WebTransport> mTransport;
uint64_t mStreamId;
Nullable<int64_t> mSendOrder;
};
} // namespace mozilla::dom

View file

@ -140,7 +140,7 @@ void WebTransportIncomingStreamsAlgorithms::BuildStream(JSContext* aCx,
RefPtr<WebTransportBidirectionalStream> stream =
WebTransportBidirectionalStream::Create(mTransport, mTransport->mGlobal,
std::get<0>(tuple), input,
output, aRv);
output, Nothing(), aRv);
// Step 7.2 Enqueue stream to transport.[[IncomingBidirectionalStreams]].
JS::Rooted<JS::Value> jsStream(aCx);

View file

@ -165,61 +165,78 @@ IPCResult WebTransportParent::RecvClose(const uint32_t& aCode,
return IPC_OK();
}
class ReceiveStream final : public nsIWebTransportStreamCallback {
class BidiReceiveStream : public nsIWebTransportStreamCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIWEBTRANSPORTSTREAMCALLBACK
ReceiveStream(
WebTransportParent::CreateUnidirectionalStreamResolver&& aResolver,
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&)>&&
aStreamCallback,
Maybe<int64_t> aSendOrder, nsCOMPtr<nsISerialEventTarget>& aSocketThread)
: mUniResolver(aResolver),
mStreamCallback(std::move(aStreamCallback)),
mSendOrder(aSendOrder),
mSocketThread(aSocketThread) {}
ReceiveStream(
BidiReceiveStream(
WebTransportParent::CreateBidirectionalStreamResolver&& aResolver,
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&)>&&
aStreamCallback,
std::function<
void(uint64_t, WebTransportParent::OnResetOrStopSendingCallback&&,
nsIWebTransportBidirectionalStream* aStream)>&& aStreamCallback,
Maybe<int64_t> aSendOrder, nsCOMPtr<nsISerialEventTarget>& aSocketThread)
: mBiResolver(aResolver),
: mResolver(aResolver),
mStreamCallback(std::move(aStreamCallback)),
mSendOrder(aSendOrder),
mSocketThread(aSocketThread) {}
private:
~ReceiveStream() = default;
WebTransportParent::CreateUnidirectionalStreamResolver mUniResolver;
WebTransportParent::CreateBidirectionalStreamResolver mBiResolver;
virtual ~BidiReceiveStream() = default;
WebTransportParent::CreateBidirectionalStreamResolver mResolver;
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&)>
WebTransportParent::OnResetOrStopSendingCallback&&,
nsIWebTransportBidirectionalStream* aStream)>
mStreamCallback;
Maybe<int64_t> mSendOrder;
nsCOMPtr<nsISerialEventTarget> mSocketThread;
};
NS_IMPL_ISUPPORTS(ReceiveStream, nsIWebTransportStreamCallback)
class UniReceiveStream : public nsIWebTransportStreamCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIWEBTRANSPORTSTREAMCALLBACK
UniReceiveStream(
WebTransportParent::CreateUnidirectionalStreamResolver&& aResolver,
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&,
nsIWebTransportSendStream* aStream)>&& aStreamCallback,
Maybe<int64_t> aSendOrder, nsCOMPtr<nsISerialEventTarget>& aSocketThread)
: mResolver(aResolver),
mStreamCallback(std::move(aStreamCallback)),
mSendOrder(aSendOrder),
mSocketThread(aSocketThread) {}
private:
virtual ~UniReceiveStream() = default;
WebTransportParent::CreateUnidirectionalStreamResolver mResolver;
std::function<void(uint64_t,
WebTransportParent::OnResetOrStopSendingCallback&&,
nsIWebTransportSendStream* aStream)>
mStreamCallback;
Maybe<int64_t> mSendOrder;
nsCOMPtr<nsISerialEventTarget> mSocketThread;
};
NS_IMPL_ISUPPORTS(BidiReceiveStream, nsIWebTransportStreamCallback)
NS_IMPL_ISUPPORTS(UniReceiveStream, nsIWebTransportStreamCallback)
// nsIWebTransportStreamCallback:
NS_IMETHODIMP ReceiveStream::OnBidirectionalStreamReady(
NS_IMETHODIMP BidiReceiveStream::OnBidirectionalStreamReady(
nsIWebTransportBidirectionalStream* aStream) {
LOG(("Bidirectional stream ready!"));
MOZ_ASSERT(mSocketThread->IsOnCurrentThread());
if (mSendOrder.isSome()) {
aStream->SetSendOrder(mSendOrder.value());
}
aStream->SetSendOrder(mSendOrder);
RefPtr<mozilla::ipc::DataPipeSender> inputsender;
RefPtr<mozilla::ipc::DataPipeReceiver> inputreceiver;
nsresult rv =
NewDataPipe(mozilla::ipc::kDefaultDataPipeCapacity,
getter_AddRefs(inputsender), getter_AddRefs(inputreceiver));
if (NS_WARN_IF(NS_FAILED(rv))) {
mBiResolver(rv);
mResolver(rv);
return rv;
}
@ -234,7 +251,7 @@ NS_IMETHODIMP ReceiveStream::OnBidirectionalStreamReady(
mozilla::ipc::kDefaultDataPipeCapacity, nullptr, nullptr,
true, true, getter_AddRefs(inputCopyContext));
if (NS_WARN_IF(NS_FAILED(rv))) {
mBiResolver(rv);
mResolver(rv);
return rv;
}
@ -244,7 +261,7 @@ NS_IMETHODIMP ReceiveStream::OnBidirectionalStreamReady(
NewDataPipe(mozilla::ipc::kDefaultDataPipeCapacity,
getter_AddRefs(outputsender), getter_AddRefs(outputreceiver));
if (NS_WARN_IF(NS_FAILED(rv))) {
mBiResolver(rv);
mResolver(rv);
return rv;
}
@ -257,12 +274,12 @@ NS_IMETHODIMP ReceiveStream::OnBidirectionalStreamReady(
mozilla::ipc::kDefaultDataPipeCapacity, nullptr, nullptr,
true, true, getter_AddRefs(outputCopyContext));
if (NS_WARN_IF(NS_FAILED(rv))) {
mBiResolver(rv);
mResolver(rv);
return rv;
}
LOG(("Returning BidirectionalStream pipe to content"));
mBiResolver(BidirectionalStream(id, inputreceiver, outputsender));
mResolver(BidirectionalStream(id, inputreceiver, outputsender));
auto onResetOrStopSending =
[inputCopyContext(inputCopyContext), outputCopyContext(outputCopyContext),
@ -275,29 +292,36 @@ NS_IMETHODIMP ReceiveStream::OnBidirectionalStreamReady(
outputreceiver->CloseWithStatus(aError);
};
// Store onResetOrStopSending in WebTransportParent::mStreamCallbackMap and
// onResetOrStopSending will be called when a stream receives STOP_SENDING or
// RESET.
mStreamCallback(id, WebTransportParent::OnResetOrStopSendingCallback(
std::move(onResetOrStopSending)));
// Store onResetOrStopSending in WebTransportParent::mBidiStreamCallbackMap
// and onResetOrStopSending will be called when a stream receives STOP_SENDING
// or RESET.
mStreamCallback(id,
WebTransportParent::OnResetOrStopSendingCallback(
std::move(onResetOrStopSending)),
aStream);
return NS_OK;
}
NS_IMETHODIMP UniReceiveStream::OnBidirectionalStreamReady(
nsIWebTransportBidirectionalStream* aStream) {
return NS_ERROR_FAILURE;
}
NS_IMETHODIMP
ReceiveStream::OnUnidirectionalStreamReady(nsIWebTransportSendStream* aStream) {
UniReceiveStream::OnUnidirectionalStreamReady(
nsIWebTransportSendStream* aStream) {
LOG(("Unidirectional stream ready!"));
// We should be on the Socket Thread
MOZ_ASSERT(mSocketThread->IsOnCurrentThread());
if (mSendOrder.isSome()) {
aStream->SetSendOrder(mSendOrder.value());
}
aStream->SetSendOrder(mSendOrder);
RefPtr<::mozilla::ipc::DataPipeSender> sender;
RefPtr<::mozilla::ipc::DataPipeReceiver> receiver;
nsresult rv = NewDataPipe(mozilla::ipc::kDefaultDataPipeCapacity,
getter_AddRefs(sender), getter_AddRefs(receiver));
if (NS_WARN_IF(NS_FAILED(rv))) {
mUniResolver(rv);
mResolver(rv);
return rv;
}
@ -312,13 +336,13 @@ ReceiveStream::OnUnidirectionalStreamReady(nsIWebTransportSendStream* aStream) {
mozilla::ipc::kDefaultDataPipeCapacity, nullptr, nullptr,
true, true, getter_AddRefs(copyContext));
if (NS_WARN_IF(NS_FAILED(rv))) {
mUniResolver(rv);
mResolver(rv);
return rv;
}
LOG(("Returning UnidirectionalStream pipe to content"));
// pass the DataPipeSender to the content process
mUniResolver(UnidirectionalStream(id, sender));
mResolver(UnidirectionalStream(id, sender));
auto onResetOrStopSending = [copyContext(copyContext),
receiver(receiver)](nsresult aError) {
@ -329,24 +353,54 @@ ReceiveStream::OnUnidirectionalStreamReady(nsIWebTransportSendStream* aStream) {
// Store onResetOrStopSending in WebTransportParent::mStreamCallbackMap and
// onResetOrStopSending will be called when a stream receives STOP_SENDING.
mStreamCallback(id, WebTransportParent::OnResetOrStopSendingCallback(
std::move(onResetOrStopSending)));
mStreamCallback(id,
WebTransportParent::OnResetOrStopSendingCallback(
std::move(onResetOrStopSending)),
aStream);
return NS_OK;
}
JS_HAZ_CAN_RUN_SCRIPT NS_IMETHODIMP ReceiveStream::OnError(uint8_t aError) {
NS_IMETHODIMP
BidiReceiveStream::OnUnidirectionalStreamReady(
nsIWebTransportSendStream* aStream) {
return NS_ERROR_FAILURE;
}
JS_HAZ_CAN_RUN_SCRIPT NS_IMETHODIMP UniReceiveStream::OnError(uint8_t aError) {
nsresult rv = aError == nsIWebTransport::INVALID_STATE_ERROR
? NS_ERROR_DOM_INVALID_STATE_ERR
: NS_ERROR_FAILURE;
LOG(("CreateStream OnError: %u", aError));
if (mUniResolver) {
mUniResolver(rv);
} else if (mBiResolver) {
mBiResolver(rv);
}
mResolver(rv);
return NS_OK;
}
JS_HAZ_CAN_RUN_SCRIPT NS_IMETHODIMP BidiReceiveStream::OnError(uint8_t aError) {
nsresult rv = aError == nsIWebTransport::INVALID_STATE_ERROR
? NS_ERROR_DOM_INVALID_STATE_ERR
: NS_ERROR_FAILURE;
LOG(("CreateStream OnError: %u", aError));
mResolver(rv);
return NS_OK;
}
IPCResult WebTransportParent::RecvSetSendOrder(uint64_t aStreamId,
Maybe<int64_t> aSendOrder) {
if (aSendOrder) {
LOG(("Set sendOrder=%" PRIi64 " for streamId %" PRIu64, aSendOrder.value(),
aStreamId));
} else {
LOG(("Set sendOrder=null for streamId %" PRIu64, aStreamId));
}
if (auto entry = mUniStreamCallbackMap.Lookup(aStreamId)) {
entry->mStream->SetSendOrder(aSendOrder);
} else if (auto entry = mBidiStreamCallbackMap.Lookup(aStreamId)) {
entry->mStream->SetSendOrder(aSendOrder);
}
return IPC_OK();
}
IPCResult WebTransportParent::RecvCreateUnidirectionalStream(
Maybe<int64_t> aSendOrder, CreateUnidirectionalStreamResolver&& aResolver) {
LOG(("%s for %p received, useSendOrder=%d, sendOrder=%" PRIi64, __func__,
@ -356,11 +410,13 @@ IPCResult WebTransportParent::RecvCreateUnidirectionalStream(
auto streamCb =
[self = RefPtr{this}](
uint64_t aStreamId,
WebTransportParent::OnResetOrStopSendingCallback&& aCallback) {
self->mStreamCallbackMap.InsertOrUpdate(aStreamId,
std::move(aCallback));
WebTransportParent::OnResetOrStopSendingCallback&& aCallback,
nsIWebTransportSendStream* aStream) {
self->mUniStreamCallbackMap.InsertOrUpdate(
aStreamId, StreamHash<nsIWebTransportSendStream>{
std::move(aCallback), aStream});
};
RefPtr<ReceiveStream> callback = new ReceiveStream(
RefPtr<UniReceiveStream> callback = new UniReceiveStream(
std::move(aResolver), std::move(streamCb), aSendOrder, mSocketThread);
nsresult rv;
rv = mWebTransport->CreateOutgoingUnidirectionalStream(callback);
@ -379,11 +435,13 @@ IPCResult WebTransportParent::RecvCreateBidirectionalStream(
auto streamCb =
[self = RefPtr{this}](
uint64_t aStreamId,
WebTransportParent::OnResetOrStopSendingCallback&& aCallback) {
self->mStreamCallbackMap.InsertOrUpdate(aStreamId,
std::move(aCallback));
WebTransportParent::OnResetOrStopSendingCallback&& aCallback,
nsIWebTransportBidirectionalStream* aStream) {
self->mBidiStreamCallbackMap.InsertOrUpdate(
aStreamId, StreamHash<nsIWebTransportBidirectionalStream>{
std::move(aCallback), aStream});
};
RefPtr<ReceiveStream> callback = new ReceiveStream(
RefPtr<BidiReceiveStream> callback = new BidiReceiveStream(
std::move(aResolver), std::move(streamCb), aSendOrder, mSocketThread);
nsresult rv;
rv = mWebTransport->CreateOutgoingBidirectionalStream(callback);
@ -511,9 +569,12 @@ NS_IMETHODIMP WebTransportParent::OnStopSending(uint64_t aStreamId,
MOZ_ASSERT(mSocketThread->IsOnCurrentThread());
LOG(("WebTransportParent::OnStopSending %p stream id=%" PRIx64, this,
aStreamId));
if (auto entry = mStreamCallbackMap.Lookup(aStreamId)) {
entry->OnResetOrStopSending(aError);
mStreamCallbackMap.Remove(aStreamId);
if (auto entry = mUniStreamCallbackMap.Lookup(aStreamId)) {
entry->mCallback.OnResetOrStopSending(aError);
mUniStreamCallbackMap.Remove(aStreamId);
} else if (auto entry = mBidiStreamCallbackMap.Lookup(aStreamId)) {
entry->mCallback.OnResetOrStopSending(aError);
mBidiStreamCallbackMap.Remove(aStreamId);
}
if (CanSend()) {
Unused << SendOnStreamResetOrStopSending(aStreamId,
@ -527,9 +588,12 @@ NS_IMETHODIMP WebTransportParent::OnResetReceived(uint64_t aStreamId,
MOZ_ASSERT(mSocketThread->IsOnCurrentThread());
LOG(("WebTransportParent::OnResetReceived %p stream id=%" PRIx64, this,
aStreamId));
if (auto entry = mStreamCallbackMap.Lookup(aStreamId)) {
entry->OnResetOrStopSending(aError);
mStreamCallbackMap.Remove(aStreamId);
if (auto entry = mUniStreamCallbackMap.Lookup(aStreamId)) {
entry->mCallback.OnResetOrStopSending(aError);
mUniStreamCallbackMap.Remove(aStreamId);
} else if (auto entry = mBidiStreamCallbackMap.Lookup(aStreamId)) {
entry->mCallback.OnResetOrStopSending(aError);
mBidiStreamCallbackMap.Remove(aStreamId);
}
if (CanSend()) {
Unused << SendOnStreamResetOrStopSending(aStreamId, ResetError(aError));

View file

@ -16,6 +16,7 @@
#include "nsISupports.h"
#include "nsIPrincipal.h"
#include "nsIWebTransport.h"
#include "nsIWebTransportStream.h"
#include "nsTHashMap.h"
namespace mozilla::dom {
@ -43,6 +44,8 @@ class WebTransportParent : public PWebTransportParent,
IPCResult RecvClose(const uint32_t& aCode, const nsACString& aReason);
IPCResult RecvSetSendOrder(uint64_t aStreamId, Maybe<int64_t> aSendOrder);
IPCResult RecvCreateUnidirectionalStream(
Maybe<int64_t> aSendOrder,
CreateUnidirectionalStreamResolver&& aResolver);
@ -93,7 +96,17 @@ class WebTransportParent : public PWebTransportParent,
nsCOMPtr<nsIWebTransport> mWebTransport;
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
nsTHashMap<nsUint64HashKey, OnResetOrStopSendingCallback> mStreamCallbackMap;
// What we need to be able to lookup by streamId
template <typename T>
struct StreamHash {
OnResetOrStopSendingCallback mCallback;
nsCOMPtr<T> mStream;
};
nsTHashMap<nsUint64HashKey, StreamHash<nsIWebTransportBidirectionalStream>>
mBidiStreamCallbackMap;
nsTHashMap<nsUint64HashKey, StreamHash<nsIWebTransportSendStream>>
mUniStreamCallbackMap;
};
} // namespace mozilla::dom

View file

@ -71,6 +71,11 @@ async protocol PWebTransport
async GetMaxDatagramSize()
returns(uint64_t maxDatagramSize);
/**
* Set the sendOrder for an existing stream
*/
async SetSendOrder(uint64_t streamId, int64_t? sendOrder);
child:
async IncomingUnidirectionalStream(uint64_t streamId, nullable DataPipeReceiver receive);

View file

@ -2502,7 +2502,8 @@ uint64_t Http3Session::MaxDatagramSize(uint64_t aSessionId) {
return size;
}
void Http3Session::SetSendOrder(Http3StreamBase* aStream, int64_t aSendOrder) {
void Http3Session::SetSendOrder(Http3StreamBase* aStream,
Maybe<int64_t> aSendOrder) {
if (!IsClosing()) {
nsresult rv = mHttp3Connection->WebTransportSetSendOrder(
aStream->StreamId(), aSendOrder);

View file

@ -215,7 +215,7 @@ class Http3Session final : public nsAHttpTransaction, public nsAHttpConnection {
uint64_t MaxDatagramSize(uint64_t aSessionId);
void SetSendOrder(Http3StreamBase* aStream, int64_t aSendOrder);
void SetSendOrder(Http3StreamBase* aStream, Maybe<int64_t> aSendOrder);
void CloseWebTransportConn();

View file

@ -660,7 +660,7 @@ void Http3WebTransportStream::SendStopSending(uint8_t aErrorCode) {
mSession->StreamHasDataToWrite(this);
}
void Http3WebTransportStream::SetSendOrder(int64_t aSendOrder) {
void Http3WebTransportStream::SetSendOrder(Maybe<int64_t> aSendOrder) {
mSession->SetSendOrder(this, aSendOrder);
}

View file

@ -48,7 +48,7 @@ class Http3WebTransportStream final : public Http3StreamBase,
}
Http3Stream* GetHttp3Stream() override { return nullptr; }
void SetSendOrder(int64_t aSendOrder);
void SetSendOrder(Maybe<int64_t> aSendOrder);
[[nodiscard]] nsresult ReadSegments() override;
[[nodiscard]] nsresult WriteSegments() override;

View file

@ -223,7 +223,13 @@ NS_IMETHODIMP WebTransportStreamProxy::GetStreamId(uint64_t* aId) {
return NS_OK;
}
NS_IMETHODIMP WebTransportStreamProxy::SetSendOrder(int64_t aSendOrder) {
NS_IMETHODIMP WebTransportStreamProxy::SetSendOrder(Maybe<int64_t> aSendOrder) {
if (!OnSocketThread()) {
return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"SetSendOrder", [stream = mWebTransportStream, aSendOrder]() {
stream->SetSendOrder(aSendOrder);
}));
}
mWebTransportStream->SetSendOrder(aSendOrder);
return NS_OK;
}
@ -338,8 +344,10 @@ WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() {
NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Write(
const char* aBuf, uint32_t aCount, uint32_t* aResult) {
LOG(("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes",
this, aCount));
LOG(
("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes, "
"first byte %c",
this, aCount, aBuf[0]));
return mStream->Write(aBuf, aCount, aResult);
}

View file

@ -38,7 +38,7 @@ class WebTransportStreamProxy final
NS_IMETHOD GetOutputStream(nsIAsyncOutputStream** aOut) override;
NS_IMETHOD GetStreamId(uint64_t* aId) override;
NS_IMETHOD SetSendOrder(int64_t aSendOrder) override;
NS_IMETHOD SetSendOrder(Maybe<int64_t> aSendOrder) override;
private:
virtual ~WebTransportStreamProxy();

View file

@ -47,7 +47,7 @@ interface nsIWebTransport : nsISupports {
in WebTransportSessionEventListener aListener,
in const_MaybeClientInfoRef aClientInfo);
// Asynchronously get states.
// Asynchronously get stats.
void getStats();
// Close the session.

View file

@ -11,12 +11,15 @@ interface nsIInputStreamCallback;
interface nsIEventTarget;
%{C++
#include "mozilla/Maybe.h"
namespace mozilla {
class TimeStamp;
}
%}
native TimeStamp(mozilla::TimeStamp);
native MaybeInt64(mozilla::Maybe<int64_t>);
[builtinclass, scriptable, uuid(ccc3e685-8411-48f0-8b3e-ff6d1fae4809)]
interface nsIWebTransportSendStreamStats : nsISupports {
@ -62,7 +65,7 @@ interface nsIWebTransportSendStream : nsISupports {
void getSendStreamStats(in nsIWebTransportStreamStatsCallback aCallback);
readonly attribute nsIAsyncOutputStream outputStream;
readonly attribute uint64_t streamId;
void setSendOrder(in int64_t aSendOrder);
[noscript] void setSendOrder(in MaybeInt64 aSendOrder);
};
[builtinclass, scriptable, uuid(f9ecb509-36db-4689-97d6-137639a08750)]
@ -82,5 +85,5 @@ interface nsIWebTransportBidirectionalStream : nsISupports {
readonly attribute nsIAsyncInputStream inputStream;
readonly attribute nsIAsyncOutputStream outputStream;
readonly attribute uint64_t streamId;
void setSendOrder(in int64_t aSendOrder);
[noscript] void setSendOrder(in MaybeInt64 aSendOrder);
};

View file

@ -149,9 +149,10 @@ class NeqoHttp3Conn final {
aResult);
}
nsresult WebTransportSetSendOrder(uint64_t aSessionId, int64_t aSendOrder) {
nsresult WebTransportSetSendOrder(uint64_t aSessionId,
Maybe<int64_t> aSendOrder) {
return neqo_http3conn_webtransport_set_sendorder(this, aSessionId,
aSendOrder);
aSendOrder.ptrOr(nullptr));
}
private:

View file

@ -1346,13 +1346,15 @@ pub extern "C" fn neqo_http3conn_webtransport_max_datagram_size(
pub extern "C" fn neqo_http3conn_webtransport_set_sendorder(
conn: &mut NeqoHttp3Conn,
stream_id: u64,
_sendorder: i64,
sendorder: *const i64,
) -> nsresult {
match conn
.conn
.webtransport_set_sendorder(StreamId::from(stream_id), None)
{
Ok(()) => NS_OK,
Err(_) => NS_ERROR_UNEXPECTED,
unsafe {
match conn
.conn
.webtransport_set_sendorder(StreamId::from(stream_id), sendorder.as_ref().copied())
{
Ok(()) => NS_OK,
Err(_) => NS_ERROR_UNEXPECTED,
}
}
}

View file

@ -5,9 +5,6 @@
[WebTransport interface: webTransport must inherit property "draining" with the proper type]
expected: FAIL
[WebTransportSendStream interface: attribute sendOrder]
expected: FAIL
[idlharness.https.any.serviceworker.html]
[WebTransport interface: attribute draining]
@ -16,9 +13,6 @@
[WebTransport interface: webTransport must inherit property "draining" with the proper type]
expected: FAIL
[WebTransportSendStream interface: attribute sendOrder]
expected: FAIL
[idlharness.https.any.sharedworker.html]
[WebTransport interface: attribute draining]
@ -27,9 +21,6 @@
[WebTransport interface: webTransport must inherit property "draining" with the proper type]
expected: FAIL
[WebTransportSendStream interface: attribute sendOrder]
expected: FAIL
[idlharness.https.any.worker.html]
[WebTransport interface: attribute draining]
@ -37,6 +28,3 @@
[WebTransport interface: webTransport must inherit property "draining" with the proper type]
expected: FAIL
[WebTransportSendStream interface: attribute sendOrder]
expected: FAIL

View file

@ -1,79 +1,23 @@
[sendorder.https.any.html]
[WebTransport client should be able to create and handle a bidirectional stream with sendOrder]
expected: FAIL
[WebTransport client should be able to modify unset sendOrder after stream creation]
expected: FAIL
[WebTransport client should be able to modify existing sendOrder after stream creation]
expected: FAIL
[WebTransport sendorder should not starve a stream without sendorder]
expected: FAIL
[WebTransport sendorder should starve a lower priority stream]
expected: FAIL
[sendorder.https.any.sharedworker.html]
expected: [OK]
[WebTransport sendorder should starve a lower priority stream, variant 2]
expected: FAIL
expected: [PASS, TIMEOUT]
[sendorder.https.any.worker.html]
[WebTransport client should be able to create and handle a bidirectional stream with sendOrder]
expected: FAIL
[WebTransport client should be able to modify unset sendOrder after stream creation]
expected: FAIL
[WebTransport client should be able to modify existing sendOrder after stream creation]
expected: FAIL
[WebTransport sendorder should not starve a stream without sendorder]
expected: FAIL
[WebTransport sendorder should starve a lower priority stream]
expected: FAIL
expected: [OK]
[WebTransport sendorder should starve a lower priority stream, variant 2]
expected: FAIL
[sendorder.https.any.sharedworker.html]
[WebTransport client should be able to create and handle a bidirectional stream with sendOrder]
expected: FAIL
[WebTransport client should be able to modify unset sendOrder after stream creation]
expected: FAIL
[WebTransport client should be able to modify existing sendOrder after stream creation]
expected: FAIL
[WebTransport sendorder should not starve a stream without sendorder]
expected: FAIL
[WebTransport sendorder should starve a lower priority stream]
expected: FAIL
[WebTransport sendorder should starve a lower priority stream, variant 2]
expected: FAIL
expected: [PASS, TIMEOUT]
[sendorder.https.any.serviceworker.html]
[WebTransport client should be able to create and handle a bidirectional stream with sendOrder]
expected: FAIL
[WebTransport client should be able to modify unset sendOrder after stream creation]
expected: FAIL
[WebTransport client should be able to modify existing sendOrder after stream creation]
expected: FAIL
[WebTransport sendorder should not starve a stream without sendorder]
expected: FAIL
[WebTransport sendorder should starve a lower priority stream]
expected: FAIL
expected: [OK]
[WebTransport sendorder should starve a lower priority stream, variant 2]
expected: FAIL
expected: [PASS, TIMEOUT]
[sendorder.https.any.html]
expected: [OK]
[WebTransport sendorder should starve a lower priority stream, variant 2]
expected: [PASS, TIMEOUT]

View file

@ -55,197 +55,3 @@ promise_test(async t => {
// Note: this doesn't verify the underlying stack actually changes priority, just the API
// for controlling sendOrder
}, 'WebTransport client should be able to modify existing sendOrder after stream creation');
promise_test(async t => {
// Establish a WebTransport session.
const id = token();
const wt = new WebTransport(webtransport_url(`sendorder.py?token=${id}`));
await wt.ready;
const bytes_low = new Uint8Array(65536).fill('1');
const bytes_unordered = new Uint8Array(65536).fill('0');
// Create a bidirectional stream without sendOrder
const {writable: unordered_writable} = await wt.createBidirectionalStream();
// Create a bidirectional stream with sendOrder
const {writable: low_writable} = await wt.createBidirectionalStream({sendOrder: 1});
// Write a large block to the lower-priority stream, async
const low_writer = low_writable.getWriter();
assert_equals(low_writable.sendOrder, 1);
// Write a large block to the lower-priority stream, async
const unordered_writer = unordered_writable.getWriter();
// enough bytes written to ensure we'll fill the congestion window even
// on a local server
// this should be enough to require queuing
for (let i = 0; i < 30; i++) {
low_writer.write(bytes_low).catch(() => {});
}
for (let i = 0; i < 30; i++) {
unordered_writer.write(bytes_unordered).catch(() => {});
}
await Promise.all([low_writer.close(), unordered_writer.close()]);
// Read the data - first byte for each data reception
const reply = await query(id);
// If unordered data avoids starvation, some of it will come in before the end
// of the sendordered data.
// first packet normally will be '1', since that would likely
// start being sent before unordered data starts queuing, but that's
// not required by the spec, just that the unordered stream isn't starved
//assert_equals(reply[0], 1);
// Scan for the first 0 after we get a 1, then verify that more 1's come in after the first 0
let ok = false;
for (i = 0; i < reply.length; i++) {
if (reply[i] == 1) {
// scan for a 0
for (; i < reply.length; i++) {
if (reply[i] == 0) {
for (; i < reply.length; i++) {
if (reply[i] == 1) {
// some unordered data came in before sendordered data, we're good
ok = true;
break;
}
}
break;
}
}
break;
}
}
assert_true(ok);
}, 'WebTransport sendorder should not starve a stream without sendorder');
promise_test(async t => {
// Establish a WebTransport session.
const id = token();
const wt = new WebTransport(webtransport_url(`sendorder.py?token=${id}`));
await wt.ready;
const bytes_low = new Uint8Array(65536).fill('1');
const bytes_unordered = new Uint8Array(65536).fill('0');
const bytes_high = new Uint8Array(65536).fill('2');
// Create a bidirectional stream without sendOrder
const {writable: unordered_writable} = await wt.createBidirectionalStream();
// Create a bidirectional stream with sendOrder
const {writable: low_writable} = await wt.createBidirectionalStream({sendOrder: 1});
// Create a second bidirectional stream with higher sendOrder
const {writable: high_writable} = await wt.createBidirectionalStream({sendOrder: 2});
// Write a large block to the lower-priority stream, async
const unordered_writer = unordered_writable.getWriter();
// Write a large block to the lower-priority stream, async
const low_writer = low_writable.getWriter();
assert_equals(low_writable.sendOrder, 1);
const high_writer = high_writable.getWriter();
assert_equals(high_writable.sendOrder, 2);
// enough bytes written to ensure we'll fill the congestion window even
// on a local server
// this should be enough to require queuing
for (let i = 0; i < 30; i++) {
unordered_writer.write(bytes_unordered).catch(() => {});
}
for (let i = 0; i < 30; i++) {
low_writer.write(bytes_low).catch(() => {});
}
// these should jump the queue and get sent before the low-priority data finishes
for (let i = 0; i < 30; i++) {
high_writer.write(bytes_high).catch(() => {});
}
await Promise.all([low_writer.close(), unordered_writer.close(), high_writer.close()]);
// Read the data - first byte for each data reception
const reply = await query(id);
// If high priority data gets prioritized, it won't be last received. If
// it isn't prioritized, it will likely come in after all the
// low-priority data. The first packet normally will be '0' (unordered),
// since that would likely start being sent before low_priority data
// shows up in the queue (and then they'll round-robin until
// high-priority data gets queued). Some low-priority data will likely
// start coming in before the high priority data gets queued; after high
// priority data is queued it should jump ahead of the low-priority data
// (and interleave with unordered). Some low-priority data may
// interleave with high-priority data, since it doesn't get queued all at
// once.
assert_true(reply[reply.length-1] != 2);
}, 'WebTransport sendorder should starve a lower priority stream');
promise_test(async t => {
// Establish a WebTransport session.
const id = token();
const wt = new WebTransport(webtransport_url(`sendorder.py?token=${id}`));
await wt.ready;
const bytes_low = new Uint8Array(65536).fill('1');
const bytes_unordered = new Uint8Array(65536).fill('0');
const bytes_high = new Uint8Array(65536).fill('2');
// Create a bidirectional stream without sendOrder
const {writable: unordered_writable} = await wt.createBidirectionalStream();
// Create a bidirectional stream with sendOrder
const {writable: low_writable} = await wt.createBidirectionalStream({sendOrder: 1});
// Create a second bidirectional stream with higher sendOrder
const {writable: high_writable} = await wt.createBidirectionalStream({sendOrder: 2});
// Write a large block to the lower-priority stream, async
const unordered_writer = unordered_writable.getWriter();
// Write a large block to the lower-priority stream, async
const low_writer = low_writable.getWriter();
assert_equals(low_writable.sendOrder, 1);
const high_writer = high_writable.getWriter();
assert_equals(high_writable.sendOrder, 2);
// enough bytes written to ensure we'll fill the congestion window even
// on a local server
// this should be enough to require queuing
for (let i = 0; i < 30; i++) {
unordered_writer.write(bytes_unordered).catch(() => {});
}
// Alternate version where high-priority data should always come in
// before low-priority, assuming we've saturated the output queue and can
// feed data in faster than it goes through.
for (let i = 0; i < 30; i++) {
high_writer.write(bytes_high).catch(() => {});
}
for (let i = 0; i < 30; i++) {
low_writer.write(bytes_low).catch(() => {});
}
await Promise.all([low_writer.close(), unordered_writer.close(), high_writer.close()]);
// Read the data - first byte for each data reception
const reply = await query(id);
// Scan for the last 2, and verify there are no 1's before it
let ok = true;
for (i = 0; i < reply.length; i++) {
if (reply[i] == 1) {
// scan for a 2
for (; i < reply.length; i++) {
if (reply[i] == 2) {
// priority 1 data should never jump in front of priority 2
ok = false;
break;
}
}
break;
}
}
assert_true(ok);
}, 'WebTransport sendorder should starve a lower priority stream, variant 2');
// XXX add tests for unordered vs ordered