// // Copyright (c) Microsoft Corporation. All rights reserved. // SPDX-License-Identifier: Apache-2.0 // #include "TelemetrySystem.hpp" #include "utils/Utils.hpp" #include "ILogManager.hpp" #include "mat/config.h" namespace MAT_NS_BEGIN { /// /// Initializes a new instance of the class. /// /// The log manager. /// The runtime configuration. /// The offline storage. /// The HTTP client. /// The async task dispatcher. /// The bandwidth controller. TelemetrySystem::TelemetrySystem( ILogManager& logManager, IRuntimeConfig& runtimeConfig, IOfflineStorage& offlineStorage, IHttpClient& httpClient, ITaskDispatcher& taskDispatcher, IBandwidthController* bandwidthController, LogSessionDataProvider& logSessionDataProvider) : TelemetrySystemBase(logManager, runtimeConfig, taskDispatcher), compression(runtimeConfig), hcm(logManager, httpClient, taskDispatcher), httpEncoder(*this, httpClient), httpDecoder(*this), storage(*this, offlineStorage), packager(runtimeConfig), tpm(*this, taskDispatcher, bandwidthController) { // Handler for start onStart = [this, &logSessionDataProvider](void) { bool result = true; result&=storage.start(); result&=tpm.start(); // TODO: clarify how UTC subsystem initializes LogSessionData m_storageType=SessionStorageType::FileStore ? // We may not necessarily compile in SQLite support for UTC min-build. For now we assume nullptr. logSessionDataProvider.CreateLogSessionData(); result&=stats.onStart(); return result; }; // Handler for stop onStop = [this](void) { uint32_t timeoutInSec = m_config.GetTeardownTime(); bool result = true; int64_t stopTimes[5] = { 0, 0, 0, 0, 0 }; // Perform upload only if not paused if ((timeoutInSec > 0) && (!tpm.isPaused())) { // perform uploads if required stopTimes[0] = GetUptimeMs(); LOG_TRACE("Shutdown timer started..."); upload(); // Try to push thru as much data as possible. // If either data is available for upload or // If there's outstanding request (some records marked in-flight), // then try to wait for up to config[CFG_INT_MAX_TEARDOWN_TIME] while (storage.GetRecordCount() || tpm.isUploadInProgress() ) { auto uploadTime = GetUptimeMs() - stopTimes[0]; if (uploadTime >= (1000L * timeoutInSec)) { // Hard-stop if it takes longer than planned LOG_TRACE("Shutdown timer expired, exiting..."); break; } MAT::sleep(100); LOG_INFO("offline records=%zu, pending uploads=%zu", storage.GetRecordCount(), hcm.requestCount()); } stopTimes[0] = GetUptimeMs() - stopTimes[0]; } // cancel all pending and force-finish all uploads stopTimes[1] = GetUptimeMs(); // TODO: Should this still pause, since the TPM now has abort logic in addition to pause logic? // hcm.cancelAllRequests is also part of pause, so the logic is definitely redundant. Issue 387 onPause(); hcm.cancelAllRequests(); tpm.finishAllUploads(); stopTimes[1] = GetUptimeMs() - stopTimes[1]; // initiate the stop sequence stopTimes[2] = GetUptimeMs(); result &= tpm.stop(); stopTimes[2] = GetUptimeMs() - stopTimes[2]; // cancel all pending tasks stopTimes[3] = GetUptimeMs(); LOG_TRACE("Waiting for all queued callbacks..."); m_done.wait(); LOG_TRACE("Stopped."); stopTimes[3] = GetUptimeMs() - stopTimes[3]; // stop storage stopTimes[4] = GetUptimeMs(); storage.stop(); stopTimes[4] = GetUptimeMs() - stopTimes[4]; #if 1 // Shutdown performance printout LOG_WARN("upload = %lld ms", stopTimes[0]); LOG_WARN("abort = %lld ms", stopTimes[1]); LOG_WARN("stop = %lld ms", stopTimes[2]); LOG_WARN("worker = %lld ms", stopTimes[3]); LOG_WARN("storage = %lld ms", stopTimes[4]); #endif return result; }; // Handler for pause onPause = [this](void) { bool result = true; result &= tpm.pause(); hcm.cancelAllRequests(); return result; }; // Handler for resume onResume = [this](void) { return tpm.start(); }; onCleanup = [this](void) { bool result = true; hcm.cancelAllRequests(); result &= tpm.cleanup(); return result; }; tpm.allUploadsFinished >> stats.onStop >> this->flushTaskDispatcher; // On an arbitrary user thread this->sending >> bondSerializer.serialize >> this->incomingEventPrepared; // On the inner worker thread this->preparedIncomingEvent >> storage.storeRecord >> stats.onIncomingEventAccepted >> tpm.eventArrived; storage.storeRecordFailed >> stats.onIncomingEventFailed; tpm.initiateUpload >> storage.retrieveEvents; storage.retrievedEvent >> packager.addEventToPackage; storage.retrievalFinished >> packager.finalizePackage; storage.retrievalFailed >> tpm.nothingToUpload; packager.emptyPackage >> tpm.nothingToUpload; packager.packagedEvents >> #ifdef HAVE_MAT_ZLIB compression.compress >> #endif httpEncoder.encode >> clockSkewDelta.encode >> stats.onUploadStarted >> hcm.sendRequest; #ifdef HAVE_MAT_ZLIB compression.compressionFailed >> storage.releaseRecords >> stats.onPackagingFailed >> tpm.packagingFailed; #endif hcm.requestDone >> clockSkewDelta.decode >> httpDecoder.decode; httpDecoder.eventsAccepted >> storage.deleteRecords >> stats.onUploadSuccessful >> tpm.eventsUploadSuccessful; httpDecoder.eventsRejected >> storage.deleteRecords >> stats.onUploadRejected >> tpm.eventsUploadRejected; httpDecoder.temporaryNetworkFailure >> storage.releaseRecords >> stats.onUploadFailed >> tpm.eventsUploadFailed; httpDecoder.temporaryServerFailure >> storage.releaseRecordsIncRetryCount >> stats.onUploadFailed >> tpm.eventsUploadFailed; httpDecoder.requestAborted >> storage.releaseRecords >> stats.onUploadFailed >> tpm.eventsUploadAborted; // // Storage notifications // storage.opened >> stats.onStorageOpened; storage.failed >> stats.onStorageFailed; storage.trimmed >> stats.onStorageTrimmed; storage.recordsDropped >> stats.onStorageRecordsDropped; storage.recordsRejected >> stats.onStorageRecordsRejected; } TelemetrySystem::~TelemetrySystem() { } bool TelemetrySystem::upload() { size_t recordCount = storage.GetRecordCount(); if (recordCount) { tpm.scheduleUpload(std::chrono::milliseconds {}, EventLatency_Normal, true); return true; } return false; } void TelemetrySystem::handleIncomingEventPrepared(IncomingEventContextPtr const& event) { uint32_t maxBlobSize = m_config[CFG_MAP_TPM][CFG_INT_TPM_MAX_BLOB_BYTES]; if (event->record.blob.size() > maxBlobSize) { DebugEvent evt; evt.type = DebugEventType::EVT_REJECTED; evt.param1 = REJECTED_REASON_EVENT_SIZE_LIMIT_EXCEEDED; m_logManager.DispatchEvent(evt); LOG_INFO("Event %s/%s dropped because size more than 2 MB", tenantTokenToId(event->record.tenantToken).c_str(), event->source->baseType.c_str()); return; } event->source = nullptr; preparedIncomingEventAsync(event); } void TelemetrySystem::handleFlushTaskDispatcher() { signalDone(); } } MAT_NS_END