Bug 1207753 - DataChannel thread-annotations r=bwc

Differential Revision: https://phabricator.services.mozilla.com/D130583
This commit is contained in:
Randell Jesup 2022-03-28 14:27:49 +00:00
Родитель ec34ccc22e
Коммит b012e973e6
2 изменённых файлов: 73 добавлений и 56 удалений

Просмотреть файл

@ -442,9 +442,9 @@ DataChannelConnection::DataChannelConnection(
#endif
}
bool DataChannelConnection::Init(const uint16_t aLocalPort,
const uint16_t aNumStreams,
const Maybe<uint64_t>& aMaxMessageSize) {
bool DataChannelConnection::Init(
const uint16_t aLocalPort, const uint16_t aNumStreams,
const Maybe<uint64_t>& aMaxMessageSize) {
ASSERT_WEBRTC(NS_IsMainThread());
struct sctp_initmsg initmsg;
@ -609,9 +609,11 @@ error_cleanup:
return false;
}
// Only called on MainThread, mMaxMessageSize is read on other threads
void DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet,
uint64_t aMaxMessageSize) {
MutexAutoLock lock(mLock); // TODO: Needed?
ASSERT_WEBRTC(NS_IsMainThread());
MutexAutoLock lock(mLock);
if (mMaxMessageSizeSet && !aMaxMessageSizeSet) {
// Don't overwrite already set MMS with default values
@ -1556,6 +1558,7 @@ void DataChannelConnection::DeliverQueuedData(uint16_t stream) {
mLock.AssertCurrentThreadOwns();
mQueuedData.RemoveElementsBy([stream, this](const auto& dataItem) {
mLock.AssertCurrentThreadOwns();
const bool match = dataItem->mStream == stream;
if (match) {
DC_DEBUG(("Delivering queued data for stream %u, length %u", stream,

Просмотреть файл

@ -186,8 +186,7 @@ class DataChannelConnection final : public net::NeckoTargetHolder
void Stop();
void Close(DataChannel* aChannel);
// CloseLocked() must be called with mLock held
void CloseLocked(DataChannel* aChannel);
void CloseLocked(DataChannel* aChannel) REQUIRES(mLock);
void CloseAll();
// Returns a POSIX error code.
@ -205,18 +204,20 @@ class DataChannelConnection final : public net::NeckoTargetHolder
// Called on data reception from the SCTP library
// must(?) be public so my c->c++ trampoline can call it
// May be called with (STS thread) or without the lock
int ReceiveCallback(struct socket* sock, void* data, size_t datalen,
struct sctp_rcvinfo rcv, int flags);
struct sctp_rcvinfo rcv,
int flags);
// Find out state
enum { CONNECTING = 0U, OPEN = 1U, CLOSING = 2U, CLOSED = 3U };
Mutex mLock MOZ_UNANNOTATED;
Mutex mLock;
void ReadBlob(already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
nsIInputStream* aBlob);
bool SendDeferredMessages();
bool SendDeferredMessages() REQUIRES(mLock);
#ifdef SCTP_DTLS_SUPPORTED
int SctpDtlsOutput(void* addr, void* buffer, size_t length, uint8_t tos,
@ -245,14 +246,14 @@ class DataChannelConnection final : public net::NeckoTargetHolder
const Maybe<uint64_t>& aMaxMessageSize);
// Caller must hold mLock
uint16_t GetReadyState() const {
uint16_t GetReadyState() const REQUIRES(mLock) {
mLock.AssertCurrentThreadOwns();
return mState;
}
// Caller must hold mLock
void SetReadyState(const uint16_t aState);
void SetReadyState(const uint16_t aState) REQUIRES(mLock);
#ifdef SCTP_DTLS_SUPPORTED
static void DTLSConnectThread(void* data);
@ -260,62 +261,74 @@ class DataChannelConnection final : public net::NeckoTargetHolder
void SctpDtlsInput(const std::string& aTransportId,
const MediaPacket& packet);
#endif
DataChannel* FindChannelByStream(uint16_t stream);
uint16_t FindFreeStream();
bool RequestMoreStreams(int32_t aNeeded = 16);
uint32_t UpdateCurrentStreamIndex();
uint32_t GetCurrentStreamIndex();
int SendControlMessage(const uint8_t* data, uint32_t len, uint16_t stream);
int SendOpenAckMessage(uint16_t stream);
DataChannel* FindChannelByStream(uint16_t stream) REQUIRES(mLock);
uint16_t FindFreeStream() REQUIRES(mLock);
bool RequestMoreStreams(int32_t aNeeded = 16) REQUIRES(mLock);
uint32_t UpdateCurrentStreamIndex() REQUIRES(mLock);
uint32_t GetCurrentStreamIndex() REQUIRES(mLock);
int SendControlMessage(const uint8_t* data, uint32_t len, uint16_t stream)
REQUIRES(mLock);
int SendOpenAckMessage(uint16_t stream) REQUIRES(mLock);
int SendOpenRequestMessage(const nsACString& label,
const nsACString& protocol, uint16_t stream,
bool unordered, uint16_t prPolicy,
uint32_t prValue);
uint32_t prValue) REQUIRES(mLock);
bool SendBufferedMessages(nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer,
size_t* aWritten);
int SendMsgInternal(OutgoingMsg& msg, size_t* aWritten);
int SendMsgInternalOrBuffer(nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer,
OutgoingMsg& msg, bool& buffered,
size_t* aWritten);
size_t* aWritten) REQUIRES(mLock);
int SendDataMsgInternalOrBuffer(DataChannel& channel, const uint8_t* data,
size_t len, uint32_t ppid);
size_t len, uint32_t ppid) REQUIRES(mLock);
int SendDataMsg(DataChannel& channel, const uint8_t* data, size_t len,
uint32_t ppidPartial, uint32_t ppidFinal);
uint32_t ppidPartial, uint32_t ppidFinal) REQUIRES(mLock);
int SendDataMsgCommon(uint16_t stream, const nsACString& aMsg, bool isBinary);
void DeliverQueuedData(uint16_t stream);
void DeliverQueuedData(uint16_t stream) REQUIRES(mLock);
already_AddRefed<DataChannel> OpenFinish(
already_AddRefed<DataChannel>&& aChannel);
already_AddRefed<DataChannel>&& aChannel) REQUIRES(mLock);
void ProcessQueuedOpens();
void ClearResets();
void SendOutgoingStreamReset();
void ResetOutgoingStream(uint16_t stream);
void ProcessQueuedOpens() REQUIRES(mLock);
void ClearResets() REQUIRES(mLock);
void SendOutgoingStreamReset() REQUIRES(mLock);
void ResetOutgoingStream(uint16_t stream) REQUIRES(mLock);
void HandleOpenRequestMessage(
const struct rtcweb_datachannel_open_request* req, uint32_t length,
uint16_t stream);
uint16_t stream) REQUIRES(mLock);
void HandleOpenAckMessage(const struct rtcweb_datachannel_ack* ack,
uint32_t length, uint16_t stream);
void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream);
void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream)
REQUIRES(mLock);
uint8_t BufferMessage(nsACString& recvBuffer, const void* data,
uint32_t length, uint32_t ppid, int flags);
void HandleDataMessage(const void* buffer, size_t length, uint32_t ppid,
uint16_t stream, int flags);
uint16_t stream, int flags) REQUIRES(mLock);
void HandleDCEPMessage(const void* buffer, size_t length, uint32_t ppid,
uint16_t stream, int flags);
uint16_t stream, int flags) REQUIRES(mLock);
void HandleMessage(const void* buffer, size_t length, uint32_t ppid,
uint16_t stream, int flags);
void HandleAssociationChangeEvent(const struct sctp_assoc_change* sac);
void HandlePeerAddressChangeEvent(const struct sctp_paddr_change* spc);
void HandleRemoteErrorEvent(const struct sctp_remote_error* sre);
void HandleShutdownEvent(const struct sctp_shutdown_event* sse);
void HandleAdaptationIndication(const struct sctp_adaptation_event* sai);
void HandlePartialDeliveryEvent(const struct sctp_pdapi_event* spde);
void HandleSendFailedEvent(const struct sctp_send_failed_event* ssfe);
void HandleStreamResetEvent(const struct sctp_stream_reset_event* strrst);
void HandleStreamChangeEvent(const struct sctp_stream_change_event* strchg);
void HandleNotification(const union sctp_notification* notif, size_t n);
uint16_t stream, int flags) REQUIRES(mLock);
void HandleAssociationChangeEvent(const struct sctp_assoc_change* sac)
REQUIRES(mLock);
void HandlePeerAddressChangeEvent(const struct sctp_paddr_change* spc)
REQUIRES(mLock);
void HandleRemoteErrorEvent(const struct sctp_remote_error* sre)
REQUIRES(mLock);
void HandleShutdownEvent(const struct sctp_shutdown_event* sse)
REQUIRES(mLock);
void HandleAdaptationIndication(const struct sctp_adaptation_event* sai)
REQUIRES(mLock);
void HandlePartialDeliveryEvent(const struct sctp_pdapi_event* spde)
REQUIRES(mLock);
void HandleSendFailedEvent(const struct sctp_send_failed_event* ssfe)
REQUIRES(mLock);
void HandleStreamResetEvent(const struct sctp_stream_reset_event* strrst)
REQUIRES(mLock);
void HandleStreamChangeEvent(const struct sctp_stream_change_event* strchg)
REQUIRES(mLock);
void HandleNotification(const union sctp_notification* notif, size_t n)
REQUIRES(mLock);
#ifdef SCTP_DTLS_SUPPORTED
bool IsSTSThread() const {
@ -349,13 +362,15 @@ class DataChannelConnection final : public net::NeckoTargetHolder
bool LessThan(const RefPtr<DataChannel>& a1,
const RefPtr<DataChannel>& a2) const;
};
mutable Mutex mMutex MOZ_UNANNOTATED;
ChannelArray mChannels;
mutable Mutex mMutex;
ChannelArray mChannels GUARDED_BY(mMutex);
};
bool mSendInterleaved = false;
bool mSendInterleaved GUARDED_BY(mLock) = false;
// MainThread only
bool mMaxMessageSizeSet = false;
uint64_t mMaxMessageSize = 0;
// mMaxMessageSize is only set on MainThread, but read off-main-thread
uint64_t mMaxMessageSize GUARDED_BY(mLock) = 0;
// Main thread only
Maybe<bool> mAllocateEven;
// Data:
@ -367,21 +382,20 @@ class DataChannelConnection final : public net::NeckoTargetHolder
uint32_t mCurrentStream = 0;
nsRefPtrDeque<DataChannel> mPending;
// STS and main
size_t mNegotiatedIdLimit = 0; // GUARDED_BY(mConnection->mLock)
uint8_t mPendingType = PENDING_NONE;
size_t mNegotiatedIdLimit GUARDED_BY(mLock) = 0;
uint8_t mPendingType GUARDED_BY(mLock) = PENDING_NONE;
// holds data that's come in before a channel is open
nsTArray<UniquePtr<QueuedDataMessage>> mQueuedData;
nsTArray<UniquePtr<QueuedDataMessage>> mQueuedData GUARDED_BY(mLock);
// holds outgoing control messages
nsTArray<UniquePtr<BufferedOutgoingMsg>>
mBufferedControl; // GUARDED_BY(mConnection->mLock)
nsTArray<UniquePtr<BufferedOutgoingMsg>> mBufferedControl GUARDED_BY(mLock);
// Streams pending reset. Accessed from main and STS.
AutoTArray<uint16_t, 4> mStreamsResetting; // GUARDED_BY(mConnection->mLock)
AutoTArray<uint16_t, 4> mStreamsResetting GUARDED_BY(mLock);
// accessed from STS thread
struct socket* mMasterSocket = nullptr;
// cloned from mMasterSocket on successful Connect on STS thread
struct socket* mSocket = nullptr;
uint16_t mState = CLOSED; // Protected with mLock
uint16_t mState GUARDED_BY(mLock) = CLOSED; // Protected with mLock
#ifdef SCTP_DTLS_SUPPORTED
std::string mTransportId;
@ -563,8 +577,8 @@ class DataChannel {
nsTArray<UniquePtr<BufferedOutgoingMsg>>
mBufferedData; // GUARDED_BY(mConnection->mLock)
nsCOMPtr<nsISerialEventTarget> mMainThreadEventTarget;
mutable Mutex mStatsLock MOZ_UNANNOTATED; // protects mTrafficCounters
TrafficCounters mTrafficCounters;
mutable Mutex mStatsLock; // protects mTrafficCounters
TrafficCounters mTrafficCounters GUARDED_BY(mStatsLock);
};
// used to dispatch notifications of incoming data to the main thread