perf: Jobify SendBatchedMessages in UnityTransport [MTT-4999] (#2304)

* Make ErrorUtilities usable from Burst

* Jobify SendBatchedMessages
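
For context on the first point: Burst-compiled code cannot use managed strings or string.Format, which is why the diff below turns the ErrorUtilities messages into FixedString128Bytes values formatted with FixedString.Format. A minimal sketch of that pattern follows; the ExampleErrors class and its message are hypothetical and only illustrate the technique.

using Unity.Collections;

// Hypothetical helper (not part of this commit) mirroring the ErrorUtilities change:
// unmanaged FixedString128Bytes constants plus FixedString.Format keep the whole
// formatting path callable from Burst-compiled code, where managed strings are unavailable.
internal static class ExampleErrors
{
    private static readonly FixedString128Bytes k_QueueFull = "Send queue for connection {0} is full.";

    public static FixedString128Bytes Describe(ulong connectionId)
    {
        return FixedString.Format(k_QueueFull, connectionId);
    }
}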
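On the second point, the send loop moves into a Burst-compiled IJob that is executed synchronously with Run(), so the loop body runs as Burst-compiled code on the calling thread instead of managed C#. A minimal sketch of that job pattern follows; the ExampleDoubleJob struct and its payload are made up for illustration.

using Unity.Burst;
using Unity.Collections;
using Unity.Jobs;

// Hypothetical job (not part of this commit) with the same shape as
// SendBatchedMessagesJob: a [BurstCompile] IJob struct holding the data it needs,
// executed immediately on the calling thread via IJobExtensions.Run().
[BurstCompile]
internal struct ExampleDoubleJob : IJob
{
    public NativeArray<int> Values;

    public void Execute()
    {
        // Burst-compiled body: double every element in place.
        for (int i = 0; i < Values.Length; i++)
        {
            Values[i] *= 2;
        }
    }
}

// Usage sketch:
//   var values = new NativeArray<int>(64, Allocator.TempJob);
//   new ExampleDoubleJob { Values = values }.Run(); // runs synchronously, Burst-compiled
//   values.Dispose();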
Simon Lemay 2022-11-10 11:19:07 -05:00 committed by GitHub
Parent 83096c8135
Commit 3716e27656
No key matching this signature was found
GPG key ID: 4AEE18F83AFDEB23
1 changed file with 90 additions and 71 deletions


@@ -10,8 +10,10 @@ using System.Collections.Generic;
 using UnityEngine;
 using NetcodeNetworkEvent = Unity.Netcode.NetworkEvent;
 using TransportNetworkEvent = Unity.Networking.Transport.NetworkEvent;
+using Unity.Burst;
 using Unity.Collections.LowLevel.Unsafe;
 using Unity.Collections;
+using Unity.Jobs;
 using Unity.Networking.Transport;
 using Unity.Networking.Transport.Relay;
 using Unity.Networking.Transport.Utilities;
@@ -51,50 +53,48 @@ namespace Unity.Netcode.Transports.UTP
     /// </summary>
     public static class ErrorUtilities
     {
-        private const string k_NetworkSuccess = "Success";
-        private const string k_NetworkIdMismatch = "NetworkId is invalid, likely caused by stale connection {0}.";
-        private const string k_NetworkVersionMismatch = "NetworkVersion is invalid, likely caused by stale connection {0}.";
-        private const string k_NetworkStateMismatch = "Sending data while connecting on connection {0} is not allowed.";
-        private const string k_NetworkPacketOverflow = "Unable to allocate packet due to buffer overflow.";
-        private const string k_NetworkSendQueueFull = "Currently unable to queue packet as there is too many in-flight packets. This could be because the send queue size ('Max Send Queue Size') is too small.";
-        private const string k_NetworkHeaderInvalid = "Invalid Unity Transport Protocol header.";
-        private const string k_NetworkDriverParallelForErr = "The parallel network driver needs to process a single unique connection per job, processing a single connection multiple times in a parallel for is not supported.";
-        private const string k_NetworkSendHandleInvalid = "Invalid NetworkInterface Send Handle. Likely caused by pipeline send data corruption.";
-        private const string k_NetworkArgumentMismatch = "Invalid NetworkEndpoint Arguments.";
+        private static readonly FixedString128Bytes k_NetworkSuccess = "Success";
+        private static readonly FixedString128Bytes k_NetworkIdMismatch = "Invalid connection ID {0}.";
+        private static readonly FixedString128Bytes k_NetworkVersionMismatch = "Connection ID is invalid. Likely caused by sending on stale connection {0}.";
+        private static readonly FixedString128Bytes k_NetworkStateMismatch = "Connection state is invalid. Likely caused by sending on connection {0} which is stale or still connecting.";
+        private static readonly FixedString128Bytes k_NetworkPacketOverflow = "Packet is too large to be allocated by the transport.";
+        private static readonly FixedString128Bytes k_NetworkSendQueueFull = "Unable to queue packet in the transport. Likely caused by send queue size ('Max Send Queue Size') being too small.";

         /// <summary>
-        /// Convert error code to human readable error message.
+        /// Convert a UTP error code to human-readable error message.
         /// </summary>
-        /// <param name="error">Status code of the error</param>
-        /// <param name="connectionId">Subject connection ID of the error</param>
-        /// <returns>Human readable error message.</returns>
+        /// <param name="error">UTP error code.</param>
+        /// <param name="connectionId">ID of the connection on which the error occurred.</param>
+        /// <returns>Human-readable error message.</returns>
         public static string ErrorToString(Networking.Transport.Error.StatusCode error, ulong connectionId)
         {
-            switch (error)
+            return ErrorToString((int)error, connectionId);
+        }
+
+        internal static string ErrorToString(int error, ulong connectionId)
+        {
+            return ErrorToFixedString(error, connectionId).ToString();
+        }
+
+        internal static FixedString128Bytes ErrorToFixedString(int error, ulong connectionId)
+        {
+            switch ((Networking.Transport.Error.StatusCode)error)
             {
                 case Networking.Transport.Error.StatusCode.Success:
                     return k_NetworkSuccess;
                 case Networking.Transport.Error.StatusCode.NetworkIdMismatch:
-                    return string.Format(k_NetworkIdMismatch, connectionId);
+                    return FixedString.Format(k_NetworkIdMismatch, connectionId);
                 case Networking.Transport.Error.StatusCode.NetworkVersionMismatch:
-                    return string.Format(k_NetworkVersionMismatch, connectionId);
+                    return FixedString.Format(k_NetworkVersionMismatch, connectionId);
                 case Networking.Transport.Error.StatusCode.NetworkStateMismatch:
-                    return string.Format(k_NetworkStateMismatch, connectionId);
+                    return FixedString.Format(k_NetworkStateMismatch, connectionId);
                 case Networking.Transport.Error.StatusCode.NetworkPacketOverflow:
                     return k_NetworkPacketOverflow;
                 case Networking.Transport.Error.StatusCode.NetworkSendQueueFull:
                     return k_NetworkSendQueueFull;
-                case Networking.Transport.Error.StatusCode.NetworkHeaderInvalid:
-                    return k_NetworkHeaderInvalid;
-                case Networking.Transport.Error.StatusCode.NetworkDriverParallelForErr:
-                    return k_NetworkDriverParallelForErr;
-                case Networking.Transport.Error.StatusCode.NetworkSendHandleInvalid:
-                    return k_NetworkSendHandleInvalid;
-                case Networking.Transport.Error.StatusCode.NetworkArgumentMismatch:
-                    return k_NetworkArgumentMismatch;
+                default:
+                    return FixedString.Format("Unknown error code {0}.", error);
             }
-
-            return $"Unknown ErrorCode {Enum.GetName(typeof(Networking.Transport.Error.StatusCode), error)}";
         }
     }
@@ -676,53 +676,72 @@ namespace Unity.Netcode.Transports.UTP
            }
        }

+        [BurstCompile]
+        private struct SendBatchedMessagesJob : IJob
+        {
+            public NetworkDriver.Concurrent Driver;
+            public SendTarget Target;
+            public BatchedSendQueue Queue;
+            public NetworkPipeline ReliablePipeline;
+
+            public void Execute()
+            {
+                var clientId = Target.ClientId;
+                var connection = ParseClientId(clientId);
+                var pipeline = Target.NetworkPipeline;
+
+                while (!Queue.IsEmpty)
+                {
+                    var result = Driver.BeginSend(pipeline, connection, out var writer);
+                    if (result != (int)Networking.Transport.Error.StatusCode.Success)
+                    {
+                        Debug.LogError($"Error sending message: {ErrorUtilities.ErrorToFixedString(result, clientId)}");
+                        return;
+                    }
+
+                    // We don't attempt to send entire payloads over the reliable pipeline. Instead we
+                    // fragment it manually. This is safe and easy to do since the reliable pipeline
+                    // basically implements a stream, so as long as we separate the different messages
+                    // in the stream (the send queue does that automatically) we are sure they'll be
+                    // reassembled properly at the other end. This allows us to lift the limit of ~44KB
+                    // on reliable payloads (because of the reliable window size).
+                    var written = pipeline == ReliablePipeline ? Queue.FillWriterWithBytes(ref writer) : Queue.FillWriterWithMessages(ref writer);
+
+                    result = Driver.EndSend(writer);
+                    if (result == written)
+                    {
+                        // Batched message was sent successfully. Remove it from the queue.
+                        Queue.Consume(written);
+                    }
+                    else
+                    {
+                        // Some error occurred. If it's just the UTP queue being full, then don't log
+                        // anything since that's okay (the unsent message(s) are still in the queue
+                        // and we'll retry sending them later). Otherwise log the error and remove the
+                        // message from the queue (we don't want to resend it again since we'll likely
+                        // just get the same error again).
+                        if (result != (int)Networking.Transport.Error.StatusCode.NetworkSendQueueFull)
+                        {
+                            Debug.LogError($"Error sending the message: {ErrorUtilities.ErrorToFixedString(result, clientId)}");
+                            Queue.Consume(written);
+                        }
+
+                        return;
+                    }
+                }
+            }
+        }
+
         // Send as many batched messages from the queue as possible.
         private void SendBatchedMessages(SendTarget sendTarget, BatchedSendQueue queue)
         {
-            var clientId = sendTarget.ClientId;
-            var connection = ParseClientId(clientId);
-            var pipeline = sendTarget.NetworkPipeline;
-
-            while (!queue.IsEmpty)
+            new SendBatchedMessagesJob
             {
-                var result = m_Driver.BeginSend(pipeline, connection, out var writer);
-                if (result != (int)Networking.Transport.Error.StatusCode.Success)
-                {
-                    Debug.LogError("Error sending the message: " +
-                        ErrorUtilities.ErrorToString((Networking.Transport.Error.StatusCode)result, clientId));
-                    return;
-                }
-
-                // We don't attempt to send entire payloads over the reliable pipeline. Instead we
-                // fragment it manually. This is safe and easy to do since the reliable pipeline
-                // basically implements a stream, so as long as we separate the different messages
-                // in the stream (the send queue does that automatically) we are sure they'll be
-                // reassembled properly at the other end. This allows us to lift the limit of ~44KB
-                // on reliable payloads (because of the reliable window size).
-                var written = pipeline == m_ReliableSequencedPipeline ? queue.FillWriterWithBytes(ref writer) : queue.FillWriterWithMessages(ref writer);
-
-                result = m_Driver.EndSend(writer);
-                if (result == written)
-                {
-                    // Batched message was sent successfully. Remove it from the queue.
-                    queue.Consume(written);
-                }
-                else
-                {
-                    // Some error occurred. If it's just the UTP queue being full, then don't log
-                    // anything since that's okay (the unsent message(s) are still in the queue
-                    // and we'll retry sending them later). Otherwise log the error and remove the
-                    // message from the queue (we don't want to resend it again since we'll likely
-                    // just get the same error again).
-                    if (result != (int)Networking.Transport.Error.StatusCode.NetworkSendQueueFull)
-                    {
-                        Debug.LogError("Error sending the message: " + ErrorUtilities.ErrorToString((Networking.Transport.Error.StatusCode)result, clientId));
-                        queue.Consume(written);
-                    }
-
-                    return;
-                }
-            }
+                Driver = m_Driver.ToConcurrent(),
+                Target = sendTarget,
+                Queue = queue,
+                ReliablePipeline = m_ReliableSequencedPipeline
+            }.Run();
         }

         private bool AcceptConnection()