360 строки
16 KiB
C++
360 строки
16 KiB
C++
//
|
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
//
|
|
#ifndef IOFFLINESTORAGE_HPP
|
|
#define IOFFLINESTORAGE_HPP
|
|
|
|
#include "Enums.hpp"
|
|
#include "IHttpClient.hpp"
|
|
#include "ctmacros.hpp"
|
|
#include "ILogManager.hpp"
|
|
|
|
#include <functional>
|
|
#include <string>
|
|
#include <vector>
|
|
#include <map>
|
|
|
|
namespace MAT_NS_BEGIN {
|
|
|
|
constexpr unsigned int DB_FULL_NOTIFICATION_DEFAULT_PERCENTAGE = 75;
|
|
constexpr uint64_t DB_FULL_CHECK_INTERVAL_DEFAULT_MS = 5000;
|
|
|
|
using StorageRecordId = std::string;
|
|
|
|
using StorageBlob = std::vector<uint8_t>;
|
|
|
|
struct StorageRecord {
|
|
StorageRecordId id;
|
|
std::string tenantToken;
|
|
EventLatency latency = EventLatency_Unspecified;
|
|
EventPersistence persistence = EventPersistence_Normal;
|
|
int64_t timestamp = 0;
|
|
StorageBlob blob;
|
|
int retryCount = 0;
|
|
int64_t reservedUntil = 0;
|
|
#ifdef HAVE_MAT_EVT_TRACEID
|
|
std::string traceId;
|
|
#endif // HAVE_MAT_EVT_TRACEID
|
|
|
|
StorageRecord()
|
|
{}
|
|
|
|
#ifdef HAVE_MAT_EVT_TRACEID
|
|
StorageRecord(std::string const& id, std::string const& tenantToken, EventLatency latency, EventPersistence persistence, std::string traceId)
|
|
: id(id), tenantToken(tenantToken), latency(latency), persistence(persistence), traceId(traceId)
|
|
{}
|
|
#else
|
|
StorageRecord(std::string const& id, std::string const& tenantToken, EventLatency latency, EventPersistence persistence)
|
|
: id(id), tenantToken(tenantToken), latency(latency), persistence(persistence)
|
|
{}
|
|
#endif // HAVE_MAT_EVT_TRACEID
|
|
|
|
StorageRecord(std::string const& id, std::string const& tenantToken, EventLatency latency, EventPersistence persistence,
|
|
int64_t timestamp, std::vector<uint8_t>&& blob, int retryCount = 0, int64_t reservedUntil = 0)
|
|
: id(id), tenantToken(tenantToken), latency(latency), persistence(persistence), timestamp(timestamp), blob(blob), retryCount(retryCount), reservedUntil(reservedUntil)
|
|
{}
|
|
|
|
bool operator==(const StorageRecord& rhs) {
|
|
return ((*this).id == rhs.id);
|
|
}
|
|
|
|
};
|
|
|
|
using StorageRecordVector = std::vector<StorageRecord>;
|
|
using DroppedMap = std::map<std::string, size_t>;
|
|
|
|
class IOfflineStorageObserver {
|
|
public:
|
|
virtual ~IOfflineStorageObserver() {}
|
|
|
|
/// <summary>
|
|
/// Called when the offline storage (re)opens its backing storage
|
|
/// <summary>
|
|
/// <remarks>
|
|
/// The <paramref name="type"> parameter is any textual description
|
|
/// of the active underlying offline storage implementation. The
|
|
/// recommended format is "<c><impl>/<state></c>", where
|
|
/// <c><impl></c> is name of the implementation (e.g. "SQLite")
|
|
/// and <c><state></c> is description of the current state
|
|
/// (e.g. if using the "Default", "Temp" or "None" database file).
|
|
/// </remarks>
|
|
/// <param name="type">Current storage description</param>
|
|
virtual void OnStorageOpened(std::string const& type) = 0;
|
|
|
|
/// <summary>
|
|
/// Called when the offline storage encounters some failure
|
|
/// <summary>
|
|
/// <remarks>
|
|
/// The parameter <paramref name="reason"> is any textual description
|
|
/// of the problem that occurred. It does not necessarily have to be
|
|
/// human-readable or self-explaining, it can be just a numerical code
|
|
/// that only that implementation's maintainer can understand.
|
|
/// </remarks>
|
|
/// <param name="reason">Reason of the current/recent failure</param>
|
|
virtual void OnStorageFailed(std::string const& reason) = 0;
|
|
|
|
/// <summary>
|
|
/// Called when the offline storage is not open.
|
|
/// <summary>
|
|
/// <remarks>
|
|
/// The parameter <paramref name="reason"> is any textual description
|
|
/// of the problem that occurred. It does not necessarily have to be
|
|
/// human-readable or self-explaining, it can be just a numerical code
|
|
/// that only that implementation's maintainer can understand.
|
|
/// </remarks>
|
|
/// <param name="reason">Reason of the current/recent failure</param>
|
|
virtual void OnStorageOpenFailed(std::string const& reason) = 0;
|
|
|
|
/// <summary>
|
|
/// Called when the offline storage trims some records off in order to
|
|
/// maintain its configured size limit
|
|
/// <summary>
|
|
/// <param name="numRecords">Number of records trimmed</param>
|
|
virtual void OnStorageTrimmed(DroppedMap const& numRecords) = 0;
|
|
|
|
/// <summary>
|
|
/// Called when the offline storage drops some records with retry count
|
|
/// over the configured limit
|
|
/// <summary>
|
|
/// <param name="numRecords">Number of records dropped</param>
|
|
virtual void OnStorageRecordsDropped(std::map<std::string, size_t> const& numRecords) = 0;
|
|
|
|
/// <summary>
|
|
/// Called when the offline storage rejects some records for reason like killSwitch
|
|
/// over the configured limit
|
|
/// <summary>
|
|
/// <param name="numRecords">Number of records dropped</param>
|
|
virtual void OnStorageRecordsRejected(std::map<std::string, size_t> const& numRecords) = 0;
|
|
|
|
virtual void OnStorageRecordsSaved(size_t numRecords) = 0;
|
|
};
|
|
|
|
class IOfflineStorage
|
|
{
|
|
public:
|
|
|
|
IOfflineStorage() noexcept = default;
|
|
virtual ~IOfflineStorage() noexcept = default;
|
|
|
|
/// <summary>
|
|
/// Initialize the offline storage
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Prepare any external libraries, open files etc. Called from the
|
|
/// internal worker thread as the initialization can take longer time.
|
|
/// Any other methods can be called only after initializing. If the
|
|
/// offline storage cannot be initialized, calling other methods later
|
|
/// must be still possible, they should return some default/error values.
|
|
/// The argument <paramref name="observer"/> specifies an instance of
|
|
/// <see cref="IOfflineStorageObserver"/> which will be used to notify the
|
|
/// owner about side actions performed by the storage implementation
|
|
/// (failures, dropping trimmed events etc.). The callback methods of
|
|
/// <paramref name="observer"/> can be invoked during execution of any of
|
|
/// the other methods of this interface, so the observer object must stay
|
|
/// alive until after the storage has been fully shut down.
|
|
/// <param name="observer">Notification observer instance</param>
|
|
virtual void Initialize(IOfflineStorageObserver& observer) = 0;
|
|
|
|
/// <summary>
|
|
/// Shut down the offline storage
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Flush any outstanding operations, close the underlying files etc.
|
|
/// No other methods can be called after shutdown. Called from the
|
|
/// internal worker thread.
|
|
/// </remarks>
|
|
virtual void Shutdown() = 0;
|
|
|
|
/// <summary>
|
|
/// Save pending records to persistent storage
|
|
/// </summary>
|
|
virtual void Flush() = 0;
|
|
|
|
/// <summary>
|
|
/// Store one telemetry event record
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// The offline storage might need to trim the oldest events before
|
|
/// inserting the new one in order to maintain its configured size limit.
|
|
/// Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <param name="record">Record data to store</param>
|
|
/// <returns>Whether the record was successfully stored</returns>
|
|
virtual bool StoreRecord(StorageRecord const& record) = 0;
|
|
|
|
/// <summary>
|
|
/// Store several telemetry event records
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// The offline storage might need to trim the oldest events before
|
|
/// inserting the new one in order to maintain its configured size limit.
|
|
/// Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <param name="record">Record data to store</param>
|
|
/// <returns>Number of records stored</returns>
|
|
virtual size_t StoreRecords(StorageRecordVector & records) = 0;
|
|
|
|
/// <summary>
|
|
/// Retrieve the best records to upload based on specified parameters
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Retrieves stored records one by one, filtered and ordered based on the
|
|
/// specified parameters. The priority is considered first: only events of
|
|
/// the highest priority found, higher or equal to
|
|
/// <paramref name="minPriority"/>, are returned during one call to this
|
|
/// method. The timestamp is considered next: events are returned in a
|
|
/// decreasing timestamp order, i.e. from the oldest to newest. The
|
|
/// specified <paramref name="maxCount"/> is checked the last. The
|
|
/// retrieval can be aborted after any record if the
|
|
/// <paramref name="consumer"/> returns <c>false</c>. Records which were
|
|
/// accepted by the consumer are reserved for the specified amount of time
|
|
/// <paramref name="leaseTimeMs"/> and will not be returned again by this
|
|
/// method until explicitly released or deleted or until their reservation
|
|
/// period expires. Called from the internal worker thread.
|
|
/// <param name="consumer">Callback functor processing the individual
|
|
/// retrieved records</param>
|
|
/// <param name="leaseTimeMs">Amount of time all acccepted records should
|
|
/// be reserved for, in milliseconds</param>
|
|
/// <param name="minPriority">Minimum priority of events to be
|
|
/// retrieved</param>
|
|
/// <param name="maxCount">Maximum number of events to retrieve</param>
|
|
/// <returns><c>true</c> if everything went well (even with no events
|
|
/// really accepted by the consumer), <c>false</c> if an error occurred and
|
|
/// the retrieval ended prematurely, records could not be reserved
|
|
/// etc.</returns>
|
|
virtual bool GetAndReserveRecords(std::function<bool(StorageRecord&&)> const& consumer, unsigned leaseTimeMs,
|
|
EventLatency minLatency = EventLatency_Unspecified, unsigned maxCount = 0) = 0;
|
|
|
|
/// <summary>
|
|
/// return where the last read was memory or disk
|
|
/// </summary>
|
|
/// <remarks>
|
|
virtual bool IsLastReadFromMemory() = 0;
|
|
|
|
/// <summary>
|
|
/// return last read count
|
|
/// </summary>
|
|
/// <remarks>
|
|
virtual unsigned LastReadRecordCount() = 0;
|
|
|
|
/// <summary>
|
|
/// Delete all records from storage
|
|
/// </summary>
|
|
virtual void DeleteAllRecords() = 0;
|
|
|
|
/// <summary>
|
|
/// Bulk delete records using "where" clause.
|
|
/// Specify condition using key-value pairs in the map.
|
|
/// </summary>
|
|
/// <remarks>
|
|
virtual void DeleteRecords(const std::map<std::string, std::string> & whereFilter) = 0;
|
|
|
|
/// <summary>
|
|
/// Delete records with specified IDs
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// IDs of records that are no longer found in the storage are silently
|
|
/// ignored. Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <param name="ids">Identifiers of records to delete</param>
|
|
virtual void DeleteRecords(std::vector<StorageRecordId> const& ids, HttpHeaders headers, bool& fromMemory) = 0;
|
|
|
|
/// <summary>
|
|
/// Release event records with specified IDs
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// IDs of events that are no longer found in the storage are silently
|
|
/// ignored. If <paramref name="incrementRetryCount"/> is set and the retry
|
|
/// counter of some records reaches the maximum retry count, those events
|
|
/// may be dropped as part of the releasing procedure. Persistent storage
|
|
/// implementations of this interface drop these records. MemoryStorage does not.
|
|
/// Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <param name="ids">Identifiers of records to release</param>
|
|
/// <param name="incrementRetryCount">Determines whether the retry
|
|
/// counter should be incremented for the records</param>
|
|
virtual void ReleaseRecords(std::vector<StorageRecordId> const& ids, bool incrementRetryCount, HttpHeaders headers, bool& fromMemory) = 0;
|
|
|
|
/// <summary>
|
|
/// Delete value of an auxiliary persistent configuration value
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// If a setting with the specified name does not exist, success is returned.
|
|
/// </remarks>
|
|
/// <param name="name">Name of the setting to retrieve</param>
|
|
/// <returns>Status of operation</returns>
|
|
virtual bool DeleteSetting(std::string const& name) = 0;
|
|
|
|
/// <summary>
|
|
/// Set value of an auxiliary persistent configuration value
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Set <paramref name="value"/> to an empty string to delete any existing
|
|
/// value. Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <param name="name">Name of the setting to update</param>
|
|
/// <param name="value">New setting value</param>
|
|
/// <returns></returns>
|
|
virtual bool StoreSetting(std::string const& name, std::string const& value) = 0;
|
|
|
|
/// <summary>
|
|
/// Get value of an auxiliary persistent configuration value
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// If a setting with the specified name does not exist, an empty string is
|
|
/// returned. Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <param name="name">Name of the setting to retrieve</param>
|
|
/// <returns>Value of the requested setting or an empty string</returns>
|
|
virtual std::string GetSetting(std::string const& name) = 0;
|
|
|
|
/// <summary>
|
|
/// Get size of the DB
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Get current Db size returned. Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <returns>Value of the requested DB size</returns>
|
|
virtual size_t GetSize() = 0;
|
|
|
|
/// <summary>
|
|
/// Get number of records of specific latency.
|
|
/// If latency is unspecified, then get the total number of records in storage.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Gets the total number of records. Primarily used on shutdown to evaluate
|
|
/// if upload still has to be done for the remaining records.
|
|
/// </remarks>
|
|
/// <returns>Number of records</returns>
|
|
virtual size_t GetRecordCount(EventLatency latency = EventLatency_Unspecified) const = 0;
|
|
|
|
/// <summary>
|
|
/// Get Vector of records from DB
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// If a setting with the specified name does not exist, an empty string is
|
|
/// returned. Called from the internal worker thread.
|
|
/// </remarks>
|
|
/// <param name="shutdown">if this is called at shutdown ot not</param>
|
|
/// <param name="minPriority">lowest priority selected</param>
|
|
/// <param name="maxCount"> max count to be selected</param>
|
|
/// <returns>Value of the requested setting or an empty string</returns>
|
|
virtual std::vector<StorageRecord> GetRecords(bool shutdown, EventLatency minLatency = EventLatency_Unspecified, unsigned maxCount = 0) = 0;
|
|
|
|
virtual bool ResizeDb() = 0;
|
|
|
|
virtual void ReleaseAllRecords() {};
|
|
|
|
};
|
|
|
|
// IOfflineStorage as Module. External offline storage implementations need to inherit from it.
|
|
class IOfflineStorageModule : public IOfflineStorage, public IModule
|
|
{
|
|
};
|
|
|
|
|
|
} MAT_NS_END
|
|
#endif
|
|
|