Bug 1547082 - Allow simulating the latency and bandwidth when communicating with a cloud based replaying process, r=loganfsmyth.

Differential Revision: https://phabricator.services.mozilla.com/D29107

--HG--
extra : rebase_source : def4a6337cf030f3fc6046c46e4cc7168a7ef00c
This commit is contained in:
Brian Hackett 2019-04-27 05:26:44 -10:00
Родитель b72f8bf1f1
Коммит d2dbf7829d
6 изменённых файлов: 129 добавлений и 21 удалений

Просмотреть файл

@ -53,6 +53,8 @@ void OpenChannel(base::ProcessId aMiddlemanPid, uint32_t aChannelId,
} // namespace parent
static void InitializeSimulatedDelayState();
struct HelloMessage {
int32_t mMagic;
};
@ -65,7 +67,8 @@ Channel::Channel(size_t aId, bool aMiddlemanRecording,
mConnectionFd(0),
mFd(0),
mMessageBuffer(nullptr),
mMessageBytes(0) {
mMessageBytes(0),
mSimulateDelays(false) {
MOZ_RELEASE_ASSERT(NS_IsMainThread());
if (IsRecordingOrReplaying()) {
@ -101,6 +104,12 @@ Channel::Channel(size_t aId, bool aMiddlemanRecording,
MOZ_RELEASE_ASSERT(rv >= 0);
}
// Simulate message delays in channels used to communicate with a replaying
// process.
mSimulateDelays = IsMiddleman() ? !aMiddlemanRecording : IsReplaying();
InitializeSimulatedDelayState();
Thread::SpawnNonRecordedThread(ThreadMain, this);
}
@ -129,6 +138,8 @@ void Channel::ThreadMain(void* aChannelArg) {
MOZ_RELEASE_ASSERT(rv == sizeof(msg));
}
channel->mStartTime = channel->mAvailableTime = TimeStamp::Now();
{
MonitorAutoLock lock(channel->mMonitor);
channel->mInitialized = true;
@ -144,7 +155,60 @@ void Channel::ThreadMain(void* aChannelArg) {
}
}
void Channel::SendMessage(const Message& aMsg) {
// Simulated one way latency between middleman and replaying children, in ms.
static size_t gSimulatedLatency;
// Simulated bandwidth for data transferred between middleman and replaying
// children, in bytes/ms.
static size_t gSimulatedBandwidth;
static size_t LoadEnvValue(const char* aEnv) {
const char* value = getenv(aEnv);
if (value && value[0]) {
int n = atoi(value);
return n >= 0 ? n : 0;
}
return 0;
}
static void InitializeSimulatedDelayState() {
// In preparation for shifting computing resources into the cloud when
// debugging a recorded execution (see bug 1547081), we need to be able to
// test expected performance when there is a significant distance between the
// user's machine (running the UI, middleman, and recording process) and
// machines in the cloud (running replaying processes). To assess this
// expected performance, the environment variables below can be used to
// specify the one-way latency and bandwidth to simulate for connections
// between the middleman and replaying processes.
//
// This simulation is approximate: the bandwidth tracked is per connection
// instead of the total across all connections, and network restrictions are
// not yet simulated when transferring graphics data.
//
// If there are multiple channels then we will do this initialization multiple
// times, so this needs to be idempotent.
gSimulatedLatency = LoadEnvValue("MOZ_RECORD_REPLAY_SIMULATED_LATENCY");
gSimulatedBandwidth = LoadEnvValue("MOZ_RECORD_REPLAY_SIMULATED_BANDWIDTH");
}
static bool MessageSubjectToSimulatedDelay(MessageType aType) {
switch (aType) {
// Middleman call messages are not subject to delays. When replaying
// children are in the cloud they will use a local process to perform
// middleman calls.
case MessageType::MiddlemanCallResponse:
case MessageType::MiddlemanCallRequest:
case MessageType::ResetMiddlemanCalls:
// Don't call system functions when we're in the process of crashing.
case MessageType::BeginFatalError:
case MessageType::FatalError:
return false;
default:
return true;
}
}
void Channel::SendMessage(Message&& aMsg) {
MOZ_RELEASE_ASSERT(NS_IsMainThread() ||
aMsg.mType == MessageType::BeginFatalError ||
aMsg.mType == MessageType::FatalError ||
@ -160,6 +224,30 @@ void Channel::SendMessage(const Message& aMsg) {
PrintMessage("SendMsg", aMsg);
if (gSimulatedLatency &&
gSimulatedBandwidth &&
mSimulateDelays &&
MessageSubjectToSimulatedDelay(aMsg.mType)) {
AutoEnsurePassThroughThreadEvents pt;
// Find the time this message will start sending.
TimeStamp sendTime = TimeStamp::Now();
if (sendTime < mAvailableTime) {
sendTime = mAvailableTime;
}
// Find the time spent sending the message over the channel.
size_t sendDurationMs = aMsg.mSize / gSimulatedBandwidth;
mAvailableTime = sendTime + TimeDuration::FromMilliseconds(sendDurationMs);
// The receive time of the message is the time the message finishes sending
// plus the connection latency.
TimeStamp receiveTime =
mAvailableTime + TimeDuration::FromMilliseconds(gSimulatedLatency);
aMsg.mReceiveTime = (receiveTime - mStartTime).ToMilliseconds();
}
const char* ptr = (const char*)&aMsg;
size_t nbytes = aMsg.mSize;
while (nbytes) {
@ -227,6 +315,16 @@ Message::UniquePtr Channel::WaitForMessage() {
}
mMessageBytes = remaining;
// If there is a simulated delay on the message, wait until it completes.
if (res->mReceiveTime) {
TimeStamp receiveTime =
mStartTime + TimeDuration::FromMilliseconds(res->mReceiveTime);
while (receiveTime > TimeStamp::Now()) {
MonitorAutoLock lock(mMonitor);
mMonitor.WaitUntil(receiveTime);
}
}
PrintMessage("RecvMsg", *res);
return res;
}
@ -269,14 +367,14 @@ void Channel::PrintMessage(const char* aPrefix, const Message& aMsg) {
case MessageType::DebuggerRequest: {
const DebuggerRequestMessage& nmsg = (const DebuggerRequestMessage&)aMsg;
data = NS_ConvertUTF16toUTF8(
nsDependentString(nmsg.Buffer(), nmsg.BufferSize()));
nsDependentSubstring(nmsg.Buffer(), nmsg.BufferSize()));
break;
}
case MessageType::DebuggerResponse: {
const DebuggerResponseMessage& nmsg =
(const DebuggerResponseMessage&)aMsg;
data = NS_ConvertUTF16toUTF8(
nsDependentString(nmsg.Buffer(), nmsg.BufferSize()));
nsDependentSubstring(nmsg.Buffer(), nmsg.BufferSize()));
break;
}
case MessageType::SetIsActive: {

Просмотреть файл

@ -152,11 +152,16 @@ enum class MessageType {
struct Message {
MessageType mType;
// When simulating message delays, the time this message should be received,
// relative to when the channel was opened.
uint32_t mReceiveTime;
// Total message size, including the header.
uint32_t mSize;
protected:
Message(MessageType aType, uint32_t aSize) : mType(aType), mSize(aSize) {
Message(MessageType aType, uint32_t aSize)
: mType(aType), mReceiveTime(0), mSize(aSize) {
MOZ_RELEASE_ASSERT(mSize >= sizeof(*this));
}
@ -467,6 +472,16 @@ class Channel {
// The number of bytes of data already in the message buffer.
size_t mMessageBytes;
// Whether this channel is subject to message delays during simulation.
bool mSimulateDelays;
// The time this channel was opened, for use in simulating message delays.
TimeStamp mStartTime;
// When simulating message delays, the time at which old messages will have
// finished sending and new messages may be sent.
TimeStamp mAvailableTime;
// If spew is enabled, print a message and associated info to stderr.
void PrintMessage(const char* aPrefix, const Message& aMsg);
@ -486,7 +501,7 @@ class Channel {
// Send a message to the other side of the channel. This must be called on
// the main thread, except for fatal error messages.
void SendMessage(const Message& aMsg);
void SendMessage(Message&& aMsg);
};
// Command line option used to specify the middleman pid for a child process.

Просмотреть файл

@ -382,7 +382,7 @@ void ReportFatalError(const Maybe<MinidumpInfo>& aMinidump, const char* aFormat,
msgBuf[sizeof(msgBuf) - 1] = 0;
// Don't take the message lock when sending this, to avoid touching the heap.
gChannel->SendMessage(*msg);
gChannel->SendMessage(std::move(*msg));
DirectPrint("***** Fatal Record/Replay Error *****\n");
DirectPrint(buf);
@ -677,7 +677,7 @@ void HitExecutionPoint(const js::ExecutionPoint& aPoint,
void RespondToRequest(const js::CharBuffer& aBuffer) {
DebuggerResponseMessage* msg =
DebuggerResponseMessage::New(aBuffer.begin(), aBuffer.length());
gChannel->SendMessage(*msg);
gChannel->SendMessage(std::move(*msg));
free(msg);
}
@ -693,7 +693,7 @@ void SendMiddlemanCallRequest(const char* aInputData, size_t aInputSize,
MiddlemanCallRequestMessage* msg =
MiddlemanCallRequestMessage::New(aInputData, aInputSize);
gChannel->SendMessage(*msg);
gChannel->SendMessage(std::move(*msg));
free(msg);
while (!gCallResponseMessage) {

Просмотреть файл

@ -99,7 +99,7 @@ void ChildProcessInfo::OnIncomingMessage(const Message& aMsg,
&outputData);
Message::UniquePtr response(MiddlemanCallResponseMessage::New(
outputData.begin(), outputData.length()));
SendMessage(*response);
SendMessage(std::move(*response));
break;
}
case MessageType::ResetMiddlemanCalls:
@ -110,7 +110,7 @@ void ChildProcessInfo::OnIncomingMessage(const Message& aMsg,
}
}
void ChildProcessInfo::SendMessage(const Message& aMsg) {
void ChildProcessInfo::SendMessage(Message&& aMsg) {
MOZ_RELEASE_ASSERT(NS_IsMainThread());
// Update paused state.
@ -128,7 +128,7 @@ void ChildProcessInfo::SendMessage(const Message& aMsg) {
}
mLastMessageTime = TimeStamp::Now();
mChannel->SendMessage(aMsg);
mChannel->SendMessage(std::move(aMsg));
}
///////////////////////////////////////////////////////////////////////////////
@ -208,7 +208,7 @@ void ChildProcessInfo::LaunchSubprocess(
WaitUntilPaused();
MOZ_RELEASE_ASSERT(gIntroductionMessage);
SendMessage(*gIntroductionMessage);
SendMessage(std::move(*gIntroductionMessage));
// Always save the first checkpoint in replaying child processes.
if (!IsRecording()) {

Просмотреть файл

@ -520,7 +520,7 @@ static bool Middleman_SendDebuggerRequest(JSContext* aCx, unsigned aArgc,
DebuggerRequestMessage* msg = DebuggerRequestMessage::New(
requestBuffer.begin(), requestBuffer.length());
child->SendMessage(*msg);
child->SendMessage(std::move(*msg));
free(msg);
// Wait for the child to respond to the query.

Просмотреть файл

@ -159,13 +159,8 @@ class ChildProcessInfo {
bool IsRecording() { return mRecording; }
bool IsPaused() { return mPaused; }
void SendMessage(const Message& aMessage);
// Recover to the same state as another process.
void Recover(ChildProcessInfo* aTargetProcess);
// Recover to be paused at a checkpoint with no breakpoints set.
void RecoverToCheckpoint(size_t aCheckpoint);
// Send a message over the underlying channel.
void SendMessage(Message&& aMessage);
// Handle incoming messages from this process (and no others) until it pauses.
// The return value is null if it is already paused, otherwise the message