Merge pull request #438 from microsoft/maharrim/roomc

Change OfflineRoom schema
This commit is contained in:
larvacea 2020-06-15 12:02:06 -07:00 коммит произвёл GitHub
Родитель 51abc164c0 00ea8f6110
Коммит 670d7dc52e
28 изменённых файлов: 1548 добавлений и 461 удалений

3
.github/workflows/test-android-mac.yml поставляемый
Просмотреть файл

@ -23,11 +23,12 @@ jobs:
uses: reactivecircus/android-emulator-runner@v2
with:
api-level: 29
ndk: 21.0.6113669
ndk: 21.1.6352462
cmake: 3.10.2.4988404
working-directory: ./lib/android_build
script: ./testandlog
- name: Upload
if: ${{ always() }}
uses: actions/upload-artifact@v2
with:
name: logcat

Просмотреть файл

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11/"/>
<classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/>
<classpathentry kind="output" path="bin/default"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-14/"/>
<classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/>
<classpathentry kind="output" path="bin/default"/>
</classpath>

Просмотреть файл

@ -13,7 +13,7 @@ android {
externalNativeBuild {
cmake {
// Passes optional arguments to CMake.
arguments "-DANDROID_STL=c++_shared"
arguments "-DANDROID_STL=c++_shared", "-DUSE_ROOM=1"
}
}
}
@ -36,7 +36,7 @@ dependencies {
implementation project(':maesdk')
implementation 'androidx.appcompat:appcompat:1.1.0'
implementation 'androidx.constraintlayout:constraintlayout:1.1.3'
testImplementation 'junit:junit:4.12'
testImplementation 'junit:junit:4.13'
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0'
}

Просмотреть файл

