This commit is contained in:
Sreeram Garlapati 2015-12-30 22:13:58 -08:00
Родитель 2b1fa05d2d
Коммит 52968da1d5
1 изменённых файлов: 56 добавлений и 30 удалений

Просмотреть файл

@ -13,8 +13,8 @@ import com.microsoft.azure.servicebus.*;
* The data structure encapsulating the Event being sent-to and received-from EventHubs.
* Each EventHubs partition can be visualized as a Stream of EventData.
*/
public class EventData implements AutoCloseable {
public class EventData implements AutoCloseable
{
private String partitionKey;
private String offset;
private long sequenceNumber;
@ -26,7 +26,8 @@ public class EventData implements AutoCloseable {
private SystemProperties systemProperties;
EventData() {
EventData()
{
this.closed = false;
}
@ -34,8 +35,10 @@ public class EventData implements AutoCloseable {
* Internal Constructor - intended to be used only by the {@link PartitionReceiver} to Create #EventData out of #Message
*/
@SuppressWarnings("unchecked")
EventData(Message amqpMessage) {
if (amqpMessage == null) {
EventData(Message amqpMessage)
{
if (amqpMessage == null)
{
throw new IllegalArgumentException("amqpMessage cannot be null");
}
@ -79,10 +82,12 @@ public class EventData implements AutoCloseable {
* @param data the actual payload of data in bytes to be Sent to EventHubs.
* @see to start sending to EventHubs refer to {@link EventHubClient#createFromConnectionString(String)}
*/
public EventData(byte[] data) {
public EventData(byte[] data)
{
this();
if (data == null) {
if (data == null)
{
throw new IllegalArgumentException("data cannot be null");
}
@ -109,10 +114,12 @@ public class EventData implements AutoCloseable {
* @param length length of the byte[] to be read, starting from offset
* @see to start sending to EventHubs refer to {@link EventHubClient#createFromConnectionString(String)}
*/
public EventData(byte[] data, final int offset, final int length) {
public EventData(byte[] data, final int offset, final int length)
{
this();
if (data == null) {
if (data == null)
{
throw new IllegalArgumentException("data cannot be null");
}
@ -137,10 +144,12 @@ public class EventData implements AutoCloseable {
* @param buffer ByteBuffer which references the payload of the Event to be sent to EventHubs
* @see to start sending to EventHubs refer to {@link EventHubClient#createFromConnectionString(String)}
*/
public EventData(ByteBuffer buffer){
public EventData(ByteBuffer buffer)
{
this();
if (buffer == null) {
if (buffer == null)
{
throw new IllegalArgumentException("data cannot be null");
}
@ -152,18 +161,21 @@ public class EventData implements AutoCloseable {
* This is intended to be used after receiving EventData using @@PartitionReceiver.
* @return returns the byte[] of the actual data
*/
public byte[] getBody() {
public byte[] getBody()
{
return this.bodyData.getArray();
}
/**
* Application property bag
*/
public Map<String, String> getProperties() {
public Map<String, String> getProperties()
{
return this.properties;
}
public void setProperties(Map applicationProperties) {
public void setProperties(Map applicationProperties)
{
this.properties = applicationProperties;
}
@ -172,67 +184,81 @@ public class EventData implements AutoCloseable {
* <pr>As these are populated by Service, they are only present on a Received EventData.
* @return an encapsulation of all SystemProperties appended by EventHubs service into EventData
*/
public SystemProperties getSystemProperties() {
if (this.isReceivedEvent && this.systemProperties == null) {
public SystemProperties getSystemProperties()
{
if (this.isReceivedEvent && this.systemProperties == null)
{
this.systemProperties = new SystemProperties(this);
}
return this.systemProperties;
}
private void throwIfAutoClosed() {
if (this.closed) {
private void throwIfAutoClosed()
{
if (this.closed)
{
// TODO: TRACE
throw new IllegalStateException("EventData is already disposed");
}
}
Message toAmqpMessage() {
Message toAmqpMessage()
{
this.throwIfAutoClosed();
Message amqpMessage = Proton.message();
if (this.properties != null && !this.properties.isEmpty()) {
if (this.properties != null && !this.properties.isEmpty())
{
ApplicationProperties applicationProperties = new ApplicationProperties(this.properties);
amqpMessage.setApplicationProperties(applicationProperties);
}
if (this.bodyData != null) {
if (this.bodyData != null)
{
amqpMessage.setBody(new Data(this.bodyData));
}
return amqpMessage;
}
public void close() throws Exception {
if (!this.closed) {
public void close() throws Exception
{
if (!this.closed)
{
// TODO: dispose native resources
}
this.closed = true;
}
public static final class SystemProperties {
public static final class SystemProperties
{
EventData event;
SystemProperties(EventData event) {
SystemProperties(EventData event)
{
this.event = event;
}
public long getSequenceNumber() {
public long getSequenceNumber()
{
return this.event.sequenceNumber;
}
public Date getEnqueuedTimeUtc() {
public Date getEnqueuedTimeUtc()
{
return this.event.enqueuedTimeUtc;
}
public String getOffset() {
public String getOffset()
{
return this.event.offset;
}
public String getPartitionKey() {
public String getPartitionKey()
{
return this.event.partitionKey;
}
}