From 6da3d29b667478407e7084c2fbbf28648a66a60a Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 27 Sep 2022 09:24:48 -0700 Subject: [PATCH] fix tests --- .../connect/sink/CosmosDBSinkTaskTest.java | 115 ++++++++++-------- 1 file changed, 64 insertions(+), 51 deletions(-) diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java index 8afa1df..ccfd54d 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java @@ -86,35 +86,42 @@ public class CosmosDBSinkTaskTest { .getFailedRecordResponses() .add(new SinkOperationFailedResponse(record, new BadRequestException("Unable to serialize JSON request"))); - MockedConstruction mockedWriterConstruction; - if (this.isBulkModeEnabled) { - mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> { - if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) { - return sinkWriteResponse; - } - - throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); - }); - } else { - mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> { - if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) { - return sinkWriteResponse; - } - - throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); - }); - } + MockedConstruction mockedWriterConstruction = null; try { - testTask.put(Arrays.asList(record)); - fail("Expected ConnectException on bad message"); - } catch (ConnectException ce) { + if (this.isBulkModeEnabled) { + mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> { + if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) { + return sinkWriteResponse; + } - } catch (Throwable t) { - fail("Expected ConnectException, but got: " + t.getClass().getName()); + throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); + }); + } else { + mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> { + if (invocation.getMethod().equals(PointWriter.class.getMethod("write", List.class))) { + return sinkWriteResponse; + } + + throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); + }); + } + + try { + testTask.put(Arrays.asList(record)); + fail("Expected ConnectException on bad message"); + } catch (ConnectException ce) { + + } catch (Throwable t) { + fail("Expected ConnectException, but got: " + t.getClass().getName()); + } + + assertEquals(1, mockedWriterConstruction.constructed().size()); + } finally { + if (mockedWriterConstruction != null) { + mockedWriterConstruction.close(); + } } - - assertEquals(1, mockedWriterConstruction.constructed().size()); } @Test @@ -130,34 +137,40 @@ public class CosmosDBSinkTaskTest { SinkWriteResponse sinkWriteResponse = new SinkWriteResponse(); sinkWriteResponse.getSucceededRecords().add(record); - MockedConstruction mockedWriterConstruction; - if (this.isBulkModeEnabled) { - mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> { - if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) { - return sinkWriteResponse; - } - - throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); - }); - } else { - mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> { - if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) { - return sinkWriteResponse; - } - - throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); - }); - } - + MockedConstruction mockedWriterConstruction = null; try { - testTask.put(Arrays.asList(record)); - } catch (ConnectException ce) { - fail("Expected sink write succeeded. but got: " + ce.getMessage()); - } catch (Throwable t) { - fail("Expected sink write succeeded, but got: " + t.getClass().getName()); - } + if (this.isBulkModeEnabled) { + mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> { + if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) { + return sinkWriteResponse; + } - assertEquals(1, mockedWriterConstruction.constructed().size()); + throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); + }); + } else { + mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> { + if (invocation.getMethod().equals(PointWriter.class.getMethod("write", List.class))) { + return sinkWriteResponse; + } + + throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); + }); + } + + try { + testTask.put(Arrays.asList(record)); + } catch (ConnectException ce) { + fail("Expected sink write succeeded. but got: " + ce.getMessage()); + } catch (Throwable t) { + fail("Expected sink write succeeded, but got: " + t.getClass().getName()); + } + + assertEquals(1, mockedWriterConstruction.constructed().size()); + } finally { + if (mockedWriterConstruction != null) { + mockedWriterConstruction.close(); + } + } } }