@ -9,11 +9,14 @@ import com.microsoft.applications.events.ByTenant;
import com.microsoft.applications.events.OfflineRoom;
import com.microsoft.applications.events.StorageRecord;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@RunWith(AndroidJUnit4.class)
public class OfflineRoomUnitTest {
@ -21,24 +24,22 @@ public class OfflineRoomUnitTest {
public void storeOneRecord() {
// Context of the app under test.
Context appContext = InstrumentationRegistry.getInstrumentation().getTargetContext();
OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit");
room.deleteAllRecords();
StorageRecord record = new StorageRecord(
"Fred", "George",
StorageRecord.EventLatency_Normal,
StorageRecord.EventPersistence_Normal,
32,
1,
0,
new byte[] {1, 2, 3});
room.storeRecords(record);
assertEquals(1, room.getRecordCount(StorageRecord.EventLatency_Unspecified));
assertEquals(1, room.getRecordCount(StorageRecord.EventLatency_Normal));
assertEquals(
record.id.length() + record.tenantToken.length() + record.blob.length + 32,
room.totalSize());
assertEquals(1, room.deleteAllRecords());
try (OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit")) {
room.deleteAllRecords();
StorageRecord record = new StorageRecord(
0, "George",
StorageRecord.EventLatency_Normal,
StorageRecord.EventPersistence_Normal,
32,
1,
0,
new byte[]{1, 2, 3});
room.storeRecords(record);
assertEquals(1, room.getRecordCount(StorageRecord.EventLatency_Unspecified));
assertEquals(1, room.getRecordCount(StorageRecord.EventLatency_Normal));
assertThat(room.totalSize(), Matchers.greaterThan(new Long(0)));
assertEquals(1, room.deleteAllRecords());
}
}
protected void makeTenRecords(
@ -46,10 +47,8 @@ public class OfflineRoomUnitTest {
int latency,
int persistence
) {
room.deleteAllRecords();
StorageRecord record = new StorageRecord(
"",
0,
String.format("George-%d-%d", latency, persistence),
latency,
persistence,
@ -59,7 +58,6 @@ public class OfflineRoomUnitTest {
new byte[] {1, 2, 3}
);
for (int i = 0; i < 10; ++i) {
record.id = String.format("Fred-%d-%d-%d", latency, persistence, i);
record.tenantToken = String.format("George-%d-%d", latency, i);
room.storeRecords(record);
}
@ -70,65 +68,78 @@ public class OfflineRoomUnitTest {
public void TrimRecords() {
// Context of the app under test.
Context appContext = InstrumentationRegistry.getInstrumentation().getTargetContext();
OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit");
try (OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit")) {
room.deleteAllRecords();
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
assertEquals(10, room.getRecordCount(StorageRecord.EventLatency_Normal));
assertEquals(5, room.trim(0.5));
assertEquals(5, room.getRecordCount(StorageRecord.EventLatency_Normal));
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
assertEquals(10, room.getRecordCount(StorageRecord.EventLatency_Normal));
long n = room.totalSize();
assertEquals(5, room.trim(n / 2));
assertEquals(5, room.getRecordCount(StorageRecord.EventLatency_Normal));
}
}
@Test
public void GetAndReserve() {
Context appContext = InstrumentationRegistry.getInstrumentation().getTargetContext();
OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit");
try (OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit")) {
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
makeTenRecords(room, StorageRecord.EventLatency_RealTime, StorageRecord.EventPersistence_Normal);
assertEquals(10, room.getRecordCount(StorageRecord.EventLatency_Unspecified));
StorageRecord[] records = room.getAndReserve(StorageRecord.EventLatency_Normal, 3, 2, 5);
assertEquals(3, records.length);
for (StorageRecord record : records) {
assertEquals(StorageRecord.EventLatency_RealTime, record.latency);
assertEquals(0, record.retryCount);
assertEquals(0, record.reservedUntil);
}
records = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2, 5);
assertEquals(7, records.length);
for (StorageRecord record : records) {
assertEquals(StorageRecord.EventLatency_RealTime, record.latency);
assertEquals(0, record.retryCount);
assertEquals(0, record.reservedUntil);
}
records = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2, 5);
assertEquals(10, records.length);
for (StorageRecord record : records) {
assertEquals(StorageRecord.EventLatency_Normal, record.latency);
assertEquals(0, record.retryCount);
assertEquals(0, record.reservedUntil);
room.deleteAllRecords();
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
makeTenRecords(room, StorageRecord.EventLatency_RealTime, StorageRecord.EventPersistence_Normal);
assertEquals(20, room.getRecordCount(StorageRecord.EventLatency_Unspecified));
StorageRecord[] records =
room.getAndReserve(StorageRecord.EventLatency_Normal, 3, 2, 5);
assertEquals(3, records.length);
for (StorageRecord record : records) {
assertEquals(StorageRecord.EventLatency_RealTime, record.latency);
assertEquals(0, record.retryCount);
assertEquals(0, record.reservedUntil);
}
records = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2, 5);
assertEquals(17, records.length);
int was = records[0].latency;
for (StorageRecord record : records) {
assertThat(record.latency, lessThanOrEqualTo(was));
was = record.latency;
assertThat(
record.latency,
anyOf(
is(StorageRecord.EventLatency_Normal),
is(StorageRecord.EventLatency_RealTime)
)
);
assertEquals(0, record.retryCount);
assertEquals(0, record.reservedUntil);
}
}
}
@Test
public void ReleaseUnconsumed() {
Context appContext = InstrumentationRegistry.getInstrumentation().getTargetContext();
OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit");
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
StorageRecord[] records = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2,5);
assertEquals(10, records.length);
StorageRecord[] nothing = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2,5);
assertEquals(0, nothing.length);
room.releaseUnconsumed(records, 5);
StorageRecord[] released = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2,5);
assertEquals(5, released.length);
try (OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit")) {
room.deleteAllRecords();
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
StorageRecord[] records =
room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2, 5);
assertEquals(10, records.length);
StorageRecord[] nothing =
room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2, 5);
assertEquals(0, nothing.length);
room.releaseUnconsumed(records, 5);
StorageRecord[] released =
room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2, 5);
assertEquals(5, released.length);
}
}
public String[] collectIds(StorageRecord[] records) {
public long[] collectIds(StorageRecord[] records) {
if (records == null || records.length == 0) {
return new String[0];
return new long[0];
}
String[] ids = new String[records.length];
long[] ids = new long[records.length];
for (int i = 0; i < records.length; ++i) {
ids[i] = records[i].id;
}
@ -138,26 +149,28 @@ public class OfflineRoomUnitTest {
@Test
public void RetireRetries() {
Context appContext = InstrumentationRegistry.getInstrumentation().getTargetContext();
OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit");
try (OfflineRoom room = new OfflineRoom(appContext, "OfflineRoomUnit")) {
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
StorageRecord[] records = room.getAndReserve(StorageRecord.EventLatency_Normal, 5, 2,5);
assertEquals(5, records.length);
String[] ids = collectIds(records);
ByTenant[] timedOut = room.releaseRecords(ids, true, 1);
assertNotNull(timedOut);
assertEquals(0, timedOut.length);
records = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2,5);
assertNotNull(records);
assertEquals(10, records.length);
ids = collectIds(records);
timedOut = room.releaseRecords(ids, true, 1);
assertNotNull(timedOut);
assertEquals(5, timedOut.length);
for (int i = 0; i < 5; ++i) {
assertNotNull(timedOut[i]);
assertNotNull(timedOut[i].tenantToken);
assertEquals(1, timedOut[i].count);
makeTenRecords(room, StorageRecord.EventLatency_Normal, StorageRecord.EventPersistence_Normal);
StorageRecord[] records =
room.getAndReserve(StorageRecord.EventLatency_Normal, 5, 2, 5);
assertEquals(5, records.length);
long[] ids = collectIds(records);
ByTenant[] timedOut = room.releaseRecords(ids, true, 1);
assertNotNull(timedOut);
assertEquals(0, timedOut.length);
records = room.getAndReserve(StorageRecord.EventLatency_Normal, 1000, 2, 5);
assertNotNull(records);
assertEquals(10, records.length);
ids = collectIds(records);
timedOut = room.releaseRecords(ids, true, 1);
assertNotNull(timedOut);
assertEquals(5, timedOut.length);
for (int i = 0; i < 5; ++i) {
assertNotNull(timedOut[i]);
assertNotNull(timedOut[i].tenantToken);
assertEquals(1, timedOut[i].count);
}
}
}
}

Просмотреть файл

@ -25,19 +25,12 @@ include_directories(AFTER
# Test sources.
"${gtest_dir}")
# Creates and names a library, sets it as either STATIC
# or SHARED, and provides the relative paths to its source code.
# You can define multiple libraries, and CMake builds them for you.
# Gradle automatically packages shared libraries with your APK.
set(TESTS_COMMON_SRCS
${SDK_ROOT}/tests/common/Common.cpp
${SDK_ROOT}/tests/common/Mocks.cpp
${SDK_ROOT}/tests/common/Reactor.cpp
${SDK_ROOT}/lib/decoder/PayloadDecoder.cpp
${SDK_ROOT}/lib/offline/OfflineStorage_SQLite.cpp
${SDK_ROOT}/sqlite/sqlite3.c
)
)
set(TESTS_SRCS
${SDK_ROOT}/tests/unittests/BackoffTests_ExponentialWithJitter.cpp
@ -76,7 +69,7 @@ set(TESTS_SRCS
${SDK_ROOT}/tests/unittests/TransmissionPolicyManagerTests.cpp
${SDK_ROOT}/tests/unittests/TransmitProfilesTests.cpp
${SDK_ROOT}/tests/unittests/UtilsTests.cpp
)
)
find_package( ZLIB REQUIRED )
include_directories(
@ -91,6 +84,19 @@ set_target_properties(maesdk PROPERTIES
IMPORTED_LOCATION "${SDK_ROOT}/lib/android_build/maesdk/build/intermediates/cmake/debug/obj/${CMAKE_ANDROID_ARCH_ABI}/libmaesdk.so"
IMPORTED_LOCATION_Release "${SDK_ROOT}/lib/android_build/maesdk/build/intermediates/cmake/debug/obj/${CMAKE_ANDROID_ARCH_ABI}/libmaesdk.so")
# include the other flavor of database: if maesdk builds with Room include sqlite
# if maesdk builds with native sqlite, include Room
if (USE_ROOM)
set(OTHER_OFFLINE_SRCS
${SDK_ROOT}/lib/offline/OfflineStorage_SQLite.cpp
${SDK_ROOT}/sqlite/sqlite3.c
)
else()
set(OTHER_OFFLINE_SRCS
${SDK_ROOT}/lib/offline/OfflineStorage_Room.cpp
)
endif()
# Searches for a specified prebuilt library and stores the path as a
# variable. Because CMake includes system libraries in the search path by
@ -113,6 +119,7 @@ add_library( # Sets the name of the library.
# Provides a relative path to your source file(s).
native-lib.cpp
${OTHER_OFFLINE_SRCS}
${gtest_dir}/src/gtest-all.cc
${gmock_dir}/src/gmock-all.cc
${TESTS_COMMON_SRCS}
@ -127,8 +134,9 @@ add_library( # Sets the name of the library.
target_link_libraries( # Specifies the target library.
native-lib
maesdk
${zlib-path}
# Links the target library to the log library
# included in the NDK.
${log-lib})
${log-lib}
${zlib-path}
)

Просмотреть файл

@ -85,23 +85,23 @@ public:
);
}
void OnTestProgramEnd(const ::testing::UnitTest& unitTest) override {
void OnTestProgramEnd(const ::testing::UnitTest& unit) override {
__android_log_print(
ANDROID_LOG_INFO,
"MAE",
"End tests: %d succeeded, %d failed, %d total\n",
unitTest.successful_test_count(),
unitTest.failed_test_count(),
unitTest.total_test_count()
"End tests: %d success, %d fail, %d total",
unit.successful_test_count(),
unit.failed_test_count(),
unit.total_test_count()
);
}
};
int RunTests::run_all_tests(JNIEnv * env, jobject java_logger)
{
int argc = 1;
int argc = 2;
char command_name[] = "maesdk-test";
char filter[] = "--gtest_filter=Storage/*";
char filter[] = "--gtest_filter=*";
char *argv[] = { command_name, filter };
::testing::InitGoogleTest(&argc, argv);
::testing::TestEventListeners& listeners =

Просмотреть файл

@ -1,4 +1,4 @@
#Tue Jun 02 15:30:19 PDT 2020
#Fri Jun 05 11:46:20 PDT 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME

Просмотреть файл

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11/"/>
<classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/>
<classpathentry kind="output" path="bin/default"/>
</classpath>
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-14/"/>
<classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/>
<classpathentry kind="output" path="bin/default"/>
</classpath>

Просмотреть файл

@ -1,3 +1,8 @@
plugins {
id 'maven-publish'
id 'net.linguica.maven-settings' version '0.5'
}
apply plugin: 'com.android.library'
android {
@ -16,7 +21,7 @@ android {
externalNativeBuild {
cmake {
// Passes optional arguments to CMake.
arguments "-DANDROID_STL=c++_shared", "-DBUILD_SHARED_LIBS=1"
arguments "-DANDROID_STL=c++_shared", "-DBUILD_SHARED_LIBS=1", "-DUSE_ROOM=1"
}
}
javaCompileOptions {
@ -54,9 +59,55 @@ dependencies {
implementation "androidx.room:room-runtime:$room_version"
annotationProcessor "androidx.room:room-compiler:$room_version"
testImplementation 'junit:junit:4.12'
testImplementation 'junit:junit:4.13'
testImplementation 'org.mockito:mockito-inline:3.2.4'
testImplementation "androidx.room:room-testing:$room_version"
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0'
}
repositories {
maven {
url 'https://office.pkgs.visualstudio.com/_packaging/1DS-SDK/maven/v1'
name '1DS-SDK'
authentication {
basic(BasicAuthentication)
}
}
}
afterEvaluate {
publishing {
repositories {
maven {
url 'https://office.pkgs.visualstudio.com/_packaging/1DS-SDK/maven/v1'
name '1DS-SDK'
authentication {
basic(BasicAuthentication)
}
}
}
publications {
// Creates a Maven publication called "release".
release(MavenPublication) {
// Applies the component for the release build variant.
from components.release
// You can then customize attributes of the publication as shown below.
groupId = 'com.microsoft.applications.events'
artifactId = 'maesdk'
version = '0.0.1-a'
}
// Creates a Maven publication called debug.
debug(MavenPublication) {
// Applies the component for the debug build variant.
from components.debug
groupId = 'com.microsoft.applications.events'
artifactId = 'maesdk-debug'
version = '0.0.0-a'
}
}
}
}

Просмотреть файл

@ -0,0 +1,111 @@
{
"formatVersion": 1,
"database": {
"version": 1,
"identityHash": "b2cab3c4102bcb772aa1a6715d44fc1c",
"entities": [
{
"tableName": "StorageRecord",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`id` TEXT NOT NULL, `tenantToken` TEXT, `latency` INTEGER NOT NULL, `persistence` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `retryCount` INTEGER NOT NULL, `reservedUntil` INTEGER NOT NULL, `blob` BLOB, PRIMARY KEY(`id`))",
"fields": [
{
"fieldPath": "id",
"columnName": "id",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "tenantToken",
"columnName": "tenantToken",
"affinity": "TEXT",
"notNull": false
},
{
"fieldPath": "latency",
"columnName": "latency",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "persistence",
"columnName": "persistence",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "timestamp",
"columnName": "timestamp",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "retryCount",
"columnName": "retryCount",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "reservedUntil",
"columnName": "reservedUntil",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "blob",
"columnName": "blob",
"affinity": "BLOB",
"notNull": false
}
],
"primaryKey": {
"columnNames": [
"id"
],
"autoGenerate": false
},
"indices": [
{
"name": "index_StorageRecord_latency",
"unique": false,
"columnNames": [
"latency"
],
"createSql": "CREATE INDEX IF NOT EXISTS `index_StorageRecord_latency` ON `${TABLE_NAME}` (`latency`)"
}
],
"foreignKeys": []
},
{
"tableName": "StorageSetting",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`name` TEXT NOT NULL, `value` TEXT NOT NULL, PRIMARY KEY(`name`))",
"fields": [
{
"fieldPath": "name",
"columnName": "name",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "value",
"columnName": "value",
"affinity": "TEXT",
"notNull": true
}
],
"primaryKey": {
"columnNames": [
"name"
],
"autoGenerate": false
},
"indices": [],
"foreignKeys": []
}
],
"views": [],
"setupQueries": [
"CREATE TABLE IF NOT EXISTS room_master_table (id INTEGER PRIMARY KEY,identity_hash TEXT)",
"INSERT OR REPLACE INTO room_master_table (id,identity_hash) VALUES(42, 'b2cab3c4102bcb772aa1a6715d44fc1c')"
]
}
}

Просмотреть файл

@ -0,0 +1,111 @@
{
"formatVersion": 1,
"database": {
"version": 2,
"identityHash": "b2cab3c4102bcb772aa1a6715d44fc1c",
"entities": [
{
"tableName": "StorageRecord",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`id` TEXT NOT NULL, `tenantToken` TEXT, `latency` INTEGER NOT NULL, `persistence` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `retryCount` INTEGER NOT NULL, `reservedUntil` INTEGER NOT NULL, `blob` BLOB, PRIMARY KEY(`id`))",
"fields": [
{
"fieldPath": "id",
"columnName": "id",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "tenantToken",
"columnName": "tenantToken",
"affinity": "TEXT",
"notNull": false
},
{
"fieldPath": "latency",
"columnName": "latency",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "persistence",
"columnName": "persistence",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "timestamp",
"columnName": "timestamp",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "retryCount",
"columnName": "retryCount",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "reservedUntil",
"columnName": "reservedUntil",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "blob",
"columnName": "blob",
"affinity": "BLOB",
"notNull": false
}
],
"primaryKey": {
"columnNames": [
"id"
],
"autoGenerate": false
},
"indices": [
{
"name": "index_StorageRecord_latency",
"unique": false,
"columnNames": [
"latency"
],
"createSql": "CREATE INDEX IF NOT EXISTS `index_StorageRecord_latency` ON `${TABLE_NAME}` (`latency`)"
}
],
"foreignKeys": []
},
{
"tableName": "StorageSetting",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`name` TEXT NOT NULL, `value` TEXT NOT NULL, PRIMARY KEY(`name`))",
"fields": [
{
"fieldPath": "name",
"columnName": "name",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "value",
"columnName": "value",
"affinity": "TEXT",
"notNull": true
}
],
"primaryKey": {
"columnNames": [
"name"
],
"autoGenerate": false
},
"indices": [],
"foreignKeys": []
}
],
"views": [],
"setupQueries": [
"CREATE TABLE IF NOT EXISTS room_master_table (id INTEGER PRIMARY KEY,identity_hash TEXT)",
"INSERT OR REPLACE INTO room_master_table (id,identity_hash) VALUES(42, 'b2cab3c4102bcb772aa1a6715d44fc1c')"
]
}
}

Просмотреть файл

@ -0,0 +1,119 @@
{
"formatVersion": 1,
"database": {
"version": 3,
"identityHash": "c562644244e4b7e47787917e9f63a59e",
"entities": [
{
"tableName": "StorageRecord",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, `tenantToken` TEXT, `latency` INTEGER NOT NULL, `persistence` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `retryCount` INTEGER NOT NULL, `reservedUntil` INTEGER NOT NULL, `blob` BLOB)",
"fields": [
{
"fieldPath": "id",
"columnName": "id",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "tenantToken",
"columnName": "tenantToken",
"affinity": "TEXT",
"notNull": false
},
{
"fieldPath": "latency",
"columnName": "latency",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "persistence",
"columnName": "persistence",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "timestamp",
"columnName": "timestamp",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "retryCount",
"columnName": "retryCount",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "reservedUntil",
"columnName": "reservedUntil",
"affinity": "INTEGER",
"notNull": true
},
{
"fieldPath": "blob",
"columnName": "blob",
"affinity": "BLOB",
"notNull": false
}
],
"primaryKey": {
"columnNames": [
"id"
],
"autoGenerate": true
},
"indices": [
{
"name": "index_StorageRecord_id",
"unique": true,
"columnNames": [
"id"
],
"createSql": "CREATE UNIQUE INDEX IF NOT EXISTS `index_StorageRecord_id` ON `${TABLE_NAME}` (`id`)"
},
{
"name": "index_StorageRecord_latency",
"unique": false,
"columnNames": [
"latency"
],
"createSql": "CREATE INDEX IF NOT EXISTS `index_StorageRecord_latency` ON `${TABLE_NAME}` (`latency`)"
}
],
"foreignKeys": []
},
{
"tableName": "StorageSetting",
"createSql": "CREATE TABLE IF NOT EXISTS `${TABLE_NAME}` (`name` TEXT NOT NULL, `value` TEXT NOT NULL, PRIMARY KEY(`name`))",
"fields": [
{
"fieldPath": "name",
"columnName": "name",
"affinity": "TEXT",
"notNull": true
},
{
"fieldPath": "value",
"columnName": "value",
"affinity": "TEXT",
"notNull": true
}
],
"primaryKey": {
"columnNames": [
"name"
],
"autoGenerate": false
},
"indices": [],
"foreignKeys": []
}
],
"views": [],
"setupQueries": [
"CREATE TABLE IF NOT EXISTS room_master_table (id INTEGER PRIMARY KEY,identity_hash TEXT)",
"INSERT OR REPLACE INTO room_master_table (id,identity_hash) VALUES(42, 'c562644244e4b7e47787917e9f63a59e')"
]
}
}

