зеркало из https://github.com/microsoft/kafka.git
Some new unit tests for ByteBufferMessageSet iterator KAFKA-108; patched by Jun; reviewed by Neha
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1159461 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Родитель
900496eeda
Коммит
28984e6c18
|
@ -20,6 +20,7 @@ package kafka.message
|
|||
import java.nio._
|
||||
import junit.framework.Assert._
|
||||
import org.junit.Test
|
||||
import kafka.utils.TestUtils
|
||||
|
||||
class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
|
||||
|
||||
|
@ -49,4 +50,68 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
|
|||
assertTrue(messages.equals(moreMessages))
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
def testIterator() {
|
||||
val messageList = List(
|
||||
new Message("msg1".getBytes),
|
||||
new Message("msg2".getBytes),
|
||||
new Message("msg3".getBytes)
|
||||
)
|
||||
|
||||
// test for uncompressed regular messages
|
||||
{
|
||||
val messageSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
|
||||
//make sure ByteBufferMessageSet is re-iterable.
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
|
||||
//make sure the last offset after iteration is correct
|
||||
assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
|
||||
}
|
||||
|
||||
// test for compressed regular messages
|
||||
{
|
||||
val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
|
||||
//make sure ByteBufferMessageSet is re-iterable.
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
|
||||
//make sure the last offset after iteration is correct
|
||||
assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
|
||||
}
|
||||
|
||||
// test for mixed empty and non-empty messagesets uncompressed
|
||||
{
|
||||
val emptyMessageList : List[Message] = Nil
|
||||
val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
|
||||
val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
|
||||
val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
|
||||
buffer.put(emptyMessageSet.serialized)
|
||||
buffer.put(regularMessgeSet.serialized)
|
||||
buffer.rewind
|
||||
val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
|
||||
//make sure ByteBufferMessageSet is re-iterable.
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
|
||||
//make sure the last offset after iteration is correct
|
||||
assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
|
||||
}
|
||||
|
||||
// test for mixed empty and non-empty messagesets compressed
|
||||
{
|
||||
val emptyMessageList : List[Message] = Nil
|
||||
val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
|
||||
val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
|
||||
val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
|
||||
buffer.put(emptyMessageSet.serialized)
|
||||
buffer.put(regularMessgeSet.serialized)
|
||||
buffer.rewind
|
||||
val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
|
||||
//make sure ByteBufferMessageSet is re-iterable.
|
||||
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
|
||||
//make sure the last offset after iteration is correct
|
||||
assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -288,6 +288,18 @@ object TestUtils {
|
|||
ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
|
||||
|
||||
}
|
||||
|
||||
def getMessageIterator(iter: Iterator[MessageAndOffset]): Iterator[Message] = {
|
||||
new IteratorTemplate[Message] {
|
||||
override def makeNext(): Message = {
|
||||
if (iter.hasNext)
|
||||
return iter.next.message
|
||||
else
|
||||
return allDone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object TestZKUtils {
|
||||
|
|
Загрузка…
Ссылка в новой задаче