Bug 1816439 - Fix races between OnSessionReady and OnSessionClosed, r=jesup,necko-reviewers

Since OnSesionReady is called on the main thread and others event not, there are lots of races need to be addressed.
This patch also annotates some members with correct thread safety macros.

Differential Revision: https://phabricator.services.mozilla.com/D170224
This commit is contained in:
Kershaw Chang 2023-02-24 09:46:52 +00:00
Родитель b573807379
Коммит a95e029f9f
4 изменённых файлов: 172 добавлений и 82 удалений

Просмотреть файл

@ -103,7 +103,10 @@ void WebTransportParent::Create(
InvokeAsync(mSocketThread, __func__,
[parentEndpoint = std::move(aParentEndpoint), runnable = r,
resolver = std::move(aResolver), p = RefPtr{this}]() mutable {
p->mResolver = resolver;
{
MutexAutoLock lock(p->mMutex);
p->mResolver = resolver;
}
LOG(("Binding parent endpoint"));
if (!parentEndpoint.Bind(p)) {
@ -122,9 +125,17 @@ void WebTransportParent::Create(
[p = RefPtr{this}](
const CreateWebTransportPromise::ResolveOrRejectValue& aValue) {
if (aValue.IsReject()) {
p->mResolver(ResolveType(
aValue.RejectValue(),
static_cast<uint8_t>(WebTransportReliabilityMode::Pending)));
std::function<void(ResolveType)> resolver;
{
MutexAutoLock lock(p->mMutex);
resolver = std::move(p->mResolver);
}
if (resolver) {
resolver(
ResolveType(aValue.RejectValue(),
static_cast<uint8_t>(
WebTransportReliabilityMode::Pending)));
}
}
});
}
@ -139,8 +150,11 @@ IPCResult WebTransportParent::RecvClose(const uint32_t& aCode,
const nsACString& aReason) {
LOG(("Close for %p received, code = %u, reason = %s", this, aCode,
PromiseFlatCString(aReason).get()));
MOZ_ASSERT(!mClosed);
mClosed.Flip();
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(!mClosed);
mClosed.Flip();
}
mWebTransport->CloseSession(aCode, aReason);
Close();
return IPC_OK();
@ -309,15 +323,22 @@ WebTransportParent::OnSessionReady(uint64_t aSessionId) {
LOG(("Created web transport session, sessionID = %" PRIu64 ", for %p",
aSessionId, this));
mSessionReady = true;
mOwningEventTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportParent::OnSessionReady", [self = RefPtr{this}] {
if (!self->IsClosed() && self->mResolver) {
MutexAutoLock lock(self->mMutex);
if (!self->mClosed && self->mResolver) {
self->mResolver(ResolveType(
NS_OK, static_cast<uint8_t>(
WebTransportReliabilityMode::Supports_unreliable)));
self->mResolver = nullptr;
if (self->mExecuteAfterResolverCallback) {
self->mExecuteAfterResolverCallback();
self->mExecuteAfterResolverCallback = nullptr;
}
} else {
if (self->IsClosed()) {
if (self->mClosed) {
LOG(("Session already closed at OnSessionReady %p", self.get()));
} else {
LOG(("No resolver at OnSessionReady %p", self.get()));
@ -353,7 +374,7 @@ WebTransportParent::OnSessionClosed(const uint32_t aErrorCode,
// we need better error propagation from lower-levels of http3
// webtransport session and it's subsequent error mapping to DOM.
// XXX See Bug 1806834
if (mResolver) {
if (!mSessionReady) {
LOG(("webtransport %p session creation failed code= %u, reason= %s", this,
aErrorCode, PromiseFlatCString(aReason).get()));
// we know we haven't gone Ready yet
@ -361,32 +382,51 @@ WebTransportParent::OnSessionClosed(const uint32_t aErrorCode,
mOwningEventTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportParent::OnSessionClosed",
[self = RefPtr{this}, result = rv] {
if (!self->IsClosed() && self->mResolver) {
MutexAutoLock lock(self->mMutex);
if (!self->mClosed && self->mResolver) {
self->mResolver(ResolveType(
result, static_cast<uint8_t>(
WebTransportReliabilityMode::Supports_unreliable)));
self->mResolver = nullptr;
}
}));
} else {
{
MutexAutoLock lock(mMutex);
if (mResolver) {
LOG(("[%p] NotifyRemoteClosed to be called later", this));
// NotifyRemoteClosed needs to wait until mResolver is invoked.
mExecuteAfterResolverCallback = [self = RefPtr{this}, aErrorCode,
reason = nsCString{aReason}]() {
self->NotifyRemoteClosed(aErrorCode, reason);
};
return NS_OK;
}
}
// https://w3c.github.io/webtransport/#web-transport-termination
// Step 1: Let cleanly be a boolean representing whether the HTTP/3
// stream associated with the CONNECT request that initiated
// transport.[[Session]] is in the "Data Recvd" state. [QUIC]
// XXX not calculated yet
LOG(("webtransport %p session remote closed code= %u, reason= %s", this,
aErrorCode, PromiseFlatCString(aReason).get()));
mSocketThread->Dispatch(NS_NewRunnableFunction(
__func__,
[self = RefPtr{this}, aErrorCode, reason = nsCString{aReason}]() {
// Tell the content side we were closed by the server
Unused << self->SendRemoteClosed(/*XXX*/ true, aErrorCode, reason);
// Let the other end shut down the IPC channel after RecvClose()
}));
NotifyRemoteClosed(aErrorCode, aReason);
}
return NS_OK;
}
void WebTransportParent::NotifyRemoteClosed(uint32_t aErrorCode,
const nsACString& aReason) {
LOG(("webtransport %p session remote closed code= %u, reason= %s", this,
aErrorCode, PromiseFlatCString(aReason).get()));
mSocketThread->Dispatch(NS_NewRunnableFunction(
__func__,
[self = RefPtr{this}, aErrorCode, reason = nsCString{aReason}]() {
// Tell the content side we were closed by the server
Unused << self->SendRemoteClosed(/*XXX*/ true, aErrorCode, reason);
// Let the other end shut down the IPC channel after RecvClose()
}));
}
// This method is currently not used by WebTransportSessionProxy to inform of
// any session related events. All notification is recieved via
// WebTransportSessionProxy::OnSessionReady and

Просмотреть файл

@ -47,16 +47,23 @@ class WebTransportParent : public PWebTransportParent,
void ActorDestroy(ActorDestroyReason aWhy) override;
bool IsClosed() const { return mClosed; }
protected:
virtual ~WebTransportParent();
private:
void NotifyRemoteClosed(uint32_t aErrorCode, const nsACString& aReason);
using ResolveType = Tuple<const nsresult&, const uint8_t&>;
nsCOMPtr<nsISerialEventTarget> mSocketThread;
std::function<void(ResolveType)> mResolver;
FlippedOnce<false> mClosed;
Atomic<bool> mSessionReady{false};
mozilla::Mutex mMutex{"WebTransportParent::mMutex"};
std::function<void(ResolveType)> mResolver MOZ_GUARDED_BY(mMutex);
// This is needed because mResolver is resolved on the background thread and
// OnSessionClosed is called on the socket thread.
std::function<void()> mExecuteAfterResolverCallback MOZ_GUARDED_BY(mMutex);
FlippedOnce<false> mClosed MOZ_GUARDED_BY(mMutex);
nsCOMPtr<nsIWebTransport> mWebTransport;
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
};

Просмотреть файл

@ -124,9 +124,10 @@ WebTransportSessionProxy::RetargetTo(nsIEventTarget* aTarget) {
if (mState != WebTransportSessionProxyState::ACTIVE) {
return NS_ERROR_UNEXPECTED;
}
mTarget = aTarget;
}
mTarget = aTarget;
return NS_OK;
}
@ -136,8 +137,8 @@ WebTransportSessionProxy::GetStats() { return NS_ERROR_NOT_IMPLEMENTED; }
NS_IMETHODIMP
WebTransportSessionProxy::CloseSession(uint32_t status,
const nsACString& reason) {
MOZ_ASSERT(mTarget->IsOnCurrentThread());
MutexAutoLock lock(mMutex);
MOZ_ASSERT(mTarget->IsOnCurrentThread());
mCloseStatus = status;
mReason = reason;
mListener = nullptr;
@ -511,7 +512,6 @@ WebTransportSessionProxy::OnStopRequest(nsIRequest* aRequest,
uint32_t closeStatus = 0;
uint64_t sessionId;
bool succeeded = false;
nsTArray<std::function<void()>> pendingEvents;
{
MutexAutoLock lock(mMutex);
switch (mState) {
@ -542,7 +542,6 @@ WebTransportSessionProxy::OnStopRequest(nsIRequest* aRequest,
sessionId = mSessionId;
listener = mListener;
ChangeState(WebTransportSessionProxyState::ACTIVE);
pendingEvents = std::move(mPendingEvents);
}
break;
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
@ -553,10 +552,20 @@ WebTransportSessionProxy::OnStopRequest(nsIRequest* aRequest,
if (listener) {
if (succeeded) {
listener->OnSessionReady(sessionId);
nsTArray<std::function<void()>> pendingEvents;
{
MutexAutoLock lock(mMutex);
pendingEvents = std::move(mPendingEvents);
mStopRequestCalled = true;
}
if (!pendingEvents.IsEmpty()) {
for (const auto& event : pendingEvents) {
event();
}
Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::DispatchPendingEvents",
[pendingEvents = std::move(pendingEvents)]() {
for (const auto& event : pendingEvents) {
event();
}
}));
}
} else {
listener->OnSessionClosed(closeStatus,
@ -686,37 +695,46 @@ WebTransportSessionProxy::OnSessionReadyInternal(
NS_IMETHODIMP
WebTransportSessionProxy::OnIncomingStreamAvailableInternal(
Http3WebTransportStream* aStream) {
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportSessionProxy> self(this);
RefPtr<Http3WebTransportStream> stream = aStream;
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnIncomingStreamAvailableInternal",
[self{std::move(self)}, stream{std::move(stream)}]() {
self->OnIncomingStreamAvailableInternal(stream);
}));
return NS_OK;
}
nsCOMPtr<WebTransportSessionEventListener> listener;
{
MutexAutoLock lock(mMutex);
LOG(
("WebTransportSessionProxy::OnIncomingStreamAvailableInternal %p "
"mState=%d "
"mStopRequestCalled=%d",
this, mState, mStopRequestCalled));
// Since OnSessionReady on the listener is called on the main thread,
// OnIncomingStreamAvailableInternal and OnSessionReady can be racy. If
// OnStopRequest is not called yet, OnIncomingStreamAvailableInternal needs
// to wait.
if (!mStopRequestCalled) {
mPendingEvents.AppendElement(
[self = RefPtr{this}, stream = RefPtr{aStream}]() {
self->OnIncomingStreamAvailableInternal(stream);
});
return NS_OK;
}
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportSessionProxy> self(this);
RefPtr<Http3WebTransportStream> stream = aStream;
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnIncomingStreamAvailableInternal",
[self{std::move(self)}, stream{std::move(stream)}]() {
self->OnIncomingStreamAvailableInternal(stream);
}));
return NS_OK;
}
LOG(
("WebTransportSessionProxy::OnIncomingStreamAvailableInternal %p "
"mState=%d mListener=%p",
this, mState, mListener.get()));
switch (mState) {
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
// OnSessionReady is not called yet, so we need to wait.
mPendingEvents.AppendElement(
[self = RefPtr{this}, stream = RefPtr{aStream}]() {
self->OnIncomingStreamAvailableInternal(stream);
});
break;
case WebTransportSessionProxyState::ACTIVE:
listener = mListener;
break;
default:
return NS_ERROR_ABORT;
if (mState == WebTransportSessionProxyState::ACTIVE) {
listener = mListener;
} else {
MOZ_ASSERT(false, "mState is not ACTIVE");
}
}
@ -756,8 +774,23 @@ NS_IMETHODIMP
WebTransportSessionProxy::OnSessionClosed(uint32_t status,
const nsACString& reason) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG(("WebTransportSessionProxy::OnSessionClosed"));
MutexAutoLock lock(mMutex);
LOG(
("WebTransportSessionProxy::OnSessionClosed %p mState=%d "
"mStopRequestCalled=%d",
this, mState, mStopRequestCalled));
// Since OnSessionReady on the listener is called on the main thread,
// OnSessionClosed and OnSessionReady can be racy. If OnStopRequest is not
// called yet, OnSessionClosed needs to wait.
if (!mStopRequestCalled) {
nsCString closeReason(reason);
mPendingEvents.AppendElement(
[self = RefPtr{this}, status(status), closeReason(closeReason)]() {
Unused << self->OnSessionClosed(status, closeReason);
});
return NS_OK;
}
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::NEGOTIATING:
@ -789,6 +822,8 @@ void WebTransportSessionProxy::CallOnSessionClosedLocked() {
}
void WebTransportSessionProxy::CallOnSessionClosed() {
mMutex.AssertCurrentThreadOwns();
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportSessionProxy> self(this);
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
@ -797,8 +832,6 @@ void WebTransportSessionProxy::CallOnSessionClosed() {
return;
}
mMutex.AssertCurrentThreadOwns();
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
nsAutoCString reason;
@ -897,11 +930,11 @@ void WebTransportSessionProxy::ChangeState(
void WebTransportSessionProxy::NotifyDatagramReceived(
nsTArray<uint8_t>&& aData) {
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(mTarget->IsOnCurrentThread());
if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) {
return;
}
@ -915,12 +948,15 @@ NS_IMETHODIMP WebTransportSessionProxy::OnDatagramReceivedInternal(
nsTArray<uint8_t>&& aData) {
MOZ_ASSERT(OnSocketThread());
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnDatagramReceived",
[self = RefPtr{this}, data{std::move(aData)}]() mutable {
self->NotifyDatagramReceived(std::move(data));
}));
{
MutexAutoLock lock(mMutex);
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnDatagramReceived",
[self = RefPtr{this}, data{std::move(aData)}]() mutable {
self->NotifyDatagramReceived(std::move(data));
}));
}
}
NotifyDatagramReceived(std::move(aData));
@ -933,11 +969,11 @@ NS_IMETHODIMP WebTransportSessionProxy::OnDatagramReceived(
}
void WebTransportSessionProxy::OnMaxDatagramSizeInternal(uint64_t aSize) {
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(mTarget->IsOnCurrentThread());
if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) {
return;
}
@ -950,12 +986,15 @@ void WebTransportSessionProxy::OnMaxDatagramSizeInternal(uint64_t aSize) {
NS_IMETHODIMP WebTransportSessionProxy::OnMaxDatagramSize(uint64_t aSize) {
MOZ_ASSERT(OnSocketThread());
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(
NS_NewRunnableFunction("WebTransportSessionProxy::OnMaxDatagramSize",
[self = RefPtr{this}, size(aSize)] {
self->OnMaxDatagramSizeInternal(size);
}));
{
MutexAutoLock lock(mMutex);
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(
NS_NewRunnableFunction("WebTransportSessionProxy::OnMaxDatagramSize",
[self = RefPtr{this}, size(aSize)] {
self->OnMaxDatagramSizeInternal(size);
}));
}
}
OnMaxDatagramSizeInternal(aSize);
@ -964,11 +1003,10 @@ NS_IMETHODIMP WebTransportSessionProxy::OnMaxDatagramSize(uint64_t aSize) {
void WebTransportSessionProxy::OnOutgoingDatagramOutComeInternal(
uint64_t aId, WebTransportSessionEventListener::DatagramOutcome aOutCome) {
MOZ_ASSERT(mTarget->IsOnCurrentThread());
nsCOMPtr<WebTransportSessionEventListener> listener;
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(mTarget->IsOnCurrentThread());
if (mState != WebTransportSessionProxyState::ACTIVE || !mListener) {
return;
}
@ -983,12 +1021,15 @@ WebTransportSessionProxy::OnOutgoingDatagramOutCome(
uint64_t aId, WebTransportSessionEventListener::DatagramOutcome aOutCome) {
MOZ_ASSERT(OnSocketThread());
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnOutgoingDatagramOutCome",
[self = RefPtr{this}, id(aId), outcome(aOutCome)] {
self->OnOutgoingDatagramOutComeInternal(id, outcome);
}));
{
MutexAutoLock lock(mMutex);
if (!mTarget->IsOnCurrentThread()) {
return mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnOutgoingDatagramOutCome",
[self = RefPtr{this}, id(aId), outcome(aOutCome)] {
self->OnOutgoingDatagramOutComeInternal(id, outcome);
}));
}
}
OnOutgoingDatagramOutComeInternal(aId, aOutCome);

Просмотреть файл

@ -176,9 +176,11 @@ class WebTransportSessionProxy final : public nsIWebTransport,
uint64_t mSessionId MOZ_GUARDED_BY(mMutex) = UINT64_MAX;
uint32_t mCloseStatus MOZ_GUARDED_BY(mMutex) = 0;
nsCString mReason MOZ_GUARDED_BY(mMutex);
bool mStopRequestCalled MOZ_GUARDED_BY(mMutex) = false;
// This is used to store events happened before OnSessionReady.
// Note that these events will be dispatched to the socket thread.
nsTArray<std::function<void()>> mPendingEvents MOZ_GUARDED_BY(mMutex);
nsCOMPtr<nsIEventTarget> mTarget;
nsCOMPtr<nsIEventTarget> mTarget MOZ_GUARDED_BY(mMutex);
};
} // namespace mozilla::net