Merge pull request #17 from ashvina/auto-expiration

Auto expire outcomes older than 30 minutes
This commit is contained in:
Ashvin 2018-04-11 22:00:32 -07:00 коммит произвёл GitHub
Родитель 6ea32bd1f5 35e6183744
Коммит 10ea403be8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 201 добавлений и 16 удалений

Просмотреть файл

@ -41,6 +41,16 @@ public class ActionTable extends OutcomeTable<Action> {
actions.forEach(this::add);
}
/**
* Deletes all rows corresponding to actions older than or recorded at the given expiration
*
* @param expiration timestamp
* @return {@link ActionTable} containing retained {@link Action}s
*/
public ActionTable expire(Instant expiration) {
return new ActionTable(super.expireBefore(expiration));
}
/**
* @param id unique action id
* @return {@link Action}s with the given id
@ -135,7 +145,7 @@ public class ActionTable extends OutcomeTable<Action> {
* Builds {@link ActionTable} instance and provides ability to update it.
*/
public static class Builder {
private final ActionTable actionsTable = new ActionTable();
private ActionTable actionsTable = new ActionTable();
public ActionTable get() {
return actionsTable;
@ -148,5 +158,9 @@ public class ActionTable extends OutcomeTable<Action> {
this.actionsTable.addAll(actions);
}
public void expireBefore(Instant expiration) {
this.actionsTable = actionsTable.expire(expiration);
}
}
}

Просмотреть файл

@ -50,6 +50,16 @@ public class DiagnosisTable extends OutcomeTable<Diagnosis> {
return new DiagnosisTable(result);
}
/**
* Deletes all rows corresponding to diagnosis older than or recorded at the given expiration
*
* @param expiration timestamp
* @return {@link DiagnosisTable} containing retained {@link Diagnosis}
*/
public DiagnosisTable expire(Instant expiration) {
return new DiagnosisTable(super.expireBefore(expiration));
}
/**
* Retains all {@link Diagnosis} with given diagnosis type
*
@ -135,7 +145,7 @@ public class DiagnosisTable extends OutcomeTable<Diagnosis> {
* Builds {@link DiagnosisTable} instance and provides ability to update it.
*/
public static class Builder {
private final DiagnosisTable diagnosisTable = new DiagnosisTable();
private DiagnosisTable diagnosisTable = new DiagnosisTable();
public DiagnosisTable get() {
return diagnosisTable;
@ -148,5 +158,9 @@ public class DiagnosisTable extends OutcomeTable<Diagnosis> {
this.diagnosisTable.addAll(diagnosis);
}
public void expireBefore(Instant expiration) {
this.diagnosisTable = diagnosisTable.expire(expiration);
}
}
}

Просмотреть файл

@ -11,6 +11,7 @@ import tech.tablesaw.api.DoubleColumn;
import tech.tablesaw.api.LongColumn;
import tech.tablesaw.api.Table;
import tech.tablesaw.filtering.Filter;
import tech.tablesaw.util.Selection;
import java.time.Instant;
import java.util.ArrayList;
@ -91,6 +92,17 @@ public class MeasurementsTable {
return table;
}
/**
* Deletes all rows corresponding to measurements older than or recorded at the given expiration
*
* @param expiration timestamp
* @return {@link MeasurementsTable} containing retained {@link Measurement}s
*/
public MeasurementsTable expire(Instant expiration) {
Selection s = TableUtils.filterTime(measurements, TIME_STAMP, null, expiration);
return new MeasurementsTable(measurements.dropRows(s.toIntArrayList()));
}
/**
* Retains all {@link Measurement}s with given component names.
*
@ -162,7 +174,8 @@ public class MeasurementsTable {
* @return {@link MeasurementsTable} containing filtered {@link Measurement}s
*/
public MeasurementsTable between(Instant oldest, Instant newest) {
return new MeasurementsTable(TableUtils.filterTime(measurements, TIME_STAMP, oldest, newest));
Selection selection = TableUtils.filterTime(measurements, TIME_STAMP, oldest, newest);
return new MeasurementsTable(measurements.selectWhere(selection));
}
/**
@ -349,7 +362,7 @@ public class MeasurementsTable {
* Builds {@link MeasurementsTable} instance and provides ability to update it.
*/
public static class Builder {
private final MeasurementsTable measurementsTable = new MeasurementsTable();
private MeasurementsTable measurementsTable = new MeasurementsTable();
public MeasurementsTable get() {
return measurementsTable;
@ -362,5 +375,9 @@ public class MeasurementsTable {
this.measurementsTable.addAll(measurements);
}
public void expireBefore(Instant expiration) {
this.measurementsTable = measurementsTable.expire(expiration);
}
}
}

