Use separate flag for MQTT Buffer pooling (#5580)

Port 50027ff39e to master

It is better to use Unpooled buffers in dotnetty by default, as it is more stable, and should provide comparable performance. Adding a new flag "mqttSettings__UsePooledBuffers" to control buffer pooling a OptimizeForPerformance affects other things and we want to be able to control pooled/unpooled buffers separately.
This commit is contained in:
Varun Puranik 2021-09-29 14:00:06 -07:00 коммит произвёл GitHub
Родитель 7a34d05550
Коммит 089a1c6ee5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 6 добавлений и 9 удалений

Просмотреть файл

@ -71,6 +71,7 @@
| Metrics__Listener__Suffix | Appended to the metrics listener URL | string | metrics |
| MinTwinSyncPeriodSecs | Maximum frequency for pull any device/module twin | int32 | 120 |
| MqttSettings__Enabled | Whether the MQTT broker should be enabled | bool | true |
| MqttSettings__UsePooledBuffers | Whether MQTT protocol head should use pooled buffers | bool | false |
| OptimizeForPerformance | Increase RocksDB file I/O usage to speed up message storage | bool | true |
| ReportedPropertiesSyncFrequencySecs | Maximum frequency for pushing reported properties upstream | int32 | 5 |
| RocksDB_MaxOpenFiles | Max number of files to be concurrently opened by RocksDB | int32 | |

Просмотреть файл

@ -143,7 +143,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service
this.RegisterCommonModule(builder, optimizeForPerformance, storeAndForward, metricsConfig, nestedEdgeEnabled, authenticationMode);
this.RegisterRoutingModule(builder, storeAndForward, experimentalFeatures, nestedEdgeEnabled, authenticationMode == AuthenticationMode.Scope, trackDeviceState);
this.RegisterMqttModule(builder, storeAndForward, optimizeForPerformance, experimentalFeatures);
this.RegisterMqttModule(builder, storeAndForward, experimentalFeatures);
this.RegisterAmqpModule(builder);
builder.RegisterModule(new HttpModule(this.iotHubHostname, this.edgeDeviceId, proxyModuleId));
@ -172,7 +172,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service
void RegisterMqttModule(
ContainerBuilder builder,
StoreAndForward storeAndForward,
bool optimizeForPerformance,
ExperimentalFeatures experimentalFeatures)
{
var topics = new MessageAddressConversionConfiguration(
@ -186,7 +185,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service
// MQTT broker overrides the legacy MQTT protocol head
if (mqttSettingsConfiguration.GetValue("enabled", true) && !experimentalFeatures.EnableMqttBroker)
{
builder.RegisterModule(new MqttModule(mqttSettingsConfiguration, topics, this.serverCertificate, storeAndForward.IsEnabled, clientCertAuthEnabled, optimizeForPerformance, this.sslProtocols));
builder.RegisterModule(new MqttModule(mqttSettingsConfiguration, topics, this.serverCertificate, storeAndForward.IsEnabled, clientCertAuthEnabled, this.sslProtocols));
}
}

Просмотреть файл

@ -28,7 +28,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service.Modules
readonly bool isStoreAndForwardEnabled;
readonly X509Certificate2 tlsCertificate;
readonly bool clientCertAuthAllowed;
readonly bool optimizeForPerformance;
readonly SslProtocols sslProtocols;
public MqttModule(
@ -37,7 +36,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service.Modules
X509Certificate2 tlsCertificate,
bool isStoreAndForwardEnabled,
bool clientCertAuthAllowed,
bool optimizeForPerformance,
SslProtocols sslProtocols)
{
this.mqttSettingsConfiguration = Preconditions.CheckNotNull(mqttSettingsConfiguration, nameof(mqttSettingsConfiguration));
@ -45,7 +43,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service.Modules
this.tlsCertificate = Preconditions.CheckNotNull(tlsCertificate, nameof(tlsCertificate));
this.isStoreAndForwardEnabled = isStoreAndForwardEnabled;
this.clientCertAuthAllowed = clientCertAuthAllowed;
this.optimizeForPerformance = optimizeForPerformance;
this.sslProtocols = sslProtocols;
}
@ -55,8 +52,8 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service.Modules
builder.Register(
c =>
{
// TODO - We should probably also use some heuristics to make this determination, like how much memory does the system have.
return this.optimizeForPerformance ? PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default as IByteBufferAllocator;
var usePooledBuffers = this.mqttSettingsConfiguration.GetValue("UsePooledBuffers", false);
return usePooledBuffers ? PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default as IByteBufferAllocator;
})
.As<IByteBufferAllocator>()
.SingleInstance();

Просмотреть файл

@ -219,7 +219,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.E2E.Test
Option.None<X509Certificate2>()));
builder.RegisterModule(new HttpModule("Edge1", iotHubConnectionStringBuilder.DeviceId, "iotedgeApiProxy"));
builder.RegisterModule(new MqttModule(mqttSettingsConfiguration.Object, topics, this.serverCertificate, false, false, false, this.sslProtocols));
builder.RegisterModule(new MqttModule(mqttSettingsConfiguration.Object, topics, this.serverCertificate, false, false, this.sslProtocols));
builder.RegisterModule(new AmqpModule("amqps", 5671, this.serverCertificate, iotHubConnectionStringBuilder.HostName, true, this.sslProtocols));
}