Allow customisation of initial checkpoint

Document change in changelog
This commit is contained in:
Daniel Bradley 2021-04-22 11:37:16 +01:00
Родитель ce20d7ad53
Коммит f6b68fbaaa
2 изменённых файлов: 24 добавлений и 2 удалений

Просмотреть файл

@ -1,5 +1,8 @@
# Change Log
## `v3.3.8`
- add option to customise initial checkpoint
## `v3.3.7`
- add option to prefix checkpoint blob paths

Просмотреть файл

@ -62,6 +62,7 @@ type (
env azure.Environment
dirtyPartitions map[string]uuid.UUID
leasesMu sync.Mutex
getInitialCheckpoint func() persist.Checkpoint
done func()
}
@ -422,12 +423,12 @@ func (sl *LeaserCheckpointer) EnsureCheckpoint(ctx context.Context, partitionID
lease, ok := sl.leases[partitionID]
if ok {
if lease.Checkpoint == nil {
checkpoint := persist.NewCheckpointFromStartOfStream()
checkpoint := sl.newInitialCheckpoint()
lease.Checkpoint = &checkpoint
}
return *lease.Checkpoint, nil
}
return persist.NewCheckpointFromStartOfStream(), nil
return sl.newInitialCheckpoint(), nil
}
// UpdateCheckpoint will attempt to write the checkpoint to Azure Storage
@ -649,6 +650,16 @@ func (sl *LeaserCheckpointer) leaseFromResponse(res *azblob.DownloadResponse) (*
return &lease, nil
}
func (sl *LeaserCheckpointer) newInitialCheckpoint() persist.Checkpoint {
if sl.getInitialCheckpoint != nil {
initialCheckpoint := sl.getInitialCheckpoint()
if initialCheckpoint.Offset != "" {
return initialCheckpoint
}
}
return persist.NewCheckpointFromStartOfStream()
}
// IsExpired checks to see if the blob is not still leased
func (s *storageLease) IsExpired(ctx context.Context) bool {
span, ctx := startConsumerSpanFromContext(ctx, "storage.storageLease.IsExpired")
@ -686,3 +697,11 @@ func WithPrefixInBlobPath(prefix string) LeaserCheckpointerOption {
return nil
}
}
// WithInitialCheckpoint is a LeaserCheckpointerOption that overrides the initial checkpoint used when no checkpoint exists rather than starting from the start of the stream
func WithInitialCheckpoint(getInitialCheckpoint func() persist.Checkpoint) LeaserCheckpointerOption {
return func(ls *LeaserCheckpointer) error {
ls.getInitialCheckpoint = getInitialCheckpoint
return nil
}
}