// // Copyright (c) Microsoft Corporation. All rights reserved. // SPDX-License-Identifier: Apache-2.0 // #ifndef IOFFLINESTORAGE_HPP #define IOFFLINESTORAGE_HPP #include "Enums.hpp" #include "IHttpClient.hpp" #include "ctmacros.hpp" #include "ILogManager.hpp" #include #include #include #include namespace MAT_NS_BEGIN { constexpr unsigned int DB_FULL_NOTIFICATION_DEFAULT_PERCENTAGE = 75; constexpr uint64_t DB_FULL_CHECK_INTERVAL_DEFAULT_MS = 5000; using StorageRecordId = std::string; using StorageBlob = std::vector; struct StorageRecord { StorageRecordId id; std::string tenantToken; EventLatency latency = EventLatency_Unspecified; EventPersistence persistence = EventPersistence_Normal; int64_t timestamp = 0; StorageBlob blob; int retryCount = 0; int64_t reservedUntil = 0; #ifdef HAVE_MAT_EVT_TRACEID std::string traceId; #endif // HAVE_MAT_EVT_TRACEID StorageRecord() {} #ifdef HAVE_MAT_EVT_TRACEID StorageRecord(std::string const& id, std::string const& tenantToken, EventLatency latency, EventPersistence persistence, std::string traceId) : id(id), tenantToken(tenantToken), latency(latency), persistence(persistence), traceId(traceId) {} #else StorageRecord(std::string const& id, std::string const& tenantToken, EventLatency latency, EventPersistence persistence) : id(id), tenantToken(tenantToken), latency(latency), persistence(persistence) {} #endif // HAVE_MAT_EVT_TRACEID StorageRecord(std::string const& id, std::string const& tenantToken, EventLatency latency, EventPersistence persistence, int64_t timestamp, std::vector&& blob, int retryCount = 0, int64_t reservedUntil = 0) : id(id), tenantToken(tenantToken), latency(latency), persistence(persistence), timestamp(timestamp), blob(blob), retryCount(retryCount), reservedUntil(reservedUntil) {} bool operator==(const StorageRecord& rhs) { return ((*this).id == rhs.id); } }; using StorageRecordVector = std::vector; using DroppedMap = std::map; class IOfflineStorageObserver { public: virtual ~IOfflineStorageObserver() {} /// /// Called when the offline storage (re)opens its backing storage /// /// /// The parameter is any textual description /// of the active underlying offline storage implementation. The /// recommended format is "<impl>/<state>", where /// <impl> is name of the implementation (e.g. "SQLite") /// and <state> is description of the current state /// (e.g. if using the "Default", "Temp" or "None" database file). /// /// Current storage description virtual void OnStorageOpened(std::string const& type) = 0; /// /// Called when the offline storage encounters some failure /// /// /// The parameter is any textual description /// of the problem that occurred. It does not necessarily have to be /// human-readable or self-explaining, it can be just a numerical code /// that only that implementation's maintainer can understand. /// /// Reason of the current/recent failure virtual void OnStorageFailed(std::string const& reason) = 0; /// /// Called when the offline storage is not open. /// /// /// The parameter is any textual description /// of the problem that occurred. It does not necessarily have to be /// human-readable or self-explaining, it can be just a numerical code /// that only that implementation's maintainer can understand. /// /// Reason of the current/recent failure virtual void OnStorageOpenFailed(std::string const& reason) = 0; /// /// Called when the offline storage trims some records off in order to /// maintain its configured size limit /// /// Number of records trimmed virtual void OnStorageTrimmed(DroppedMap const& numRecords) = 0; /// /// Called when the offline storage drops some records with retry count /// over the configured limit /// /// Number of records dropped virtual void OnStorageRecordsDropped(std::map const& numRecords) = 0; /// /// Called when the offline storage rejects some records for reason like killSwitch /// over the configured limit /// /// Number of records dropped virtual void OnStorageRecordsRejected(std::map const& numRecords) = 0; virtual void OnStorageRecordsSaved(size_t numRecords) = 0; }; class IOfflineStorage { public: IOfflineStorage() noexcept = default; virtual ~IOfflineStorage() noexcept = default; /// /// Initialize the offline storage /// /// /// Prepare any external libraries, open files etc. Called from the /// internal worker thread as the initialization can take longer time. /// Any other methods can be called only after initializing. If the /// offline storage cannot be initialized, calling other methods later /// must be still possible, they should return some default/error values. /// The argument specifies an instance of /// which will be used to notify the /// owner about side actions performed by the storage implementation /// (failures, dropping trimmed events etc.). The callback methods of /// can be invoked during execution of any of /// the other methods of this interface, so the observer object must stay /// alive until after the storage has been fully shut down. /// Notification observer instance virtual void Initialize(IOfflineStorageObserver& observer) = 0; /// /// Shut down the offline storage /// /// /// Flush any outstanding operations, close the underlying files etc. /// No other methods can be called after shutdown. Called from the /// internal worker thread. /// virtual void Shutdown() = 0; /// /// Save pending records to persistent storage /// virtual void Flush() = 0; /// /// Store one telemetry event record /// /// /// The offline storage might need to trim the oldest events before /// inserting the new one in order to maintain its configured size limit. /// Called from the internal worker thread. /// /// Record data to store /// Whether the record was successfully stored virtual bool StoreRecord(StorageRecord const& record) = 0; /// /// Store several telemetry event records /// /// /// The offline storage might need to trim the oldest events before /// inserting the new one in order to maintain its configured size limit. /// Called from the internal worker thread. /// /// Record data to store /// Number of records stored virtual size_t StoreRecords(StorageRecordVector & records) = 0; /// /// Retrieve the best records to upload based on specified parameters /// /// /// Retrieves stored records one by one, filtered and ordered based on the /// specified parameters. The priority is considered first: only events of /// the highest priority found, higher or equal to /// , are returned during one call to this /// method. The timestamp is considered next: events are returned in a /// decreasing timestamp order, i.e. from the oldest to newest. The /// specified is checked the last. The /// retrieval can be aborted after any record if the /// returns false. Records which were /// accepted by the consumer are reserved for the specified amount of time /// and will not be returned again by this /// method until explicitly released or deleted or until their reservation /// period expires. Called from the internal worker thread. /// Callback functor processing the individual /// retrieved records /// Amount of time all acccepted records should /// be reserved for, in milliseconds /// Minimum priority of events to be /// retrieved /// Maximum number of events to retrieve /// true if everything went well (even with no events /// really accepted by the consumer), false if an error occurred and /// the retrieval ended prematurely, records could not be reserved /// etc. virtual bool GetAndReserveRecords(std::function const& consumer, unsigned leaseTimeMs, EventLatency minLatency = EventLatency_Unspecified, unsigned maxCount = 0) = 0; /// /// return where the last read was memory or disk /// /// virtual bool IsLastReadFromMemory() = 0; /// /// return last read count /// /// virtual unsigned LastReadRecordCount() = 0; /// /// Delete all records from storage /// virtual void DeleteAllRecords() = 0; /// /// Bulk delete records using "where" clause. /// Specify condition using key-value pairs in the map. /// /// virtual void DeleteRecords(const std::map & whereFilter) = 0; /// /// Delete records with specified IDs /// /// /// IDs of records that are no longer found in the storage are silently /// ignored. Called from the internal worker thread. /// /// Identifiers of records to delete virtual void DeleteRecords(std::vector const& ids, HttpHeaders headers, bool& fromMemory) = 0; /// /// Release event records with specified IDs /// /// /// IDs of events that are no longer found in the storage are silently /// ignored. If is set and the retry /// counter of some records reaches the maximum retry count, those events /// may be dropped as part of the releasing procedure. Persistent storage /// implementations of this interface drop these records. MemoryStorage does not. /// Called from the internal worker thread. /// /// Identifiers of records to release /// Determines whether the retry /// counter should be incremented for the records virtual void ReleaseRecords(std::vector const& ids, bool incrementRetryCount, HttpHeaders headers, bool& fromMemory) = 0; /// /// Delete value of an auxiliary persistent configuration value /// /// /// If a setting with the specified name does not exist, success is returned. /// /// Name of the setting to retrieve /// Status of operation virtual bool DeleteSetting(std::string const& name) = 0; /// /// Set value of an auxiliary persistent configuration value /// /// /// Set to an empty string to delete any existing /// value. Called from the internal worker thread. /// /// Name of the setting to update /// New setting value /// virtual bool StoreSetting(std::string const& name, std::string const& value) = 0; /// /// Get value of an auxiliary persistent configuration value /// /// /// If a setting with the specified name does not exist, an empty string is /// returned. Called from the internal worker thread. /// /// Name of the setting to retrieve /// Value of the requested setting or an empty string virtual std::string GetSetting(std::string const& name) = 0; /// /// Get size of the DB /// /// /// Get current Db size returned. Called from the internal worker thread. /// /// Value of the requested DB size virtual size_t GetSize() = 0; /// /// Get number of records of specific latency. /// If latency is unspecified, then get the total number of records in storage. /// /// /// Gets the total number of records. Primarily used on shutdown to evaluate /// if upload still has to be done for the remaining records. /// /// Number of records virtual size_t GetRecordCount(EventLatency latency = EventLatency_Unspecified) const = 0; /// /// Get Vector of records from DB /// /// /// If a setting with the specified name does not exist, an empty string is /// returned. Called from the internal worker thread. /// /// if this is called at shutdown ot not /// lowest priority selected /// max count to be selected /// Value of the requested setting or an empty string virtual std::vector GetRecords(bool shutdown, EventLatency minLatency = EventLatency_Unspecified, unsigned maxCount = 0) = 0; virtual bool ResizeDb() = 0; virtual void ReleaseAllRecords() {}; }; // IOfflineStorage as Module. External offline storage implementations need to inherit from it. class IOfflineStorageModule : public IOfflineStorage, public IModule { }; } MAT_NS_END #endif