Merge pull request #211 from catalinaperalta/master
get checkpoint from specified consumer group
This commit is contained in:
Коммит
ae5acc7f0f
|
@ -1,5 +1,8 @@
|
|||
# Change Log
|
||||
|
||||
## `v3.3.9`
|
||||
- update the checkpoint after the receiver options are applied
|
||||
|
||||
## `v3.3.8`
|
||||
- add option to customise initial checkpoint
|
||||
|
||||
|
|
16
receiver.go
16
receiver.go
|
@ -143,12 +143,6 @@ func (h *Hub) newReceiver(ctx context.Context, partitionID string, opts ...Recei
|
|||
partitionID: partitionID,
|
||||
}
|
||||
|
||||
// update checkpoint if old checkpoint is successfully read from e.g. file or memory
|
||||
oldCheckpoint, err := receiver.getLastReceivedCheckpoint()
|
||||
if err == nil {
|
||||
receiver.checkpoint = oldCheckpoint
|
||||
}
|
||||
|
||||
// apply options after fetching the persisted checkpoint in case the options
|
||||
// specify a custom checkpoint to start from. This allows the custom
|
||||
// checkpoint to override the stored one.
|
||||
|
@ -158,7 +152,15 @@ func (h *Hub) newReceiver(ctx context.Context, partitionID string, opts ...Recei
|
|||
}
|
||||
}
|
||||
|
||||
if err = receiver.storeLastReceivedCheckpoint(receiver.checkpoint); err != nil {
|
||||
// update checkpoint if no checkpoint is specified and if old checkpoint is successfully read from e.g. file or memory
|
||||
if receiver.checkpoint == (persist.Checkpoint{}) {
|
||||
oldCheckpoint, err := receiver.getLastReceivedCheckpoint()
|
||||
if err == nil {
|
||||
receiver.checkpoint = oldCheckpoint
|
||||
}
|
||||
}
|
||||
|
||||
if err := receiver.storeLastReceivedCheckpoint(receiver.checkpoint); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -2,5 +2,5 @@ package eventhub
|
|||
|
||||
const (
|
||||
// Version is the semantic version number
|
||||
Version = "3.3.7"
|
||||
Version = "3.3.9"
|
||||
)
|
||||
|
|
Загрузка…
Ссылка в новой задаче