Просмотреть файл

@ -11,6 +11,7 @@ import tech.tablesaw.api.IntColumn;
import tech.tablesaw.api.LongColumn;
import tech.tablesaw.api.Table;
import tech.tablesaw.filtering.Filter;
import tech.tablesaw.util.Selection;
import java.time.Instant;
import java.util.ArrayList;
@ -22,8 +23,6 @@ import java.util.stream.Collectors;
import static tech.tablesaw.api.QueryHelper.column;
import static tech.tablesaw.api.QueryHelper.or;
//TODO thread safety
/**
* An ordered collection of {@link Outcome} instances. This class provides methods to filter, query and aggregate the
* {@link Outcome} instances.
@ -77,6 +76,11 @@ public abstract class OutcomeTable<T extends Outcome> {
});
}
public Table expireBefore(Instant expiration) {
Selection s = TableUtils.filterTime(table, TIME_STAMP, null, expiration);
return table.dropRows(s.toIntArrayList());
}
Table filterId(int id) {
return table.selectWhere(column(ID).isEqualTo(id));
}
@ -103,7 +107,7 @@ public abstract class OutcomeTable<T extends Outcome> {
}
Table filterTime(Instant oldest, Instant newest) {
return TableUtils.filterTime(table, TIME_STAMP, oldest, newest);
return table.selectWhere(TableUtils.filterTime(table, TIME_STAMP, oldest, newest));
}
/**

Просмотреть файл

@ -41,6 +41,16 @@ public class SymptomsTable extends OutcomeTable<Symptom> {
symptoms.forEach(this::add);
}
/**
* Deletes all rows corresponding to {@link Symptom}s older than or recorded at the given expiration
*
* @param expiration timestamp
* @return {@link SymptomsTable} containing retained {@link Symptom}s
*/
public SymptomsTable expire(Instant expiration) {
return new SymptomsTable(super.expireBefore(expiration));
}
/**
* @param id unique symptom id
* @return {@link Symptom}s with the given id
@ -135,7 +145,7 @@ public class SymptomsTable extends OutcomeTable<Symptom> {
* Builds {@link SymptomsTable} instance and provides ability to update it.
*/
public static class Builder {
private final SymptomsTable symptomsTable = new SymptomsTable();
private SymptomsTable symptomsTable = new SymptomsTable();
public SymptomsTable get() {
return symptomsTable;
@ -148,5 +158,9 @@ public class SymptomsTable extends OutcomeTable<Symptom> {
this.symptomsTable.addAll(symptoms);
}
public void expireBefore(Instant expiration) {
this.symptomsTable = symptomsTable.expire(expiration);
}
}
}

Просмотреть файл

