This commit is contained in:
John Taubensee 2017-05-31 15:10:45 -07:00 коммит произвёл GitHub
Родитель 06381f8aad
Коммит 4f18a47a7d
42 изменённых файлов: 508 добавлений и 60 удалений

Просмотреть файл

@ -3,13 +3,26 @@
namespace Microsoft.Azure.EventHubs.Processor
{
/// <summary>
/// The context object used to preserve state in the stream.
/// </summary>
public class Checkpoint
{
/// <summary>
/// Creates a new Checkpoint for a particular partition ID.
/// </summary>
/// <param name="partitionId">The partition ID for the checkpoint</param>
public Checkpoint(string partitionId)
: this(partitionId, PartitionReceiver.StartOfStream, 0)
{
}
/// <summary>
/// Creates a new Checkpoint for a particular partition ID, with the offset and sequence number.
/// </summary>
/// <param name="partitionId">The partition ID for the checkpoint</param>
/// <param name="offset">The offset for the last processed <see cref="EventData"/></param>
/// <param name="sequenceNumber">The sequence number of the last processed <see cref="EventData"/></param>
public Checkpoint(string partitionId, string offset, long sequenceNumber)
{
this.PartitionId = partitionId;
@ -17,6 +30,10 @@ namespace Microsoft.Azure.EventHubs.Processor
this.SequenceNumber = sequenceNumber;
}
/// <summary>
/// Creates a new Checkpoint from an existing checkpoint.
/// </summary>
/// <param name="source">The existing checkpoint to copy</param>
public Checkpoint(Checkpoint source)
{
this.PartitionId = source.PartitionId;
@ -24,10 +41,19 @@ namespace Microsoft.Azure.EventHubs.Processor
this.SequenceNumber = source.SequenceNumber;
}
/// <summary>
/// Gets or sets the offset of the last processed <see cref="EventData"/>.
/// </summary>
public string Offset { get; set; }
/// <summary>
/// Gets or sets the sequence number of the last processed <see cref="EventData"/>.
/// </summary>
public long SequenceNumber { get; set; }
/// <summary>
/// Gets the partition ID for the corresponding checkpoint.
/// </summary>
public string PartitionId { get; }
}
}

Просмотреть файл

@ -3,9 +3,19 @@
namespace Microsoft.Azure.EventHubs.Processor
{
/// <summary>
/// Reason for closing an <see cref="EventProcessorHost"/>.
/// </summary>
public enum CloseReason
{
/// <summary>
/// The lease was lost or transitioned to a new processor instance.
/// </summary>
LeaseLost,
/// <summary>
/// The <see cref="EventProcessorHost"/> was shutdown.
/// </summary>
Shutdown
}
}

Просмотреть файл

@ -72,7 +72,7 @@ namespace Microsoft.Azure.EventHubs.Processor
object startAt = await this.PartitionContext.GetInitialOffsetAsync().ConfigureAwait(false);
long epoch = this.Lease.Epoch;
ProcessorEventSource.Log.PartitionPumpCreateClientsStart(this.Host.Id, this.PartitionContext.PartitionId, epoch, startAt?.ToString());
this.eventHubClient = EventHubClient.CreateFromConnectionString(this.Host.EventHubConnectionString);
this.eventHubClient = EventHubClient.CreateFromConnectionString(this.Host.EventHubConnectionString);
// Create new receiver and set options
if (startAt is string)

Просмотреть файл