Просмотреть файл

@ -13,8 +13,6 @@ if (USE_CURL)
find_package(CURL REQUIRED)
endif()
set(USE_ROOM 1)
include_directories(AFTER
${SDK_ROOT}/lib
${SDK_ROOT}/lib/include/public

Просмотреть файл

@ -1,6 +1,7 @@
package com.microsoft.applications.events;
import android.content.Context;
import android.database.Cursor;
import android.util.Log;
import androidx.annotation.NonNull;
@ -9,11 +10,81 @@ import androidx.room.RoomDatabase;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
public class OfflineRoom {
class TrimTransaction implements Callable<Long>
{
OfflineRoom m_room = null;
long m_byteLimit = 0;
public TrimTransaction(OfflineRoom room, long byteLimit)
{
m_room = room;
m_byteLimit = byteLimit;
}
protected long vacuum(long pre) {
try (Cursor c = m_room.m_db.query("VACUUM", null)) {};
long post = m_room.totalSize();
Log.i("MAE", String.format(
"Vacuum: %d before, %d after",
pre,
post));
return post;
}
public Long call()
{
if (m_room == null || m_byteLimit == 0) {
return null;
}
long currentSize = m_room.totalSize();
if (currentSize <= m_byteLimit) {
return new Long(0);
}
long postVacuum = currentSize;
try {
postVacuum = vacuum(currentSize);
} catch (Exception e) {
Log.e("MAE", "Exception in VACUUM", e);
postVacuum = currentSize;
}
if (postVacuum <= m_byteLimit) {
return new Long(0);
}
long records = m_room.m_srDao.totalRecordCount();
double fraction = 0.25; // fraction of current to be dropped
if (m_byteLimit > m_room.m_pageSize) {
double dLimit = m_byteLimit;
double dCurrent = postVacuum;
fraction = Math.max(0.25, 1.0 - (dLimit / dCurrent));
}
long to_drop = (long) Math.ceil(fraction * records);
if (to_drop <= 0) {
return new Long(0);
}
long recordsDropped = m_room.m_srDao.trim(to_drop);
long postDrop = m_room.totalSize();
long reVacuum = postDrop;
if (postDrop > m_byteLimit) {
reVacuum = vacuum(postDrop);
}
Log.i(
"MAE", String.format(
"Trim: dropped %d records, new size %d bytes",
recordsDropped,
reVacuum));
return new Long(recordsDropped);
}
}
public class OfflineRoom implements AutoCloseable {
OfflineRoomDatabase m_db = null;
StorageRecordDao m_srDao = null;
StorageSettingDao m_settingDao = null;
long m_pageSize = 4096;
public OfflineRoom(Context context, String name) {
RoomDatabase.Builder<OfflineRoomDatabase> builder;
@ -28,45 +99,100 @@ public class OfflineRoom {
m_db = builder.build();
m_srDao = m_db.getStorageRecordDao();
m_settingDao = m_db.getStorageSettingDao();
Log.i("MAE", String.format("Opened %s: %d records, %d settings", name, m_srDao.totalRecordCount(), m_settingDao.totalSettingCount()));
try (Cursor c = m_db.query("PRAGMA page_size", null)) {
if (c.getCount() == 1 && c.getColumnCount() == 1) {
c.moveToFirst();
m_pageSize = c.getLong(0);
} else {
Log.e("MAE",
String.format("Unexpected result from PRAGMA page_size: %d rows, %d columns",
c.getCount(),
c.getColumnCount()));
}
}
long pageCount = -1;
try (Cursor c = m_db.query("PRAGMA page_count", null)) {
if (c.getCount() == 1 && c.getColumnCount() == 1) {
c.moveToFirst();
pageCount = c.getLong(0);
}
}
Log.i("MAE", String.format("Opened %s: %d records, %d settings, page size %d, %d pages",
name,
m_srDao.totalRecordCount(),
m_settingDao.totalSettingCount(),
m_pageSize,
pageCount));
}
public void storeRecords(StorageRecord... records) {
m_srDao.insertRecords(records);
public long[] storeRecords(StorageRecord... records) {
return m_srDao.insertRecords(records);
}
public void closeConnection()
public void close()
{
if (m_db.isOpen()) {
m_db.close();
}
m_srDao = null;
m_settingDao = null;
m_db = null;
}
public void storeFromBuffers(int count, int[] indices, byte[] bytes, int[] small, long[] big)
public void explain(String query)
{
try (Cursor c = m_db.query("EXPLAIN QUERY PLAN " + query, null)) {
int n = c.getCount();
int m = c.getColumnCount();
boolean noMove = c.moveToFirst();
String[] names = c.getColumnNames();
for (int i = 0; i < names.length; ++i) {
Log.i("MAE", String.format("Type for column %s (%d): %d", names[i], i, c.getType(i)));
}
for (int j = 0; j < n; ++j) {
if (!c.moveToPosition(j)) {
break;
}
for (int i = 0; i < m; ++i) {
Log.i("MAE", String.format("%d %s: %s",
j,
names[i],
c.getString(i)));
}
}
}
}
public long[] storeFromBuffersIds(int count, int[] indices, byte[] bytes, int[] small, long[] big)
{
int byteIndex = 0;
StorageRecord[] records = new StorageRecord[count];
for (int i = 0; i < count; ++i) {
String id = new String(bytes, byteIndex, indices[3 * i]);
byteIndex += indices[3 * i];
String tenantToken = new String(bytes, byteIndex, indices[3 * i + 1]);
byteIndex += indices[3 * i + 1];
byte[] blob = new byte[indices[3 * i + 2]];
for (int j = 0; j < indices[3 * i + 2]; ++j) {
long id = big[3*i];
String tenantToken = new String(bytes, byteIndex, indices[2 * i]);
byteIndex += indices[2 * i];
byte[] blob = new byte[indices[2 * i + 1]];
for (int j = 0; j < indices[2 * i + 1]; ++j) {
blob[j] = bytes[j + byteIndex];
}
byteIndex += indices[3 * i + 2];
byteIndex += indices[2 * i + 1];
records[i] = new StorageRecord(
id,
tenantToken,
small[3 * i],
small[3 * i + 1],
big[2 * i],
big[3 * i + 1],
small[3 * i + 2],
big[2 * i + 1],
big[3 * i + 2],
blob);
}
storeRecords(records);
long[] ids = storeRecords(records);
return ids;
}
public void storeFromBuffers(int count, int[] indices, byte[] bytes, int[] small, long[] big)
{
storeFromBuffersIds(count, indices, bytes, small, big);
}
public long getRecordCount(int latency) {
@ -76,18 +202,26 @@ public class OfflineRoom {
return m_srDao.recordCount(latency);
}
public long trim(double fraction) {
long records = m_srDao.totalRecordCount();
long to_drop = (long) Math.ceil(fraction * records);
if (to_drop == 0) {
public long trim(long byteLimit) {
long currentSize = totalSize();
if (currentSize <= byteLimit) {
return 0;
}
return m_srDao.trim(to_drop);
Log.i("MAE","Start trim");
TrimTransaction transaction = new TrimTransaction(this, byteLimit);
Long recordsDropped = m_db.runInTransaction(transaction);
if (recordsDropped == null) {
Log.e("MAE", "Null result from trim");
return 0;
}
Log.i("MAE", String.format("Dropped %d records in trim", recordsDropped));
return recordsDropped;
}
public ByTenant[] releaseRecords(String[] ids, boolean incrementRetry, long maximumRetries)
public ByTenant[] releaseRecords(long[] ids, boolean incrementRetry, long maximumRetries)
{
TreeMap<String, Long> map = new TreeMap<String, Long>();
m_srDao.releaseRecords(ids, incrementRetry, maximumRetries, map);
ByTenant[] results = new ByTenant[map.size()];
int index = 0;
@ -123,7 +257,14 @@ public class OfflineRoom {
public long totalSize()
{
return m_srDao.totalSize() + m_settingDao.totalSize();
long result = 0;
try (Cursor c = m_db.query("PRAGMA page_count", null)) {
assert(c.getCount() == 1 && c.getColumnCount() == 1);
c.moveToFirst();
long pages = c.getLong(0);
result = pages * m_pageSize;
}
return result;
}
public StorageRecord[] getRecords(boolean shutdown, int minLatency, long limit)
@ -131,10 +272,15 @@ public class OfflineRoom {
return m_srDao.getRecords(shutdown, minLatency, limit);
}
public long deleteById(String[] ids) {
public long deleteById(long[] ids) {
return m_srDao.deleteById(ids);
}
public long deleteByToken(String token)
{
return m_srDao.deleteRecordsByToken(token);
}
public StorageRecord[] getAndReserve(int minLatency, long limit, long now, long until)
{
return m_srDao.getAndReserve(minLatency, limit, now, until);

Просмотреть файл

@ -3,7 +3,7 @@ package com.microsoft.applications.events;
import androidx.room.Database;
import androidx.room.RoomDatabase;
@Database(version = 1, entities = { StorageRecord.class, StorageSetting.class })
@Database(version = 3, entities = { StorageRecord.class, StorageSetting.class })
public abstract class OfflineRoomDatabase extends RoomDatabase {
abstract public StorageRecordDao getStorageRecordDao();
abstract public StorageSettingDao getStorageSettingDao();

Просмотреть файл

@ -1,10 +1,12 @@
package com.microsoft.applications.events;
import androidx.annotation.NonNull;
import androidx.room.ColumnInfo;
import androidx.room.Entity;
import androidx.room.Index;
import androidx.room.PrimaryKey;
@Entity
@Entity(indices = {@Index(value = {"id"}, unique = true)})
public class StorageRecord {
final public static int EventLatency_Unspecified = -1;
final public static int EventLatency_Normal = 1;
@ -13,8 +15,7 @@ public class StorageRecord {
public StorageRecord(
@NonNull
String id,
long id,
String tenantToken,
int latency,
int persistence,
@ -33,10 +34,11 @@ public class StorageRecord {
this.blob = blob;
}
@PrimaryKey
@NonNull
public String id;
@PrimaryKey(autoGenerate = true)
@ColumnInfo(typeAffinity = ColumnInfo.INTEGER)
public long id = 0;
public String tenantToken;
@ColumnInfo(index = true)
public int latency = EventLatency_Unspecified;
public int persistence = EventPersistence_Normal;
public long timestamp = 0;

Просмотреть файл

@ -1,5 +1,7 @@
package com.microsoft.applications.events;
import android.util.Log;
import androidx.room.Dao;
import androidx.room.Delete;
import androidx.room.Insert;
@ -12,7 +14,7 @@ import java.util.TreeMap;
@Dao
public abstract class StorageRecordDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
public abstract void insertRecords(StorageRecord... records);
public abstract long[] insertRecords(StorageRecord... records);
@Query("SELECT count(*) from StorageRecord WHERE latency = :latency")
public abstract long recordCount(int latency);
@ -23,52 +25,59 @@ public abstract class StorageRecordDao {
@Query("DELETE FROM StorageRecord")
public abstract int deleteAllRecords();
@Query(value = "SELECT sum(length(id)) + sum(length(tenantToken)) + sum(length(blob)) + 32*count(*) from StorageRecord;")
@Query(value = "SELECT sum(length(id)) + sum(length(tenantToken)) + sum(length(blob)) + 40*count(*) from StorageRecord;")
public abstract long totalSize();
@Query("DELETE FROM StorageRecord WHERE id IN (SELECT id FROM StorageRecord ORDER BY persistence ASC, timestamp ASC LIMIT :count)")
public abstract int trim(long count);
@Transaction
@Query("SELECT * FROM StorageRecord WHERE latency >= :minLatency ORDER BY latency DESC, persistence DESC, timestamp ASC LIMIT :limit")
public abstract StorageRecord[] getRecords(int minLatency, long limit);
@Transaction
@Query("SELECT * FROM StorageRecord WHERE latency >= :minLatency AND reservedUntil = 0 ORDER BY latency DESC, persistence DESC, timestamp ASC LIMIT :limit")
public abstract StorageRecord[] getUnreservedRecords(int minLatency, long limit);
@Transaction
@Query("DELETE FROM StorageRecord WHERE id IN (:ids)")
public abstract int deleteByIdBlock(String[] ids);
public abstract int deleteByIdBlock(long[] ids);
@Query("UPDATE StorageRecord SET reservedUntil = :until WHERE id IN (:ids)")
public abstract int setReservedBlock(String[] ids, long until);
public abstract int setReservedBlock(long[] ids, long until);
@Transaction
@Query("SELECT * FROM StorageRecord WHERE id IN (:ids) AND retryCount >= :maximumRetries")
public abstract StorageRecord[] getRetryExpired(String[] ids, long maximumRetries);
public abstract StorageRecord[] getRetryExpired(long[] ids, long maximumRetries);
@Delete
public abstract int deleteRecordInner(StorageRecord[] records);
@Query("UPDATE StorageRecord SET reservedUntil = 0, retryCount = retryCount + 1 WHERE id IN (:ids)")
public abstract int releaseAndIncrementRetryCounts(String[] ids);
public abstract int releaseAndIncrementRetryCounts(long[] ids);
@Query("SELECT min(latency) FROM StorageRecord WHERE latency >= :minLatency AND reservedUntil = 0")
public abstract Long getMinLatency(long minLatency);
@Transaction
@Query("SELECT * FROM StorageRecord WHERE latency = :latency AND reservedUntil = 0 ORDER BY persistence DESC, timestamp ASC LIMIT :limit")
public abstract StorageRecord[] getUnreservedByLatency(long latency, long limit);
@Query("UPDATE StorageRecord SET reservedUntil = 0 WHERE reservedUntil > 0 AND reservedUntil < :now")
public abstract int releaseExpired(long now);
@Query("DELETE FROM StorageRecord WHERE tenantToken = :token")
public abstract int deleteRecordsByToken(String token);
static protected final int idCount = 64;
@Transaction
public int deleteById(String[] ids)
public int deleteById(long[] ids)
{
int deleted = 0;
for (int i = 0; i < ids.length; i += idCount) {
int count = Math.min(idCount, ids.length - i);
String[] block = new String[count];
long[] block = new long[count];
for (int j = 0; j < count; ++j) {
block[j] = ids[i + j];
}
@ -104,7 +113,7 @@ public abstract class StorageRecordDao {
if (selected.length == 0) {
return selected;
}
String[] ids = new String[selected.length];
long[] ids = new long[selected.length];
for (int j = 0; j < selected.length; ++j) {
ids[j] = selected[j].id;
}
@ -115,7 +124,7 @@ public abstract class StorageRecordDao {
public void releaseUnconsumed(StorageRecord[] selected, int index)
{
int unconsumed = selected.length - index;
String[] ids = new String[unconsumed];
long[] ids = new long[unconsumed];
int j;
for (j = 0; j < unconsumed; ++j) {
ids[j] = selected[j].id;
@ -124,11 +133,11 @@ public abstract class StorageRecordDao {
}
@Transaction
public void setReserved(String[] ids, long until)
public void setReserved(long[] ids, long until)
{
for (int i = 0; i < ids.length; i += idCount) {
int count = Math.min(ids.length - i, idCount);
String[] block = new String[count];
long[] block = new long[count];
for (int j = 0; j < count; ++j) {
block[j] = ids[i + j];
}
@ -138,7 +147,7 @@ public abstract class StorageRecordDao {
@Transaction
public long releaseRecords(
String[] ids,
long[] ids,
boolean incrementRetry,
long maximumRetries,
TreeMap<String, Long> byTenant
@ -148,7 +157,7 @@ public abstract class StorageRecordDao {
if (incrementRetry) {
for (int i = 0; i < ids.length; i += idCount) {
int count = Math.min(ids.length - i, idCount);
String[] block = new String[count];
long[] block = new long[count];
for (int j = 0; j < count; ++j) {
block[j] = ids[i + j];
}

Просмотреть файл

@ -1,3 +1,5 @@
#!/usr/bin/env sh
./gradlew app:connectedDebugAndroidTest
RC=$?
adb logcat -t 2000 MAE:D '*:E' > ./logcat.txt
exit $RC

Просмотреть файл

@ -48,8 +48,8 @@ namespace ARIASDK_NS_BEGIN {
};
using DroppedMap = std::map<std::string, size_t>;
using StorageRecordVector = std::vector<StorageRecord>;
using DroppedMap = std::map<std::string, size_t>;
class IOfflineStorageObserver {
public:

Просмотреть файл

@ -147,29 +147,29 @@ namespace ARIASDK_NS_BEGIN {
{
while (maxCount && (m_records[latency]).size())
{
m_lastReadCount++;
StorageRecord & record = m_records[latency].back();
size_t recordSize = record.blob.size() + sizeof(record);
// Reserve records only if asked
size_t recordSize = record.blob.size() + sizeof(record);
StorageRecord forConsumer(record);
if (leaseTimeMs)
{
record.reservedUntil = PAL::getUtcSystemTimeMs() + leaseTimeMs;
m_reserved_records[record.id] = record; // copy to reserved
forConsumer.reservedUntil = PAL::getUtcSystemTimeMs() + leaseTimeMs;
}
bool wantMore = consumer(std::move(record)); // move to consumer
m_records[latency].pop_back(); // destroy in records
bool wantMore = consumer(std::move(forConsumer)); // move to consumer
if (!wantMore) {
return true;
}
if (leaseTimeMs) {
m_reserved_records[record.id] = std::move(record); // move to reserved
}
m_records[latency].pop_back();
m_size -= std::min(m_size, recordSize);
maxCount--;
// If consumer has no space left for the records, exit
if (!wantMore)
return true;
m_lastReadCount++;
}
}
return true;
}
@ -253,7 +253,6 @@ namespace ARIASDK_NS_BEGIN {
}
}
}
/// <summary>
/// MemoryStorage delete from reserved and ram queue.

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -3,39 +3,41 @@
// Copyright (c) Microsoft. All rights reserved.
#pragma once
#include "pal/PAL.hpp"
#include "IOfflineStorage.hpp"
#include "pal/PAL.hpp"
#include "api/IRuntimeConfig.hpp"
#include "ILogManager.hpp"
#include <memory>
#include <atomic>
#include <memory>
#include <mutex>
#include <jni.h>
#define ENABLE_LOCKING // Enable DB locking for flush
namespace ARIASDK_NS_BEGIN {
#define ENABLE_LOCKING // Enable DB locking for flush
namespace ARIASDK_NS_BEGIN
{
class OfflineStorage_Room : public IOfflineStorage
{
protected:
class ConnectedEnv {
protected:
class ConnectedEnv
{
JNIEnv* env = nullptr;
JavaVM* vm = nullptr;
size_t push_count = 0;
static std::mutex s_envValuesMutex;
static std::map<JNIEnv *, size_t> s_envValues;
static std::map<JNIEnv*, size_t> s_envValues;
public:
public:
ConnectedEnv() = delete;
ConnectedEnv(JavaVM *vm_);
ConnectedEnv(JavaVM* vm_);
~ConnectedEnv();
bool operator!() const {
bool operator!() const
{
return !env;
}
@ -43,33 +45,34 @@ namespace ARIASDK_NS_BEGIN {
void popLocalFrame();
JNIEnv* operator-> () const {
JNIEnv* operator->() const
{
return env;
}
JNIEnv* getInner() const {
JNIEnv* getInner() const
{
return env;
}
};
public:
public:
OfflineStorage_Room(ILogManager& logManager, IRuntimeConfig& runtimeConfig);
OfflineStorage_Room() = delete;
virtual ~OfflineStorage_Room();
void Initialize(IOfflineStorageObserver& observer) override;
void Shutdown() override;
void Flush() override {};
void Flush() override{};
bool StoreRecord(StorageRecord const& record) override;
size_t StoreRecords(StorageRecordVector & records) override;
size_t StoreRecords(StorageRecordVector& records) override;
bool GetAndReserveRecords(std::function<bool(StorageRecord&&)> const& consumer, unsigned leaseTimeMs, EventLatency minLatency = EventLatency_Normal, unsigned maxCount = 0) override;
bool IsLastReadFromMemory() override;
unsigned LastReadRecordCount() override;
void DeleteRecords(const std::map<std::string, std::string> & whereFilter) override;
void DeleteRecords(std::vector<StorageRecordId> const& ids, HttpHeaders headers, bool& fromMemory) override;
void ReleaseRecords(std::vector<StorageRecordId> const& ids, bool incrementRetryCount, HttpHeaders headers, bool& fromMemory) override;
void DeleteRecords(const std::map<std::string, std::string>& whereFilter) override;
void DeleteRecords(std::vector<StorageRecordId> const& ids, HttpHeaders, bool&) override;
void ReleaseRecords(std::vector<StorageRecordId> const& ids, bool incrementRetryCount, HttpHeaders, bool&) override;
bool StoreSetting(std::string const& name, std::string const& value) override;
void DeleteSetting(std::string const& name);
@ -79,34 +82,34 @@ namespace ARIASDK_NS_BEGIN {
StorageRecordVector GetRecords(bool shutdown, EventLatency minLatency = EventLatency_Normal, unsigned maxCount = 0) override;
bool ResizeDb() override;
static void ConnectJVM(JNIEnv* env, jobject appContext, jclass room_class);
protected:
static void ConnectJVM(JNIEnv* env, jobject appContext);
protected:
MATSDK_LOG_DECL_COMPONENT_CLASS();
size_t GetSizeInternal(ConnectedEnv & env) const;
size_t GetSizeInternal(ConnectedEnv& env) const;
bool ResizeDbInternal(ConnectedEnv& env);
ILogManager &m_manager;
IRuntimeConfig &m_config;
IOfflineStorageObserver *m_observer;
ILogManager& m_manager;
IRuntimeConfig& m_config;
IOfflineStorageObserver* m_observer;
jobject m_room = nullptr;
std::recursive_mutex m_resize_mutex;
std::mutex m_resize_mutex;
constexpr static size_t CHECK_INSERT_COUNT = 1000;
size_t m_size_limit = 3 * 1024 * 1024; // 3 MB
size_t m_size_limit = 3 * 1024 * 1024; // 3 MB
double m_notify_fraction = 0.75;
uint64_t m_storageFullNotifyTime = 0;
constexpr static uint64_t DB_FULL_CHECK_TIME_MS = 5000;
std::atomic<size_t> m_checkAfterInsertCounter;
std::atomic<unsigned> m_lastReadCount;
std::mutex m_jniThreadsMutex;
std::set<JNIEnv *> m_jniThreads;
void ThrowLogic(ConnectedEnv & env, const char * message) const;
void ThrowRuntime(ConnectedEnv & env, const char * message) const;
std::set<JNIEnv*> m_jniThreads;
void ThrowLogic(ConnectedEnv& env, const char* message) const;
void ThrowRuntime(ConnectedEnv& env, const char* message) const;
static JavaVM* s_vm;
static jobject s_context;
};
} ARIASDK_NS_END
}
ARIASDK_NS_END
#endif

Просмотреть файл

@ -12,6 +12,8 @@
namespace ARIASDK_NS_BEGIN {
constexpr static size_t kBlockSize = 8192;
class DbTransaction {
SqliteDB* m_db;
public:
@ -57,7 +59,8 @@ namespace ARIASDK_NS_BEGIN {
, m_logManager(logManager)
{
uint32_t percentage = (inMemory) ? m_config[CFG_INT_RAMCACHE_FULL_PCT] : m_config[CFG_INT_STORAGE_FULL_PCT];
m_DbSizeLimit = (inMemory) ? static_cast<uint32_t>(m_config[CFG_INT_RAM_QUEUE_SIZE]) : static_cast<uint32_t>(m_config[CFG_INT_CACHE_FILE_SIZE]);
m_DbSizeLimit = (inMemory) ? static_cast<uint32_t>(m_config[CFG_INT_RAM_QUEUE_SIZE])
: m_config.GetOfflineStorageMaximumSizeBytes();
m_offlineStorageFileName = (inMemory) ? ":memory:" : (const char *)m_config[CFG_STR_CACHE_FILE_PATH];
if ((percentage == 0)||(percentage > 100))
@ -309,13 +312,17 @@ namespace ARIASDK_NS_BEGIN {
LOG_TRACE("Reserving %u event(s) {%s%s} for %u milliseconds",
static_cast<unsigned>(consumedIds.size()), consumedIds.front().c_str(), (consumedIds.size() > 1) ? ", ..." : "", leaseTimeMs);
std::vector<uint8_t> idList = packageIdList(consumedIds);
if (!SqliteStatement(*m_db, m_stmtReserveEvents).execute(idList, PAL::getUtcSystemTimeMs() + leaseTimeMs)) {
LOG_ERROR("Failed to reserve events to send: Database error occurred, recreating database");
recreate(207);
return false;
for (size_t i = 0; i < consumedIds.size(); i += kBlockSize)
{
auto count = std::min(kBlockSize, consumedIds.size() - i);
std::vector<uint8_t> idList = packageIdList(consumedIds.begin() + i, consumedIds.begin() + i + count);
if (!SqliteStatement(*m_db, m_stmtReserveEvents).execute(idList, PAL::getUtcSystemTimeMs() + leaseTimeMs))
{
LOG_ERROR("Failed to reserve events to send: Database error occurred, recreating database");
recreate(207);
return false;
}
}
m_lastReadCount = static_cast<unsigned>(consumedIds.size());
}
return true;
@ -452,12 +459,18 @@ namespace ARIASDK_NS_BEGIN {
#endif
LOG_TRACE("Deleting %u sent event(s) {%s%s}...", static_cast<unsigned>(ids.size()), ids.front().c_str(), (ids.size() > 1) ? ", ..." : "");
std::vector<uint8_t> idList = packageIdList(ids);
if (!SqliteStatement(*m_db, m_stmtDeleteEvents_ids).execute(idList)) {
LOG_ERROR("Failed to delete %u sent event(s) {%s%s}: Database error occurred, recreating database",
static_cast<unsigned>(ids.size()), ids.front().c_str(), (ids.size() > 1) ? ", ..." : "");
recreate(302);
return;
for (size_t i = 0; i < ids.size(); i += kBlockSize) {
size_t count = std::min(kBlockSize, ids.size() - i);
std::vector<uint8_t> idList = packageIdList(ids.begin() + i,
ids.begin() + i + count);
if (!SqliteStatement(*m_db, m_stmtDeleteEvents_ids).execute(idList)) {
LOG_ERROR(
"Failed to delete %u sent event(s) {%s%s}: Database error occurred, recreating database",
static_cast<unsigned>(ids.size()), ids.front().c_str(),
(ids.size() > 1) ? ", ..." : "");
recreate(302);
return;
}
}
}
}
@ -488,13 +501,19 @@ namespace ARIASDK_NS_BEGIN {
LOG_TRACE("Releasing %u event(s) {%s%s}, retry count %s...",
static_cast<unsigned>(ids.size()), ids.front().c_str(), (ids.size() > 1) ? ", ..." : "", incrementRetryCount ? "+1" : "not changed");
std::vector<uint8_t> idList = packageIdList(ids);
SqliteStatement releaseStmt(*m_db, m_stmtReleaseEvents_ids_retryCountDelta);
if (!releaseStmt.execute(idList, incrementRetryCount ? 1 : 0)) {
LOG_ERROR("Failed to release %u event(s) {%s%s}, retry count %s: Database error occurred, recreating database",
static_cast<unsigned>(ids.size()), ids.front().c_str(), (ids.size() > 1) ? ", ..." : "", incrementRetryCount ? "+1" : "not changed");
recreate(403);
return;
for (size_t i = 0; i < ids.size(); i += kBlockSize) {
size_t count = std::min(kBlockSize, ids.size() - i);
std::vector<uint8_t> idList = packageIdList(ids.begin() + i, ids.begin() + i + count);
if (!releaseStmt.execute(idList, incrementRetryCount ? 1 : 0)) {
LOG_ERROR(
"Failed to release %u event(s) {%s%s}, retry count %s: Database error occurred, recreating database",
static_cast<unsigned>(ids.size()), ids.front().c_str(),
(ids.size() > 1) ? ", ..." : "",
incrementRetryCount ? "+1" : "not changed");
recreate(403);
return;
}
}
LOG_TRACE("Successfully released %u requested event(s), %u were not found anymore",
releaseStmt.changes(), static_cast<unsigned>(ids.size()) - releaseStmt.changes());
@ -631,6 +650,13 @@ namespace ARIASDK_NS_BEGIN {
SqliteStatement(*m_db, "PRAGMA auto_vacuum=FULL").select();
SqliteStatement(*m_db, "PRAGMA journal_mode=WAL").select();
SqliteStatement(*m_db, "PRAGMA synchronous=NORMAL").select();
{
std::ostringstream tempPragma;
tempPragma << "PRAGMA temp_store_directory = '" << GetTempDirectory() << "'";
SqliteStatement(*m_db, tempPragma.str().c_str()).select();
const char * result = sqlite3_temp_directory;
LOG_INFO("Set sqlite3 temp_store_directory to '%s'", result);
}
int openedDbVersion;
{
@ -722,11 +748,15 @@ namespace ARIASDK_NS_BEGIN {
"(SELECT COUNT(record_id) FROM " TABLE_NAME_EVENTS ")"
"* ? / 100)");
PREPARE_SQL(m_stmtTrimEvents_percent,
"DELETE FROM " TABLE_NAME_EVENTS " WHERE record_id IN ("
"SELECT record_id FROM " TABLE_NAME_EVENTS " ORDER BY persistence ASC, timestamp ASC LIMIT MAX(1,"
"(SELECT COUNT(record_id) FROM " TABLE_NAME_EVENTS ")"
"* ? / 100)"
")");
"DELETE FROM " TABLE_NAME_EVENTS " WHERE record_id IN ("
"SELECT record_id FROM " TABLE_NAME_EVENTS " ORDER BY persistence ASC, timestamp ASC LIMIT MAX(1,"
"(SELECT COUNT(record_id) FROM " TABLE_NAME_EVENTS ")"
"* ? / 100)"
")");
PREPARE_SQL(m_stmtDeleteEvents_tenants,
SQL_SUPPLY_PACKAGED_IDS
"DELETE FROM " TABLE_NAME_EVENTS " WHERE tenant_token IN ids");
PREPARE_SQL(m_stmtDeleteEvents_ids,
SQL_SUPPLY_PACKAGED_IDS
"DELETE FROM " TABLE_NAME_EVENTS " WHERE record_id IN ids");
@ -888,23 +918,26 @@ namespace ARIASDK_NS_BEGIN {
return true;
}
std::vector<uint8_t> OfflineStorage_SQLite::packageIdList(std::vector<std::string> const& ids)
std::vector<uint8_t> OfflineStorage_SQLite::packageIdList(
std::vector<std::string>::const_iterator const & begin,
std::vector<std::string>::const_iterator const & end) const
{
size_t size = std::accumulate(ids.cbegin(), ids.cend(), size_t(0), [](size_t sum, std::string const& id) -> size_t {
size_t size = std::accumulate(begin, end, size_t(0), [](size_t sum, std::string const& id) -> size_t {
return sum + id.length() + 1;
});
std::vector<uint8_t> result;
result.reserve(size);
for (std::string const& id : ids) {
for (auto i = begin; i != end; ++i)
{
std::string const & id(*i);
uint8_t const* ptr = reinterpret_cast<uint8_t const*>(id.c_str());
result.insert(result.end(), ptr, ptr + id.size() + 1);
}
return result;
}
} ARIASDK_NS_END
#endif

Просмотреть файл

@ -51,7 +51,9 @@ namespace ARIASDK_NS_BEGIN {
bool initializeDatabase();
bool recreate(unsigned failureCode);
std::vector<uint8_t> packageIdList(std::vector<std::string> const& ids);
std::vector<uint8_t> packageIdList(
std::vector<std::string>::const_iterator const & begin,
std::vector<std::string>::const_iterator const & end) const;
// Debug routine to print record count in the DB
void printRecordCount();
@ -86,6 +88,7 @@ namespace ARIASDK_NS_BEGIN {
size_t m_stmtTrimEvents_percent {};
size_t m_stmtDeleteEvents_ids {};
size_t m_stmtReleaseExpiredEvents {};
size_t m_stmtDeleteEvents_tenants {};
size_t m_stmtSelectEvents {};
size_t m_stmtSelectEventAtShutdown {};
size_t m_stmtSelectEventsMinlatency {};

Просмотреть файл

@ -21,6 +21,7 @@ class MockIOfflineStorage : public MAT::IOfflineStorage {
MOCK_METHOD0(IsLastReadFromMemory, bool());
MOCK_METHOD0(LastReadRecordCount, unsigned());
MOCK_METHOD3(DeleteRecords, void(std::vector<MAT::StorageRecordId> const &, MAT::HttpHeaders, bool& ));
MOCK_METHOD1(DeleteRecordsByToken, void(std::vector<std::string> const &));
MOCK_METHOD4(ReleaseRecords, void(std::vector<MAT::StorageRecordId> const &, bool, MAT::HttpHeaders, bool&));
MOCK_METHOD2(StoreSetting, bool(std::string const &, std::string const &));
MOCK_METHOD1(GetSetting, std::string(std::string const &));

Просмотреть файл

@ -30,6 +30,7 @@ set(SRCS
MetaStatsTests.cpp
OacrTests.cpp
OfflineStorageTests.cpp
OfflineStorageTests_Room.cpp
OfflineStorageTests_SQLite.cpp
PackagerTests.cpp
PalTests.cpp

Просмотреть файл

@ -218,8 +218,32 @@ TEST(MemoryStorageTests, ReleaseRecords)
EXPECT_THAT(storage.GetReservedCount(), 0);
}
TEST(MemoryStorageTests, GetAndReserveSome)
{
MemoryStorage storage(testLogManager, testConfig);
storage.Initialize(testObserver);
addEvents(storage);
auto totalCount = storage.GetRecordCount();
constexpr size_t howMany = 32;
std::vector<StorageRecord> someRecords;
storage.GetAndReserveRecords(
[&someRecords, howMany] (StorageRecord && record)->bool
{
if (someRecords.size() >= howMany) {
return false;
}
someRecords.emplace_back(std::move(record));
return true;
},
EventLatency_Normal
);
EXPECT_EQ(howMany, someRecords.size());
EXPECT_EQ(howMany, storage.LastReadRecordCount());
EXPECT_EQ(totalCount - howMany, storage.GetRecordCount());
}
// This method is not implemented for RAM storage
TEST(MemoryStorage, StoreSetting)
TEST(MemoryStorageTests, StoreSetting)
{
MemoryStorage storage(testLogManager, testConfig);
bool result = storage.StoreSetting("not_implemented", "not_implemented");
@ -227,7 +251,7 @@ TEST(MemoryStorage, StoreSetting)
}
// This method is not implemented for RAM storage
TEST(MemoryStorage, GetSetting)
TEST(MemoryStorageTests, GetSetting)
{
MemoryStorage storage(testLogManager, testConfig);
auto result = storage.GetSetting("not_implemented");
@ -235,7 +259,7 @@ TEST(MemoryStorage, GetSetting)
}
// This method is not implemented for RAM storage
TEST(MemoryStorag, ResizeDb)
TEST(MemoryStorageTests, ResizeDb)
{
MemoryStorage storage(testLogManager, testConfig);
EXPECT_THAT(storage.ResizeDb(), true);
@ -246,7 +270,6 @@ constexpr size_t MAX_STRESS_THREADS = 20;
TEST(MemoryStorageTests, MultiThreadPerfTest)
{
MemoryStorage storage(testLogManager, testConfig);
std::atomic<size_t> totalRecords(0);
std::vector<std::thread> workers;
std::atomic<size_t> threadCount(0);

Просмотреть файл

@ -1,13 +1,19 @@
//
// Created by maharrim on 5/18/2020.
//
#ifdef ANDROID
#include <android/log.h>
#endif
#include "common/Common.hpp"
#include "common/MockIRuntimeConfig.hpp"
#include "common/MockIOfflineStorageObserver.hpp"
#include "offline/MemoryStorage.hpp"
#ifdef ANDROID
#include "offline/OfflineStorage_Room.hpp"
#endif
#include "offline/OfflineStorage_SQLite.hpp"
#include "NullObjects.hpp"
#include <functional>
#include <string>
namespace MAE = ::Microsoft::Applications::Events;
@ -45,18 +51,20 @@ public:
OfflineStorageTestsRoom() : logManager(&nullLogManager)
{
EXPECT_CALL(configMock, GetOfflineStorageMaximumSizeBytes()).WillRepeatedly(
Return(UINT_MAX));
Return(32 * 4096));
EXPECT_CALL(configMock, GetMaximumRetryCount()).WillRepeatedly(
Return(5));
std::ostringstream name;
implementation = GetParam();
switch (implementation) {
#ifdef ANDROID
case StorageImplementation::Room:
configMock[CFG_STR_CACHE_FILE_PATH] = "OfflineStorageTestsRoom.db";
offlineStorage = std::make_unique<MAE::OfflineStorage_Room>(nullLogManager, configMock);
EXPECT_CALL(observerMock, OnStorageOpened("Room/Init"))
.RetiresOnSaturation();
break;
#endif
case StorageImplementation::SQLite:
name << MAE::GetTempDirectory() << "OfflineStorageTestsSQLite.db";
configMock[CFG_STR_CACHE_FILE_PATH] = name.str();
@ -76,7 +84,6 @@ public:
{
offlineStorage->Shutdown();
}
void DeleteAllRecords() {
auto records = offlineStorage->GetRecords(true, EventLatency_Unspecified, 0);
if (records.empty()) {
@ -90,6 +97,7 @@ public:
HttpHeaders h;
bool fromMemory = false;
offlineStorage->DeleteRecords(ids, h, fromMemory);
EXPECT_EQ(0, offlineStorage->GetRecordCount());
}
void SetUp() override {
@ -293,9 +301,6 @@ TEST_P(OfflineStorageTestsRoom, TestGetRecords) {
TEST_P(OfflineStorageTestsRoom, TestManyExpiredRecords) {
size_t count = 5000;
if (implementation == StorageImplementation::SQLite) {
count = 64; // issue 411
}
auto now = PAL::getUtcSystemTimeMs();
auto retries = configMock.GetMaximumRetryCount() + 1;
std::vector<StorageRecord> manyRecords;
@ -343,9 +348,180 @@ TEST_P(OfflineStorageTestsRoom, TestManyExpiredRecords) {
EXPECT_EQ(remainingRecords, offlineStorage->GetRecordCount(EventLatency_Normal));
}
TEST_P(OfflineStorageTestsRoom, LastReadRecordCount) {
size_t count = 5000;
size_t consume = 315;
std::hash<size_t> id_hash;
StorageRecordVector records;
records.reserve(count);
auto now = PAL::getUtcSystemTimeMs();
for (size_t i = 0; i < count; i++) {
auto id = id_hash(i);
auto id_string = std::to_string(id);
records.emplace_back(
id_string,
id_string,
EventLatency_Normal,
EventPersistence_Normal,
now,
StorageBlob {3, 1, 4, 1, 5, 9}
);
}
offlineStorage->StoreRecords(records);
records.clear();
offlineStorage->GetAndReserveRecords(
[&records, consume](StorageRecord && record)->bool
{
if (records.size() >= consume) {
return false;
}
records.emplace_back(record);
return true;
},
5000
);
EXPECT_EQ(consume, offlineStorage->LastReadRecordCount());
}
TEST_P(OfflineStorageTestsRoom, ReleaseActuallyReleases) {
auto now = PAL::getUtcSystemTimeMs();
StorageRecord r(
"Fred",
"George",
EventLatency_Normal,
EventPersistence_Normal,
now,
StorageBlob {1, 2, 3}
);
offlineStorage->StoreRecord(r);
offlineStorage->GetAndReserveRecords(
[](StorageRecord && record)->bool
{
return false;
},
5000
);
EXPECT_EQ(0, offlineStorage->LastReadRecordCount());
StorageRecordVector records;
offlineStorage->GetAndReserveRecords(
[&records] (StorageRecord && record)->bool
{
records.emplace_back(std::move(record));
return true;
}, 5000
);
EXPECT_EQ(1, offlineStorage->LastReadRecordCount());
EXPECT_EQ(1, records.size());
offlineStorage->GetAndReserveRecords(
[] (StorageRecord && record)->bool
{
ADD_FAILURE();
return false;
},
5000
);
}
TEST_P(OfflineStorageTestsRoom, DeleteByToken)
{
StorageRecordVector records;
auto now = PAL::getUtcSystemTimeMs();
for (size_t i = 0; i < 1000; ++i) {
auto id = std::to_string(i);
auto tenantToken = std::to_string(i % 5);
records.emplace_back(
id,
tenantToken,
EventLatency_Normal,
EventPersistence_Normal,
now,
StorageBlob {1, 2, static_cast<unsigned char>(i), 4, 5}
);
}
offlineStorage->StoreRecords(records);
EXPECT_EQ(1000, offlineStorage->GetRecordCount());
offlineStorage->DeleteRecords({{ "tenant_token", "0"}});
EXPECT_EQ(800, offlineStorage->GetRecordCount());
}
TEST_P(OfflineStorageTestsRoom, ResizeDB)
{
if (implementation == StorageImplementation::Memory) {
return;
}
auto now = PAL::getUtcSystemTimeMs();
StorageRecord record(
"",
"TenantFred",
EventLatency_Normal,
EventPersistence_Normal,
now,
StorageBlob {1, 2, 3, 4}
);
size_t index = 1;
while (offlineStorage->GetSize() <= configMock.GetOfflineStorageMaximumSizeBytes()) {
record.id = std::to_string(index);
offlineStorage->StoreRecord(record);
index += 1;
}
auto preCount = offlineStorage->GetRecordCount();
offlineStorage->ResizeDb();
auto postCount = offlineStorage->GetRecordCount();
EXPECT_GT(preCount, postCount);
}
TEST_P(OfflineStorageTestsRoom, StoreManyRecords)
{
constexpr size_t targetSize = 2 * 1024 * 1024;
constexpr size_t blobSize = 512;
constexpr size_t blockSize = 1024;
std::random_device rd; // non-deterministic generator
std::mt19937_64 gen(rd()); // to seed mersenne twister.
std::uniform_int_distribution<> randomByte(0,255);
std::uniform_int_distribution<uint64_t> randomWord(0, UINT64_MAX);
auto now = PAL::getUtcSystemTimeMs();
StorageBlob masterBlob;
masterBlob.reserve(blobSize);
while (masterBlob.size() < blobSize) {
masterBlob.push_back(randomByte(gen));
}
size_t blocks = 0;
StorageRecordVector records;
records.reserve(blockSize);
while (records.size() < blockSize) {
records.emplace_back(
"",
"Fred-Doom-Token23",
EventLatency_Normal,
EventPersistence_Normal,
now,
StorageBlob(masterBlob)
);
}
while (offlineStorage->GetSize() < targetSize) {
for (auto & record : records) {
record.id = std::to_string(randomWord(gen));
}
offlineStorage->StoreRecords(records);
++blocks;
}
EXPECT_EQ(blocks * blockSize, offlineStorage->GetRecordCount());
}
#ifdef ANDROID
auto values = Values(StorageImplementation::Room, StorageImplementation::SQLite, StorageImplementation::Memory);
#else
auto values = Values(StorageImplementation::SQLite, StorageImplementation::Memory);
#endif
INSTANTIATE_TEST_CASE_P(Storage,
OfflineStorageTestsRoom,
Values(StorageImplementation::Room, StorageImplementation::SQLite, StorageImplementation::Memory),
values,
[](const testing::TestParamInfo<OfflineStorageTestsRoom::ParamType>& info)->std::string {
std::ostringstream s;
s << info.param;