@ -14,6 +14,7 @@ import tech.tablesaw.columns.ColumnReference;
import tech.tablesaw.filtering.Filter;
import tech.tablesaw.filtering.LongGreaterThanOrEqualTo;
import tech.tablesaw.filtering.LongLessThanOrEqualTo;
import tech.tablesaw.util.Selection;
import java.time.Instant;
import java.util.ArrayList;
@ -34,7 +35,7 @@ class TableUtils {
return result;
}
static Table filterTime(Table table, String column, Instant oldest, Instant newest) {
static Selection filterTime(Table table, String column, Instant oldest, Instant newest) {
List<Filter> filters = new ArrayList<>();
if (oldest != null) {
@ -43,11 +44,12 @@ class TableUtils {
if (newest != null) {
filters.add(new LongLessThanOrEqualTo(new ColumnReference(column), newest.toEpochMilli()));
}
if (filters.isEmpty()) {
return table;
throw new IllegalArgumentException();
}
return table.selectWhere(allOf(filters));
return allOf(filters).apply(table);
}
static Collection<Instant> uniqueInstants(LongColumn timeStamps) {

Просмотреть файл

@ -99,7 +99,11 @@ public class PoliciesExecutor {
// TODO pretty print
LOG.info(actions.toString());
// TODO delete expired objects from state store
Instant expiration = current.minus(Duration.ofMinutes(30));
context.measurementsTableBuilder.expireBefore(expiration);
context.symptomsTableBuilder.expireBefore(expiration);
context.diagnosisTableBuilder.expireBefore(expiration);
context.actionTableBuilder.expireBefore(expiration);
}
}, 1, 1, TimeUnit.MILLISECONDS);
@ -133,10 +137,7 @@ public class PoliciesExecutor {
}
private void captureCheckpoint() {
if (checkpoint != null) {
previousCheckpoint = checkpoint;
}
previousCheckpoint = checkpoint != null ? checkpoint : Instant.MIN;
checkpoint = Instant.now();
}

Просмотреть файл

@ -89,6 +89,18 @@ public class MeasurementsTableTest {
resultTable.get().forEach(m -> assertTrue(70 >= m.instant().toEpochMilli()));
}
@Test
public void expire() {
Instant expiration = Instant.ofEpochMilli(70);
resultTable = testTable.between(null, expiration);
assertEquals(7, resultTable.size());
resultTable = testTable.expire(expiration);
assertEquals(17, resultTable.size());
resultTable = resultTable.between(null, expiration);
assertEquals(0, resultTable.size());
}
@Test
public void max() {
assertEquals(240, testTable.max(), 0.01);

Просмотреть файл

@ -89,6 +89,18 @@ public class SymptomsTableTest {
resultTable.get().forEach(s -> assertTrue(30 >= s.instant().toEpochMilli()));
}
@Test
public void expire() {
Instant expiration = Instant.ofEpochMilli(20);
resultTable = testTable.between(null, expiration);
assertEquals(6, resultTable.size());
resultTable = testTable.expire(expiration);
assertEquals(12, resultTable.size());
resultTable = resultTable.between(null, expiration);
assertEquals(0, resultTable.size());
}
@Test
public void size() {
assertEquals(18, testTable.size());

Просмотреть файл

@ -12,18 +12,25 @@ import com.microsoft.dhalion.core.Action;
import com.microsoft.dhalion.core.Diagnosis;
import com.microsoft.dhalion.core.Measurement;
import com.microsoft.dhalion.core.Symptom;
import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.exceptions.verification.WantedButNotInvoked;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -84,4 +91,92 @@ public class PoliciesExecutorTest {
executor.destroy();
}
@Test
public void verifyExpiry() throws Exception {
Instant now = Instant.now();
Instant old = now.minus(Duration.ofMinutes(35));
Measurement mRetain = new Measurement("c", "i", "m", now, 123);
Measurement mExpire = new Measurement("c", "i", "m", old, 123);
List<Measurement> measurements = Arrays.asList(mRetain, mExpire);
Symptom sRetain = new Symptom("s", now, null, null);
Symptom sExpire = new Symptom("s", old, null, null);
List<Symptom> symptoms = Arrays.asList(sRetain, sExpire);
Diagnosis dRetain = new Diagnosis("d", now, null, null);
Diagnosis dExpire = new Diagnosis("d", old, null, null);
List<Diagnosis> diagnosis = Arrays.asList(dRetain, dExpire);
Action aRetain = new Action("a", now, null, null);
Action aExpire = new Action("a", old, null, null);
List<Action> actions = Arrays.asList(aRetain, aExpire);
CountDownLatch barrier = new CountDownLatch(1);
final AtomicInteger executeCount = new AtomicInteger();
HealthPolicyImpl barrierPolicy = new HealthPolicyImpl() {
private ExecutionContext context;
@Override
public void initialize(ExecutionContext ctxt) {
this.context = ctxt;
}
@Override
public Duration getDelay() {
if (executeCount.get() == 1 && barrier.getCount() > 0) {
// verify result of expiry in previous cycle
assertEquals(2, measurements.size());
assertEquals(1, context.measurements().size());
assertEquals(now, context.measurements().get().iterator().next().instant());
assertEquals(2, symptoms.size());
assertEquals(1, context.symptoms().size());
assertEquals(now, context.symptoms().get().iterator().next().instant());
assertEquals(2, diagnosis.size());
assertEquals(1, context.diagnosis().size());
assertEquals(now, context.diagnosis().get().iterator().next().instant());
assertEquals(2, actions.size());
assertEquals(1, context.actions().size());
assertEquals(now, context.actions().get().iterator().next().instant());
barrier.countDown();
}
return Duration.ZERO;
}
@Override
public Collection<Measurement> executeSensors() {
executeCount.incrementAndGet();
return measurements;
}
@Override
public Collection<Symptom> executeDetectors(Collection<Measurement> measurements) {
return symptoms;
}
@Override
public Collection<Diagnosis> executeDiagnosers(Collection<Symptom> symptoms) {
return diagnosis;
}
@Override
public Collection<Action> executeResolvers(Collection<Diagnosis> diagnosis) {
return actions;
}
};
List<IHealthPolicy> policies = Collections.singletonList(barrierPolicy);
PoliciesExecutor executor = new PoliciesExecutor(policies);
ScheduledFuture<?> future = executor.start();
barrier.await(500, TimeUnit.MILLISECONDS);
if (future.isDone()) {
future.get();
}
executor.destroy();
}
}