@ -5,14 +5,17 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
/// <summary>
/// An exception which specifies that the <see cref="EventProcessorHost"/> configuration is incorrect.
/// </summary>
public class EventProcessorConfigurationException : EventHubsException
{
public EventProcessorConfigurationException(string message)
internal EventProcessorConfigurationException(string message)
: this(message, null)
{
}
public EventProcessorConfigurationException(string message, Exception innerException)
internal EventProcessorConfigurationException(string message, Exception innerException)
: base(false, message, innerException)
{
}

Просмотреть файл

@ -6,6 +6,9 @@ namespace Microsoft.Azure.EventHubs.Processor
using System;
using System.Threading.Tasks;
/// <summary>
/// Represents a host for processing Event Hubs event data.
/// </summary>
public sealed class EventProcessorHost
{
readonly bool initializeLeaseManager;
@ -160,8 +163,14 @@ namespace Microsoft.Azure.EventHubs.Processor
/// </summary>
public string HostName { get; }
/// <summary>
/// Gets the event hub path.
/// </summary>
public string EventHubPath { get; }
/// <summary>
/// Gets the consumer group name.
/// </summary>
public string ConsumerGroupName { get; }
// All of these accessors are for internal use only.

Просмотреть файл

@ -3,21 +3,20 @@
namespace Microsoft.Azure.EventHubs.Processor
{
public static class EventProcessorHostActionStrings
internal static class EventProcessorHostActionStrings
{
public static readonly string CheckingLeases = "Checking Leases";
public static readonly string ClosingEventProcessor = "Closing Event Processor";
public static readonly string CreatingCheckpoint = "Creating Checkpoint";
public static readonly string CreatingCheckpointStore = "Creating Checkpoint Store";
public static readonly string CreatingEventProcessor = "Creating Event Processor";
public static readonly string CreatingLease = "Creating Lease";
public static readonly string CreatingLeaseStore = "Creating Lease Store";
public static readonly string InitializingStores = "Initializing Stores";
public static readonly string OpeningEventProcessor = "Opening Event Processor";
public static readonly string PartitionManagerCleanup = "Partition Manager Cleanup";
public static readonly string PartitionManagerMainLoop = "Partition Manager Main Loop";
public static readonly string StealingLease = "Stealing Lease";
public static readonly string PartitionPumpManagement = "Managing Partition Pumps";
internal static readonly string CheckingLeases = "Checking Leases";
internal static readonly string ClosingEventProcessor = "Closing Event Processor";
internal static readonly string CreatingCheckpoint = "Creating Checkpoint";
internal static readonly string CreatingCheckpointStore = "Creating Checkpoint Store";
internal static readonly string CreatingEventProcessor = "Creating Event Processor";
internal static readonly string CreatingLease = "Creating Lease";
internal static readonly string CreatingLeaseStore = "Creating Lease Store";
internal static readonly string InitializingStores = "Initializing Stores";
internal static readonly string OpeningEventProcessor = "Opening Event Processor";
internal static readonly string PartitionManagerCleanup = "Partition Manager Cleanup";
internal static readonly string PartitionManagerMainLoop = "Partition Manager Main Loop";
internal static readonly string StealingLease = "Stealing Lease";
internal static readonly string PartitionPumpManagement = "Managing Partition Pumps";
}
}

Просмотреть файл

@ -5,6 +5,9 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
/// <summary>
/// Defines the runtime options when registering an <see cref="IEventProcessor"/> interface with an EventHubConsumerGroup. This is also the mechanism for catching exceptions from an IEventProcessor instance used by an <see cref="EventProcessorHost"/> object.
/// </summary>
public sealed class EventProcessorOptions
{
Action<ExceptionReceivedEventArgs> exceptionHandler;
@ -27,6 +30,9 @@ namespace Microsoft.Azure.EventHubs.Processor
}
}
/// <summary>
/// Creates a new <see cref="EventProcessorOptions"/> object.
/// </summary>
public EventProcessorOptions()
{
this.MaxBatchSize = 10;

Просмотреть файл

@ -5,19 +5,25 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
/// <summary>
/// An exception thrown during event processing.
/// </summary>
public class EventProcessorRuntimeException : EventHubsException
{
public EventProcessorRuntimeException(string message, string action)
internal EventProcessorRuntimeException(string message, string action)
: this(message, action, null)
{
}
public EventProcessorRuntimeException(string message, string action, Exception innerException)
internal EventProcessorRuntimeException(string message, string action, Exception innerException)
: base(true, message, innerException)
{
this.Action = action;
}
/// <summary>
/// Gets the action that was being performed when the exception occured.
/// </summary>
public string Action { get; }
}
}

Просмотреть файл

@ -5,6 +5,9 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
/// <summary>
/// Provides data for the <see cref="EventProcessorOptions.NotifyOfException(string, string, System.Exception, string)"/> event.
/// </summary>
public sealed class ExceptionReceivedEventArgs
{
internal ExceptionReceivedEventArgs(string hostname, string partitionId, Exception exception, string action)

Просмотреть файл

@ -38,6 +38,10 @@ namespace Microsoft.Azure.EventHubs.Processor
/// <returns>Checkpoint info for the given partition, or null if none has been previously stored.</returns>
Task<Checkpoint> GetCheckpointAsync(string partitionId);
/// <summary>
/// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
/// </summary>
/// <param name="checkpoint">offset/sequeceNumber to update the store with.</param>
[System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
Task UpdateCheckpointAsync(Checkpoint checkpoint);

Просмотреть файл

@ -5,12 +5,18 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System.Threading.Tasks;
/// <summary>
/// Contains partition ownership information.
/// </summary>
public class Lease
{
/// <summary></summary>
protected Lease()
{
}
/// <summary></summary>
/// <param name="partitionId"></param>
protected Lease(string partitionId)
{
this.PartitionId = partitionId;
@ -18,6 +24,8 @@ namespace Microsoft.Azure.EventHubs.Processor
this.Token = string.Empty;
}
/// <summary></summary>
/// <param name="source"></param>
protected Lease(Lease source)
{
this.PartitionId = source.PartitionId;
@ -26,18 +34,40 @@ namespace Microsoft.Azure.EventHubs.Processor
this.Token = source.Token;
}
/// <summary>
/// Gets or sets the current value for the offset in the stream.
/// </summary>
public string Offset { get; set; }
/// <summary>
/// Gets or sets the last checkpointed sequence number in the stream.
/// </summary>
public long SequenceNumber { get; set; }
/// <summary>
/// Gets the ID of the partition to which this lease belongs.
/// </summary>
public string PartitionId { get; set; }
/// <summary>
/// Gets or sets the host owner for the partition.
/// </summary>
public string Owner { get; set; }
/// <summary>
/// Gets or sets the lease token that manages concurrency between hosts. You can use this token to guarantee single access to any resource needed by the <see cref="IEventProcessor"/> object.
/// </summary>
public string Token { get; set; }
/// <summary>
/// Gets or sets the epoch year of the lease, which is a value you can use to determine the most recent owner of a partition between competing nodes.
/// </summary>
public long Epoch { get; set; }
/// <summary>
/// Determines whether the lease is expired.
/// </summary>
/// <returns></returns>
public virtual Task<bool> IsExpired()
{
// By default lease never expires.

Просмотреть файл

@ -5,6 +5,9 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
/// <summary>
/// Represents an exception that occurs when the service lease has been lost.
/// </summary>
public class LeaseLostException : Exception
{
readonly string partitionId;
@ -20,6 +23,9 @@ namespace Microsoft.Azure.EventHubs.Processor
this.partitionId = partitionId;
}
/// <summary>
/// Gets the partition ID where the exception occured.
/// </summary>
public string PartitionId
{
get { return this.partitionId; }

Просмотреть файл

@ -23,6 +23,7 @@
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<DebugType>full</DebugType>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\Microsoft.Azure.EventHubs.Processor.xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'uap10.0'">

Просмотреть файл

@ -6,6 +6,9 @@ namespace Microsoft.Azure.EventHubs.Processor
using System;
using System.Threading.Tasks;
/// <summary>
/// Encapsulates information related to an Event Hubs partition used by <see cref="IEventProcessor"/>.
/// </summary>
public class PartitionContext
{
readonly EventProcessorHost host;
@ -21,12 +24,24 @@ namespace Microsoft.Azure.EventHubs.Processor
this.SequenceNumber = 0;
}
/// <summary>
/// Gets the name of the consumer group.
/// </summary>
public string ConsumerGroupName { get; }
/// <summary>
/// Gets the path of the event hub.
/// </summary>
public string EventHubPath { get; }
/// <summary>
/// Gets the partition ID for the context.
/// </summary>
public string PartitionId { get; }
/// <summary>
/// Gets the host owner for the partition.
/// </summary>
public string Owner
{
get
@ -85,16 +100,16 @@ namespace Microsoft.Azure.EventHubs.Processor
{
throw new ArgumentException("Unexpected object type returned by user-provided initialOffsetProvider");
}
}
else
{
}
else
{
this.Offset = startingCheckpoint.Offset;
this.SequenceNumber = startingCheckpoint.SequenceNumber;
this.SequenceNumber = startingCheckpoint.SequenceNumber;
ProcessorEventSource.Log.PartitionPumpInfo(this.host.Id, this.PartitionId, $"Retrieved starting offset/sequenceNumber: {this.Offset}/{this.SequenceNumber}");
startAt = this.Offset;
}
return startAt;
return startAt;
}
/// <summary>
@ -102,11 +117,11 @@ namespace Microsoft.Azure.EventHubs.Processor
/// </summary>
public Task CheckpointAsync()
{
// Capture the current offset and sequenceNumber. Synchronize to be sure we get a matched pair
// instead of catching an update halfway through. Do the capturing here because by the time the checkpoint
// task runs, the fields in this object may have changed, but we should only write to store what the user
// has directed us to write.
Checkpoint capturedCheckpoint;
// Capture the current offset and sequenceNumber. Synchronize to be sure we get a matched pair
// instead of catching an update halfway through. Do the capturing here because by the time the checkpoint
// task runs, the fields in this object may have changed, but we should only write to store what the user
// has directed us to write.
Checkpoint capturedCheckpoint;
lock(this.ThisLock)
{
capturedCheckpoint = new Checkpoint(this.PartitionId, this.Offset, this.SequenceNumber);
@ -138,6 +153,10 @@ namespace Microsoft.Azure.EventHubs.Processor
return this.PersistCheckpointAsync(new Checkpoint(this.PartitionId, eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber));
}
/// <summary>
/// Provides the parition context in the following format:"PartitionContext({EventHubPath}/{ConsumerGroupName}/{PartitionId}/{SequenceNumber})"
/// </summary>
/// <returns></returns>
public override string ToString()
{
return $"PartitionContext({this.EventHubPath}/{this.ConsumerGroupName}/{this.PartitionId}/{this.SequenceNumber})";

Просмотреть файл

@ -10,7 +10,7 @@ namespace Microsoft.Azure.EventHubs.Processor
abstract class PartitionPump
{
protected PartitionPump(EventProcessorHost host, Lease lease)
protected PartitionPump(EventProcessorHost host, Lease lease)
{
this.Host = host;
this.Lease = lease;

Просмотреть файл

@ -14,7 +14,7 @@ namespace Microsoft.Azure.EventHubs
/// map to an EventSource Guid based on the Name (Microsoft-Azure-EventHubs-Processor).
/// </summary>
[EventSource(Name = "Microsoft-Azure-EventHubs-Processor")]
public class ProcessorEventSource : EventSource
internal class ProcessorEventSource : EventSource
{
public static ProcessorEventSource Log { get; } = new ProcessorEventSource();

Просмотреть файл

@ -82,6 +82,9 @@ namespace Microsoft.Azure.EventHubs
internal AmqpMessage AmqpMessage { get; set; }
/// <summary>
/// Disposes resources attached to an Event Data
/// </summary>
public void Dispose()
{
Dispose(true);
@ -103,27 +106,37 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary>
/// A collection used to store properties which are set by the Event Hubs service.
/// </summary>
public sealed class SystemPropertiesCollection
{
internal SystemPropertiesCollection()
{
}
/// <summary>Gets the logical sequence number of the event within the partition stream of the Event Hub.</summary>
public long SequenceNumber
{
get; internal set;
}
/// <summary>Gets or sets the date and time of the sent time in UTC.</summary>
/// <value>The enqueue time in UTC. This value represents the actual time of enqueuing the message.</value>
public DateTime EnqueuedTimeUtc
{
get; internal set;
}
/// <summary>
/// Gets the offset of the data relative to the Event Hub partition stream. The offset is a marker or identifier for an event within the Event Hubs stream. The identifier is unique within a partition of the Event Hubs stream.
/// </summary>
public string Offset
{
get; internal set;
}
/// <summary>Gets the partition key of the corresponding partition that stored the <see cref="EventData"/></summary>
public string PartitionKey
{
get; internal set;

Просмотреть файл

@ -16,6 +16,10 @@ namespace Microsoft.Azure.EventHubs
long currentSize;
bool disposed;
/// <summary>
/// Creates a new <see cref="EventDataBatch"/>.
/// </summary>
/// <param name="maxSizeInBytes">The maximum size allowed for the batch</param>
public EventDataBatch(long maxSizeInBytes)
{
this.maxSize = Math.Min(maxSizeInBytes, MaxSizeLimit);
@ -79,6 +83,9 @@ namespace Microsoft.Azure.EventHubs
return eventData.AmqpMessage.SerializedMessageSize;
}
/// <summary>
/// Disposes resources attached to an EventDataBatch.
/// </summary>
public void Dispose()
{
Dispose(true);

Просмотреть файл

@ -10,7 +10,7 @@ namespace Microsoft.Azure.EventHubs
/// <summary>
/// Anchor class - all EventHub client operations start here.
/// See <see cref="EventHubClient.Create(string)"/>
/// See <see cref="EventHubClient.CreateFromConnectionString(string)"/>
/// </summary>
public abstract class EventHubClient : ClientEntity
{
@ -24,10 +24,14 @@ namespace Microsoft.Azure.EventHubs
this.RetryPolicy = RetryPolicy.Default;
}
/// <summary>
/// Gets the name of the EventHub.
/// </summary>
public string EventHubName { get; }
internal EventHubsConnectionStringBuilder ConnectionStringBuilder { get; }
/// <summary></summary>
protected object ThisLock { get; } = new object();
EventDataSender InnerSender
@ -49,6 +53,11 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary>
/// Creates a new instance of the Event Hubs client using the specified connection string. You can populate the EntityPath property with the name of the Event Hub.
/// </summary>
/// <param name="connectionString"></param>
/// <returns></returns>
public static EventHubClient CreateFromConnectionString(string connectionString)
{
if (string.IsNullOrWhiteSpace(connectionString))
@ -73,6 +82,10 @@ namespace Microsoft.Azure.EventHubs
return eventHubClient;
}
/// <summary>
/// Closes and releases resources associated with <see cref="EventHubClient"/>.
/// </summary>
/// <returns></returns>
public sealed override async Task CloseAsync()
{
EventHubsEventSource.Log.ClientCloseStart(this.ClientId);
@ -172,7 +185,7 @@ namespace Microsoft.Azure.EventHubs
/// Using this type of send (Sending using a specific partitionKey) could sometimes result in partitions which are not evenly distributed.
/// </summary>
/// <param name="eventData">the <see cref="EventData"/> to be sent.</param>
/// <param name="partitionKey">the partitionKey will be hashed to determine the partitionId to send the EventData to. On the Received message this can be accessed at <see cref="EventData.SystemProperties.PartitionKey"/>.</param>
/// <param name="partitionKey">the partitionKey will be hashed to determine the partitionId to send the EventData to. On the Received message this can be accessed at <see cref="EventData.SystemPropertiesCollection.PartitionKey"/>.</param>
/// <returns>A Task that completes when the send operation is done.</returns>
/// <seealso cref="SendAsync(EventData)"/>
/// <seealso cref="PartitionSender.SendAsync(EventData)"/>
@ -198,7 +211,7 @@ namespace Microsoft.Azure.EventHubs
/// <para>ii. Sending multiple events in One Transaction. This is the reason why all events sent in a batch needs to have same partitionKey (so that they are sent to one partition only).</para>
/// </summary>
/// <param name="eventDatas">the batch of events to send to EventHub</param>
/// <param name="partitionKey">the partitionKey will be hashed to determine the partitionId to send the EventData to. On the Received message this can be accessed at <see cref="EventData.SystemProperties.PartitionKey"/>.</param>
/// <param name="partitionKey">the partitionKey will be hashed to determine the partitionId to send the EventData to. On the Received message this can be accessed at <see cref="EventData.SystemPropertiesCollection.PartitionKey"/>.</param>
/// <returns>A Task that completes when the send operation is done.</returns>
/// <seealso cref="SendAsync(EventData)"/>
/// <see cref="PartitionSender.SendAsync(EventData)"/>
@ -287,7 +300,7 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
/// <param name="consumerGroupName">the consumer group name that this receiver should be grouped under.</param>
/// <param name="partitionId">the partition Id that the receiver belongs to. All data received will be from this partition only.</param>
/// <param name="startTime">the DateTime instant that receive operations will start receive events from. Events received will have <see cref="EventData.SystemProperties.EnqueuedTime"/> later than this Instant.</param>
/// <param name="startTime">the DateTime instant that receive operations will start receive events from. Events received will have <see cref="EventData.SystemPropertiesCollection.EnqueuedTimeUtc"/> later than this Instant.</param>
/// <returns>The created PartitionReceiver</returns>
/// <seealso cref="PartitionReceiver"/>
public PartitionReceiver CreateReceiver(string consumerGroupName, string partitionId, DateTime startTime)
@ -360,7 +373,7 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
/// <param name="consumerGroupName">the consumer group name that this receiver should be grouped under.</param>
/// <param name="partitionId">the partition Id that the receiver belongs to. All data received will be from this partition only.</param>
/// <param name="startTime">the date time instant that receive operations will start receive events from. Events received will have <see cref="EventData.SystemProperties.EnqueuedTime"/> later than this instant.</param>
/// <param name="startTime">the date time instant that receive operations will start receive events from. Events received will have <see cref="EventData.SystemPropertiesCollection.EnqueuedTimeUtc"/> later than this instant.</param>
/// <param name="epoch">a unique identifier (epoch value) that the service uses, to enforce partition/lease ownership.</param>
/// <returns>The created PartitionReceiver</returns>
/// <seealso cref="PartitionReceiver"/>
@ -441,12 +454,27 @@ namespace Microsoft.Azure.EventHubs
internal abstract EventDataSender OnCreateEventSender(string partitionId);
/// <summary></summary>
/// <param name="consumerGroupName"></param>
/// <param name="partitionId"></param>
/// <param name="startOffset"></param>
/// <param name="offsetInclusive"></param>
/// <param name="startTime"></param>
/// <param name="epoch"></param>
/// <returns></returns>
protected abstract PartitionReceiver OnCreateReceiver(string consumerGroupName, string partitionId, string startOffset, bool offsetInclusive, DateTime? startTime, long? epoch);
/// <summary></summary>
/// <returns></returns>
protected abstract Task<EventHubRuntimeInformation> OnGetRuntimeInformationAsync();
/// <summary></summary>
/// <param name="partitionId"></param>
/// <returns></returns>
protected abstract Task<EventHubPartitionRuntimeInformation> OnGetPartitionRuntimeInformationAsync(string partitionId);
/// <summary></summary>
/// <returns></returns>
protected abstract Task OnCloseAsync();
}
}

Просмотреть файл

@ -5,6 +5,9 @@ namespace Microsoft.Azure.EventHubs
{
using System;
/// <summary>
/// Contains information regarding an event hub partition.
/// </summary>
public class EventHubPartitionRuntimeInformation
{
internal string Type { get; set; }

Просмотреть файл

@ -5,6 +5,9 @@ namespace Microsoft.Azure.EventHubs
{
using System;
/// <summary>
/// Contains information regarding Event Hubs.
/// </summary>
public class EventHubRuntimeInformation
{
internal string Type { get; set; }

Просмотреть файл

@ -14,7 +14,7 @@ namespace Microsoft.Azure.EventHubs
/// map to an EventSource Guid based on the Name (Microsoft-Azure-EventHubs).
/// </summary>
[EventSource(Name = "Microsoft-Azure-EventHubs")]
public class EventHubsEventSource : EventSource
internal sealed class EventHubsEventSource : EventSource
{
public static EventHubsEventSource Log { get; } = new EventHubsEventSource();

Просмотреть файл

@ -13,15 +13,23 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public interface IPartitionReceiveHandler
{
/// <summary>
/// Gets the maximum batch size.
/// </summary>
int MaxBatchSize { get; }
/// <summary>
/// Users should implement this method to specify the action to be performed on the received events.
/// </summary>
/// <seealso cref="PartitionReceiver.ReceiveAsync"/>
/// <seealso cref="PartitionReceiver.ReceiveAsync(int)"/>
/// <param name="events">The list of fetched events from the corresponding PartitionReceiver.</param>
Task ProcessEventsAsync(IEnumerable<EventData> events);
/// <summary>
/// Implement in order to handle exceptions that are thrown during receipt of events.
/// </summary>
/// <param name="error">The <see cref="Exception"/> to be processed</param>
/// <returns>An asynchronour operation</returns>
Task ProcessErrorAsync(Exception error);
}
}

Просмотреть файл

@ -24,6 +24,7 @@
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<DebugType>full</DebugType>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\Microsoft.Azure.EventHubs.xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'uap10.0'">

Просмотреть файл

@ -18,8 +18,8 @@ namespace Microsoft.Azure.EventHubs
/// non-epoch receivers.
/// </para>
/// </summary>
/// <seealso cref="EventHubClient.CreateReceiver"/>
/// <seealso cref="EventHubClient.CreateEpochReceiver"/>
/// <seealso cref="EventHubClient.CreateReceiver(string, string, string)"/>
/// <seealso cref="EventHubClient.CreateEpochReceiver(string, string, string, long)"/>
public abstract class PartitionReceiver : ClientEntity
{
/// <summary>
@ -44,6 +44,14 @@ namespace Microsoft.Azure.EventHubs
const int MaxPrefetchCount = 999;
const int DefaultPrefetchCount = 300;
/// <summary></summary>
/// <param name="eventHubClient"></param>
/// <param name="consumerGroupName"></param>
/// <param name="partitionId"></param>
/// <param name="startOffset"></param>
/// <param name="offsetInclusive"></param>
/// <param name="startTime"></param>
/// <param name="epoch"></param>
protected internal PartitionReceiver(
EventHubClient eventHubClient,
string consumerGroupName,
@ -95,10 +103,13 @@ namespace Microsoft.Azure.EventHubs
/// <value>the epoch value that this receiver is currently using for partition ownership.</value>
public long? Epoch { get; }
/// <summary></summary>
protected DateTime? StartTime { get; private set; }
/// <summary></summary>
protected bool OffsetInclusive { get; }
/// <summary></summary>
protected string StartOffset { get; private set; }
/// <summary>
@ -171,6 +182,10 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary>
/// Sets the <see cref="IPartitionReceiveHandler"/> to process events.
/// </summary>
/// <param name="receiveHandler">The <see cref="IPartitionReceiveHandler"/> used to process events.</param>
public void SetReceiveHandler(IPartitionReceiveHandler receiveHandler)
{
EventHubsEventSource.Log.SetReceiveHandlerStart(this.ClientId, receiveHandler != null ? receiveHandler.GetType().ToString() : "null");
@ -178,6 +193,10 @@ namespace Microsoft.Azure.EventHubs
EventHubsEventSource.Log.SetReceiveHandlerStop(this.ClientId);
}
/// <summary>
/// Closes and releases resources associated with <see cref="PartitionReceiver"/>.
/// </summary>
/// <returns>An asynchronous operation</returns>
public sealed override Task CloseAsync()
{
EventHubsEventSource.Log.ClientCloseStart(this.ClientId);
@ -191,10 +210,18 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary></summary>
/// <param name="maxMessageCount"></param>
/// <param name="waitTime"></param>
/// <returns></returns>
protected abstract Task<IList<EventData>> OnReceiveAsync(int maxMessageCount, TimeSpan waitTime);
/// <summary></summary>
/// <param name="receiveHandler"></param>
protected abstract void OnSetReceiveHandler(IPartitionReceiveHandler receiveHandler);
/// <summary></summary>
/// <returns></returns>
protected abstract Task OnCloseAsync();
string FormatTraceDetails()

Просмотреть файл

@ -12,7 +12,7 @@ namespace Microsoft.Azure.EventHubs
/// if you do not care about sending events to specific partitions, instead use <see cref="EventHubClient.SendAsync(EventData)"/>.
/// </summary>
/// <seealso cref="EventHubClient.CreatePartitionSender(string)"/>
/// <seealso cref="EventHubClient.Create(string)"/>
/// <seealso cref="EventHubClient.CreateFromConnectionString(string)"/>
public sealed class PartitionSender : ClientEntity
{
internal PartitionSender(EventHubClient eventHubClient, string partitionId)
@ -24,8 +24,14 @@ namespace Microsoft.Azure.EventHubs
EventHubsEventSource.Log.ClientCreated(this.ClientId, null);
}
/// <summary>
/// Gets the <see cref="EventHubClient"/> associated with this PartitionSender.
/// </summary>
public EventHubClient EventHubClient { get; }
/// <summary>
/// Gets the partition ID for this <see cref="PartitionSender"/>.
/// </summary>
public string PartitionId { get; }
EventDataSender InnerSender { get; }
@ -43,9 +49,9 @@ namespace Microsoft.Azure.EventHubs
/// <para>a. The client wants to take direct control of distribution of data across partitions. In this case client is responsible for making sure there is at least one sender per event hub partition.</para>
/// <para>b. User cannot use partition key as a mean to direct events to specific partition, yet there is a need for data correlation with partitioning scheme.</para>
/// </summary>
/// <param name="data">the <see cref="EventData"/> to be sent.</param>
/// <param name="eventData">the <see cref="EventData"/> to be sent.</param>
/// <returns>A Task that completes when the send operations is done.</returns>
/// <exception cref="PayloadSizeExceedeedException">the total size of the <see cref="EventData"/> exceeds a pre-defined limit set by the service. Default is 256k bytes.</exception>
/// <exception cref="MessageSizeExceededException">the total size of the <see cref="EventData"/> exceeds a pre-defined limit set by the service. Default is 256k bytes.</exception>
/// <exception cref="EventHubsException">Event Hubs service encountered problems during the operation.</exception>
public Task SendAsync(EventData eventData)
{
@ -94,7 +100,7 @@ namespace Microsoft.Azure.EventHubs
/// </example>
/// <param name="eventDatas">batch of events to send to EventHub</param>
/// <returns>a Task that completes when the send operation is done.</returns>
/// <exception cref="PayloadSizeExceededException">the total size of the <see cref="EventData"/> exceeds a pre-defined limit set by the service. Default is 256k bytes.</exception>
/// <exception cref="MessageSizeExceededException">the total size of the <see cref="EventData"/> exceeds a pre-defined limit set by the service. Default is 256k bytes.</exception>
/// <exception cref="EventHubsException">Event Hubs service encountered problems during the operation.</exception>
public async Task SendAsync(IEnumerable<EventData> eventDatas)
{
@ -120,6 +126,10 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary>
/// Closes and releases resources for the <see cref="PartitionSender"/>.
/// </summary>
/// <returns>An asynchronous operation</returns>
public override async Task CloseAsync()
{
EventHubsEventSource.Log.ClientCloseStart(this.ClientId);

Просмотреть файл

@ -8,23 +8,39 @@ namespace Microsoft.Azure.EventHubs
using System.Threading.Tasks;
// Code based on http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx
/// <summary>
/// Used as an asynchronous semaphore for internal Event Hubs operations.
/// </summary>
public class AsyncLock : IDisposable
{
readonly SemaphoreSlim asyncSemaphore;
readonly Task<LockRelease> lockRelease;
bool disposed = false;
/// <summary>
/// Returns a new AsyncLock.
/// </summary>
public AsyncLock()
{
asyncSemaphore = new SemaphoreSlim(1);
lockRelease = Task.FromResult(new LockRelease(this));
}
/// <summary>
/// Sets a lock.
/// </summary>
/// <returns>An asynchronous operation</returns>
public Task<LockRelease> LockAsync()
{
return this.LockAsync(CancellationToken.None);
}
/// <summary>
/// Sets a lock, which allows for cancellation, using a <see cref="CancellationToken"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> which can be used to cancel the lock</param>
/// <returns>An asynchronous operation</returns>
public Task<LockRelease> LockAsync(CancellationToken cancellationToken)
{
var waitTask = asyncSemaphore.WaitAsync(cancellationToken);
@ -42,6 +58,9 @@ namespace Microsoft.Azure.EventHubs
TaskScheduler.Default);
}
/// <summary>
/// Closes and releases any resources associated with the AsyncLock.
/// </summary>
public void Dispose()
{
this.Dispose(true);
@ -61,6 +80,9 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary>
/// Used coordinate lock releases.
/// </summary>
public struct LockRelease : IDisposable
{
readonly AsyncLock asyncLockRelease;
@ -70,6 +92,10 @@ namespace Microsoft.Azure.EventHubs
this.asyncLockRelease = release;
}
/// <summary>
/// Closes and releases resources associated with <see cref="LockRelease"/>.
/// </summary>
/// <returns>An asynchronous operation</returns>
public void Dispose()
{
asyncLockRelease?.asyncSemaphore.Release();

Просмотреть файл

@ -14,25 +14,42 @@ namespace Microsoft.Azure.EventHubs
{
static int nextId;
/// <summary></summary>
/// <param name="clientId"></param>
protected ClientEntity(string clientId)
{
this.ClientId = clientId;
}
/// <summary>
/// Gets the client ID.
/// </summary>
public string ClientId
{
get; private set;
}
/// <summary>
/// Gets the <see cref="RetryPolicy.RetryPolicy"/> for the ClientEntity.
/// </summary>
public RetryPolicy RetryPolicy { get; set; }
/// <summary>
/// Closes the ClientEntity.
/// </summary>
/// <returns>The asynchronous operation</returns>
public abstract Task CloseAsync();
/// <summary>
/// Closes the ClientEntity.
/// </summary>
public void Close()
{
this.CloseAsync().GetAwaiter().GetResult();
}
/// <summary></summary>
/// <returns></returns>
protected static long GetNextId()
{
return Interlocked.Increment(ref nextId);

Просмотреть файл

@ -10,11 +10,16 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public class EventHubsCommunicationException : EventHubsException
{
/// <summary></summary>
/// <param name="message"></param>
protected internal EventHubsCommunicationException(string message)
: this(message, null)
{
}
/// <summary></summary>
/// <param name="message"></param>
/// <param name="innerException"></param>
protected internal EventHubsCommunicationException(string message, Exception innerException)
: base(true, message, innerException)
{

Просмотреть файл

@ -117,6 +117,9 @@ namespace Microsoft.Azure.EventHubs
this.ParseConnectionString(connectionString);
}
/// <summary>
/// Gets or sets the Event Hubs endpoint.
/// </summary>
public Uri Endpoint { get; set; }
/// <summary>
@ -140,6 +143,10 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public TimeSpan OperationTimeout { get; set; }
/// <summary>
/// Creates a cloned object of the current <see cref="EventHubsConnectionStringBuilder"/>.
/// </summary>
/// <returns>A new <see cref="EventHubsConnectionStringBuilder"/></returns>
public EventHubsConnectionStringBuilder Clone()
{
var clone = new EventHubsConnectionStringBuilder(this.ToString());

Просмотреть файл

@ -10,29 +10,52 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public class EventHubsException : Exception
{
/// <summary>
/// Returns a new EventHubsException
/// </summary>
/// <param name="isTransient">Specifies whether or not the exception is transient.</param>
public EventHubsException(bool isTransient)
{
this.IsTransient = isTransient;
}
/// <summary>
/// Returns a new EventHubsException
/// </summary>
/// <param name="isTransient">Specifies whether or not the exception is transient.</param>
/// <param name="message">The detailed message exception.</param>
public EventHubsException(bool isTransient, string message)
: base(message)
{
this.IsTransient = isTransient;
}
/// <summary>
/// Returns a new EventHubsException
/// </summary>
/// <param name="isTransient">Specifies whether or not the exception is transient.</param>
/// <param name="innerException">The inner exception.</param>
public EventHubsException(bool isTransient, Exception innerException)
: base(innerException.Message, innerException)
{
this.IsTransient = isTransient;
}
/// <summary>
/// Returns a new EventHubsException
/// </summary>
/// <param name="isTransient">Specifies whether or not the exception is transient.</param>
/// <param name="message">The detailed message exception.</param>
/// <param name="innerException">The inner exception.</param>
public EventHubsException(bool isTransient, string message, Exception innerException)
: base(message, innerException)
{
this.IsTransient = isTransient;
}
/// <summary>
/// Gets the message as a formatted string.
/// </summary>
public override string Message
{
get
@ -53,6 +76,9 @@ namespace Microsoft.Azure.EventHubs
/// <value>returns true when user can retry the operation that generated the exception without additional intervention.</value>
public bool IsTransient { get; }
/// <summary>
/// Gets the Event Hubs namespace from which the exception occured, if available.
/// </summary>
public string EventHubsNamespace { get; internal set; }
}
}

Просмотреть файл

@ -10,7 +10,7 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public sealed class MessageSizeExceededException : EventHubsException
{
public MessageSizeExceededException(string message)
internal MessageSizeExceededException(string message)
: this(message, null)
{
}

Просмотреть файл

@ -3,9 +3,12 @@
namespace Microsoft.Azure.EventHubs
{
/// <summary>
/// The exception that is thrown when the Event Hub is not found on the namespace.
/// </summary>
public sealed class MessagingEntityNotFoundException : EventHubsException
{
public MessagingEntityNotFoundException(string message)
internal MessagingEntityNotFoundException(string message)
: base(false, message, null)
{
}

Просмотреть файл

@ -17,6 +17,12 @@ namespace Microsoft.Azure.EventHubs
readonly int maximumRetryCount;
readonly double retryFactor;
/// <summary>
/// Returns a new RetryExponential retry policy object.
/// </summary>
/// <param name="minimumBackoff">Minimum backoff interval.</param>
/// <param name="maximumBackoff">Maximum backoff interval.</param>
/// <param name="maximumRetryCount">Maximum retry count.</param>
public RetryExponential(TimeSpan minimumBackoff, TimeSpan maximumBackoff, int maximumRetryCount)
{
TimeoutHelper.ThrowIfNegativeArgument(minimumBackoff, nameof(minimumBackoff));
@ -28,6 +34,12 @@ namespace Microsoft.Azure.EventHubs
this.retryFactor = this.ComputeRetryFactor();
}
/// <summary></summary>
/// <param name="clientId"></param>
/// <param name="lastException"></param>
/// <param name="remainingTime"></param>
/// <param name="baseWaitTimeSecs"></param>
/// <returns></returns>
protected override TimeSpan? OnGetNextRetryInterval(string clientId, Exception lastException, TimeSpan remainingTime, int baseWaitTimeSecs)
{
int currentRetryCount = this.GetRetryCount(clientId);

Просмотреть файл

@ -6,6 +6,10 @@ namespace Microsoft.Azure.EventHubs
using System;
using System.Collections.Concurrent;
/// <summary>
/// Represents an abstraction for retrying messaging operations. Users should not
/// implement this class, and instead should use one of the provided implementations.
/// </summary>
public abstract class RetryPolicy
{
const int DefaultRetryMaxCount = 10;
@ -18,12 +22,17 @@ namespace Microsoft.Azure.EventHubs
ConcurrentDictionary<String, int> retryCounts;
object serverBusySync;
/// <summary></summary>
protected RetryPolicy()
{
this.retryCounts = new ConcurrentDictionary<string, int>();
this.serverBusySync = new Object();
}
/// <summary>
/// Increases the retry count.
/// </summary>
/// <param name="clientId">The <see cref="ClientEntity.ClientId"/> associated with the operation to retry</param>
public void IncrementRetryCount(string clientId)
{
int retryCount;
@ -31,12 +40,21 @@ namespace Microsoft.Azure.EventHubs
this.retryCounts[clientId] = retryCount + 1;
}
/// <summary>
/// Resets the retry count to zero.
/// </summary>
/// <param name="clientId">The <see cref="ClientEntity.ClientId"/> associated with the operation to retry</param>
public void ResetRetryCount(string clientId)
{
int currentRetryCount;
this.retryCounts.TryRemove(clientId, out currentRetryCount);
}
/// <summary>
/// Determines whether or not the exception can be retried.
/// </summary>
/// <param name="exception"></param>
/// <returns>A bool indicating whether or not the operation can be retried.</returns>
public static bool IsRetryableException(Exception exception)
{
if (exception == null)
@ -56,6 +74,9 @@ namespace Microsoft.Azure.EventHubs
return false;
}
/// <summary>
/// Returns the default retry policy, <see cref="RetryExponential"/>.
/// </summary>
public static RetryPolicy Default
{
get
@ -64,6 +85,9 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary>
/// Returns the default retry policy, <see cref="NoRetry"/>.
/// </summary>
public static RetryPolicy NoRetry
{
get
@ -72,6 +96,9 @@ namespace Microsoft.Azure.EventHubs
}
}
/// <summary></summary>
/// <param name="clientId"></param>
/// <returns></returns>
protected int GetRetryCount(string clientId)
{
int retryCount;
@ -81,8 +108,21 @@ namespace Microsoft.Azure.EventHubs
return retryCount;
}
/// <summary></summary>
/// <param name="clientId"></param>
/// <param name="lastException"></param>
/// <param name="remainingTime"></param>
/// <param name="baseWaitTime"></param>
/// <returns></returns>
protected abstract TimeSpan? OnGetNextRetryInterval(String clientId, Exception lastException, TimeSpan remainingTime, int baseWaitTime);
/// <summary>
/// Gets the timespan for the next retry operation.
/// </summary>
/// <param name="clientId">The <see cref="ClientEntity.ClientId"/> associated with the operation to retry</param>
/// <param name="lastException">The last exception that was thrown</param>
/// <param name="remainingTime">Remaining time for the cumulative timeout</param>
/// <returns></returns>
public TimeSpan? GetNextRetryInterval(string clientId, Exception lastException, TimeSpan remainingTime)
{
int baseWaitTime = 0;
@ -90,7 +130,7 @@ namespace Microsoft.Azure.EventHubs
{
if (lastException != null &&
(lastException is ServerBusyException || (lastException.InnerException != null && lastException.InnerException is ServerBusyException)))
{
{
baseWaitTime += ClientConstants.ServerBusyBaseSleepTimeInSecs;
}
}

Просмотреть файл

@ -9,13 +9,16 @@ namespace Microsoft.Azure.EventHubs
using System.Globalization;
using System.Net;
/// <summary>
/// Provides information about a security token such as audience, expiry time, and the string token value.
/// </summary>
public class SecurityToken
{
// per Simple Web Token draft specification
public const string TokenAudience = "Audience";
public const string TokenExpiresOn = "ExpiresOn";
public const string TokenIssuer = "Issuer";
public const string TokenDigest256 = "HMACSHA256";
private const string TokenAudience = "Audience";
private const string TokenExpiresOn = "ExpiresOn";
private const string TokenIssuer = "Issuer";
private const string TokenDigest256 = "HMACSHA256";
const string InternalExpiresOnFieldName = "ExpiresOn";
const string InternalAudienceFieldName = TokenAudience;
@ -27,6 +30,12 @@ namespace Microsoft.Azure.EventHubs
readonly DateTime expiresAtUtc;
readonly string audience;
/// <summary>
/// Creates a new instance of the <see cref="SecurityToken"/> class.
/// </summary>
/// <param name="tokenString">The token</param>
/// <param name="expiresAtUtc">The expiration time</param>
/// <param name="audience">The audience</param>
public SecurityToken(string tokenString, DateTime expiresAtUtc, string audience)
{
if (tokenString == null || audience == null)
@ -39,7 +48,11 @@ namespace Microsoft.Azure.EventHubs
this.audience = audience;
}
[SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors", Justification = "Existing public class, changes will be breaking. Current usage is safe.")]
/// <summary>
/// Creates a new instance of the <see cref="SecurityToken"/> class.
/// </summary>
/// <param name="tokenString">The token</param>
/// <param name="expiresAtUtc">The expiration time</param>
public SecurityToken(string tokenString, DateTime expiresAtUtc)
{
if (tokenString == null)
@ -52,8 +65,10 @@ namespace Microsoft.Azure.EventHubs
this.audience = GetAudienceFromToken(tokenString);
}
[SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors",
Justification = "Existing public class, changes will be breaking. Current usage is safe.")]
/// <summary>
/// Creates a new instance of the <see cref="SecurityToken"/> class.
/// </summary>
/// <param name="tokenString">The token</param>
public SecurityToken(string tokenString)
{
if (tokenString == null)
@ -65,18 +80,31 @@ namespace Microsoft.Azure.EventHubs
GetExpirationDateAndAudienceFromToken(tokenString, out this.expiresAtUtc, out this.audience);
}
/// <summary>
/// Gets the audience of this token.
/// </summary>
public string Audience => this.audience;
/// <summary>
/// Gets the expiration time of this token.
/// </summary>
public DateTime ExpiresAtUtc => this.expiresAtUtc;
/// <summary></summary>
protected virtual string ExpiresOnFieldName => InternalExpiresOnFieldName;
/// <summary></summary>
protected virtual string AudienceFieldName => InternalAudienceFieldName;
/// <summary></summary>
protected virtual string KeyValueSeparator => InternalKeyValueSeparator;
/// <summary></summary>
protected virtual string PairSeparator => InternalPairSeparator;
/// <summary>
/// Gets the actual token.
/// </summary>
public object TokenValue => this.token;
string GetAudienceFromToken(string token)

Просмотреть файл

@ -10,7 +10,7 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public sealed class ServerBusyException : EventHubsException
{
public ServerBusyException(string message)
internal ServerBusyException(string message)
: this(message, null)
{
}

Просмотреть файл

@ -17,6 +17,9 @@ namespace Microsoft.Azure.EventHubs
/// </summary>
public class SharedAccessSignatureTokenProvider : TokenProvider
{
/// <summary>
/// Represents 00:00:00 UTC Thursday 1, January 1970.
/// </summary>
public static readonly DateTime EpochTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
readonly byte[] encodedSharedAccessKey;
readonly string keyName;
@ -35,6 +38,12 @@ namespace Microsoft.Azure.EventHubs
{
}
/// <summary></summary>
/// <param name="keyName"></param>
/// <param name="sharedAccessKey"></param>
/// <param name="customKeyEncoder"></param>
/// <param name="tokenTimeToLive"></param>
/// <param name="tokenScope"></param>
protected SharedAccessSignatureTokenProvider(string keyName, string sharedAccessKey, Func<string, byte[]> customKeyEncoder, TimeSpan tokenTimeToLive, TokenScope tokenScope)
: base(tokenScope)
{
@ -69,6 +78,11 @@ namespace Microsoft.Azure.EventHubs
MessagingTokenProviderKeyEncoder(sharedAccessKey);
}
/// <summary></summary>
/// <param name="appliesTo"></param>
/// <param name="action"></param>
/// <param name="timeout"></param>
/// <returns></returns>
protected override Task<SecurityToken> OnGetTokenAsync(string appliesTo, string action, TimeSpan timeout)
{
string tokenString = this.BuildSignature(appliesTo);
@ -76,6 +90,9 @@ namespace Microsoft.Azure.EventHubs
return Task.FromResult<SecurityToken>(securityToken);
}
/// <summary></summary>
/// <param name="targetUri"></param>
/// <returns></returns>
protected virtual string BuildSignature(string targetUri)
{
return string.IsNullOrWhiteSpace(this.sharedAccessSignature)

Просмотреть файл

@ -16,19 +16,26 @@ namespace Microsoft.Azure.EventHubs
internal static readonly Func<string, byte[]> MessagingTokenProviderKeyEncoder = Encoding.UTF8.GetBytes;
const TokenScope DefaultTokenScope = TokenScope.Entity;
/// <summary></summary>
protected TokenProvider()
: this(TokenProvider.DefaultTokenScope)
{
}
/// <summary></summary>
/// <param name="tokenScope"></param>
protected TokenProvider(TokenScope tokenScope)
{
this.TokenScope = tokenScope;
this.ThisLock = new object();
}
/// <summary>
/// Gets the scope or permissions associated with the token.
/// </summary>
public TokenScope TokenScope { get; }
/// <summary></summary>
protected object ThisLock { get; }
/// <summary>
@ -42,7 +49,7 @@ namespace Microsoft.Azure.EventHubs
}
/// <summary>
/// Construct a TokenProvider based on the provided Key Name & Shared Access Key.
/// Construct a TokenProvider based on the provided Key Name and Shared Access Key.
/// </summary>
/// <param name="keyName">The key name of the corresponding SharedAccessKeyAuthorizationRule.</param>
/// <param name="sharedAccessKey">The key associated with the SharedAccessKeyAuthorizationRule</param>
@ -58,7 +65,7 @@ namespace Microsoft.Azure.EventHubs
//}
/// <summary>
/// Construct a TokenProvider based on the provided Key Name & Shared Access Key.
/// Construct a TokenProvider based on the provided Key Name and Shared Access Key.
/// </summary>
/// <param name="keyName">The key name of the corresponding SharedAccessKeyAuthorizationRule.</param>
/// <param name="sharedAccessKey">The key associated with the SharedAccessKeyAuthorizationRule</param>
@ -70,7 +77,7 @@ namespace Microsoft.Azure.EventHubs
}
/// <summary>
/// Construct a TokenProvider based on the provided Key Name & Shared Access Key.
/// Construct a TokenProvider based on the provided Key Name and Shared Access Key.
/// </summary>
/// <param name="keyName">The key name of the corresponding SharedAccessKeyAuthorizationRule.</param>
/// <param name="sharedAccessKey">The key associated with the SharedAccessKeyAuthorizationRule</param>
@ -82,7 +89,7 @@ namespace Microsoft.Azure.EventHubs
}
/// <summary>
/// Construct a TokenProvider based on the provided Key Name & Shared Access Key.
/// Construct a TokenProvider based on the provided Key Name and Shared Access Key.
/// </summary>
/// <param name="keyName">The key name of the corresponding SharedAccessKeyAuthorizationRule.</param>
/// <param name="sharedAccessKey">The key associated with the SharedAccessKeyAuthorizationRule</param>
@ -94,6 +101,13 @@ namespace Microsoft.Azure.EventHubs
return new SharedAccessSignatureTokenProvider(keyName, sharedAccessKey, tokenTimeToLive, tokenScope);
}
/// <summary>
/// Gets a <see cref="SecurityToken"/> for the given audience and duration.
/// </summary>
/// <param name="appliesTo">The URI which the access token applies to</param>
/// <param name="action">The request action</param>
/// <param name="timeout">The time span that specifies the timeout value for the message that gets the security token</param>
/// <returns></returns>
public Task<SecurityToken> GetTokenAsync(string appliesTo, string action, TimeSpan timeout)
{
TimeoutHelper.ThrowIfNegativeArgument(timeout);
@ -101,8 +115,16 @@ namespace Microsoft.Azure.EventHubs
return this.OnGetTokenAsync(appliesTo, action, timeout);
}
/// <summary></summary>
/// <param name="appliesTo"></param>
/// <param name="action"></param>
/// <param name="timeout"></param>
/// <returns></returns>
protected abstract Task<SecurityToken> OnGetTokenAsync(string appliesTo, string action, TimeSpan timeout);
/// <summary></summary>
/// <param name="appliesTo"></param>
/// <returns></returns>
protected virtual string NormalizeAppliesTo(string appliesTo)
{
return EventHubsUriHelper.NormalizeUri(appliesTo, "http", true, stripPath: this.TokenScope == TokenScope.Namespace, ensureTrailingSlash: true);

Просмотреть файл

@ -3,9 +3,19 @@
namespace Microsoft.Azure.EventHubs
{
/// <summary>
/// A enum representing the scope of the <see cref="SecurityToken"/>.
/// </summary>
public enum TokenScope
{
/// <summary>
/// The namespace.
/// </summary>
Namespace = 0,
/// <summary>
/// The entity.
/// </summary>
Entity = 1
}
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
@ -19,3 +20,11 @@ using System.Runtime.InteropServices;
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("126d946d-ce0f-4f14-9f13-8fd7098b81d8")]
// Friend Assemblies
[assembly: InternalsVisibleTo("Microsoft.Azure.EventHubs.Tests,PublicKey=" +
"0024000004800000940000000602000000240000525341310004000001000100fdf4acac3b2244" +
"dd8a96737e5385b31414369dc3e42f371172127252856a0650793e1f5673a16d5d78e2ac852a10" +
"4bc51e6f018dca44fdd26a219c27cb2b263956a80620223c8e9c2f8913c3c903e1e453e9e4e840" +
"98afdad5f4badb8c1ebe0a7b0a4b57a08454646a65886afe3e290a791ff3260099ce0edf0bdbcc" +
"afadfeb6")]

Просмотреть файл

@ -33,4 +33,8 @@
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>
</Project>