This commit is contained in:
annie-mac 2022-09-27 09:24:48 -07:00
Родитель 9322d04f02
Коммит 6da3d29b66
1 изменённых файлов: 64 добавлений и 51 удалений

Просмотреть файл

@ -86,35 +86,42 @@ public class CosmosDBSinkTaskTest {
.getFailedRecordResponses()
.add(new SinkOperationFailedResponse(record, new BadRequestException("Unable to serialize JSON request")));
MockedConstruction<? extends SinkWriterBase> mockedWriterConstruction;
if (this.isBulkModeEnabled) {
mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> {
if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
} else {
mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> {
if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
}
MockedConstruction<? extends SinkWriterBase> mockedWriterConstruction = null;
try {
testTask.put(Arrays.asList(record));
fail("Expected ConnectException on bad message");
} catch (ConnectException ce) {
if (this.isBulkModeEnabled) {
mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> {
if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
} catch (Throwable t) {
fail("Expected ConnectException, but got: " + t.getClass().getName());
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
} else {
mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> {
if (invocation.getMethod().equals(PointWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
}
try {
testTask.put(Arrays.asList(record));
fail("Expected ConnectException on bad message");
} catch (ConnectException ce) {
} catch (Throwable t) {
fail("Expected ConnectException, but got: " + t.getClass().getName());
}
assertEquals(1, mockedWriterConstruction.constructed().size());
} finally {
if (mockedWriterConstruction != null) {
mockedWriterConstruction.close();
}
}
assertEquals(1, mockedWriterConstruction.constructed().size());
}
@Test
@ -130,34 +137,40 @@ public class CosmosDBSinkTaskTest {
SinkWriteResponse sinkWriteResponse = new SinkWriteResponse();
sinkWriteResponse.getSucceededRecords().add(record);
MockedConstruction<? extends SinkWriterBase> mockedWriterConstruction;
if (this.isBulkModeEnabled) {
mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> {
if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
} else {
mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> {
if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
}
MockedConstruction<? extends SinkWriterBase> mockedWriterConstruction = null;
try {
testTask.put(Arrays.asList(record));
} catch (ConnectException ce) {
fail("Expected sink write succeeded. but got: " + ce.getMessage());
} catch (Throwable t) {
fail("Expected sink write succeeded, but got: " + t.getClass().getName());
}
if (this.isBulkModeEnabled) {
mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> {
if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
assertEquals(1, mockedWriterConstruction.constructed().size());
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
} else {
mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> {
if (invocation.getMethod().equals(PointWriter.class.getMethod("write", List.class))) {
return sinkWriteResponse;
}
throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName());
});
}
try {
testTask.put(Arrays.asList(record));
} catch (ConnectException ce) {
fail("Expected sink write succeeded. but got: " + ce.getMessage());
} catch (Throwable t) {
fail("Expected sink write succeeded, but got: " + t.getClass().getName());
}
assertEquals(1, mockedWriterConstruction.constructed().size());
} finally {
if (mockedWriterConstruction != null) {
mockedWriterConstruction.close();
}
}
}
}