Fixing issue with a lease existing on start (#277)

Storage failures don't return a Response, so we have to use errors.As() and check the returned error instead. This change checks for the two status codes that can come back if the blob already exists (409) or if the blob exists _and_ it has an active storage lease (412).

Also fixed a race condition in the leasedReceiver that was causing Storage/TestMultiple() to fail.

Fixes #276
Richard Park 2022-11-08 15:47:40 -08:00 committed by GitHub
Parent adc9788f66
Commit d9a88501ef
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 13 deletions

View file

@@ -27,6 +27,7 @@ import (
"errors"
"fmt"
"math/rand"
"sync/atomic"
"time"
"github.com/devigned/tab"
@@ -38,15 +39,18 @@ type (
leasedReceiver struct {
handle *eventhub.ListenerHandle
processor *EventProcessorHost
- lease LeaseMarker
+ lease *atomic.Value // LeaseMarker
done func()
}
)
func newLeasedReceiver(processor *EventProcessorHost, lease LeaseMarker) *leasedReceiver {
+ leaseValue := atomic.Value{}
+ leaseValue.Store(lease)
return &leasedReceiver{
processor: processor,
- lease: lease,
+ lease: &leaseValue,
}
}
@@ -54,8 +58,10 @@ func (lr *leasedReceiver) Run(ctx context.Context) error {
span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.Run")
defer span.End()
- partitionID := lr.lease.GetPartitionID()
- epoch := lr.lease.GetEpoch()
+ lease := lr.getLease()
+ partitionID := lease.GetPartitionID()
+ epoch := lease.GetEpoch()
lr.dlog(ctx, "running...")
renewLeaseCtx, cancelRenewLease := context.WithCancel(context.Background())
@@ -99,7 +105,9 @@ func (lr *leasedReceiver) listenForClose() {
defer cancel()
span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.listenForClose")
defer span.End()
- err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)
+ lease := lr.getLease()
+ err := lr.processor.scheduler.stopReceiver(ctx, lease)
if err != nil {
tab.For(ctx).Error(err)
}
@@ -120,7 +128,8 @@ func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
err := lr.tryRenew(ctx)
if err != nil {
tab.For(ctx).Error(err)
- _ = lr.processor.scheduler.stopReceiver(ctx, lr.lease)
+ lease := lr.getLease()
+ _ = lr.processor.scheduler.stopReceiver(ctx, lease)
}
}
}
@@ -130,7 +139,8 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.tryRenew")
defer span.End()
- lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease.GetPartitionID())
+ oldLease := lr.getLease()
+ lease, ok, err := lr.processor.leaser.RenewLease(ctx, oldLease.GetPartitionID())
if err != nil {
tab.For(ctx).Error(err)
return err
@@ -141,23 +151,33 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
return err
}
lr.dlog(ctx, "lease renewed")
- lr.lease = lease
+ lr.lease.Store(lease)
return nil
}
func (lr *leasedReceiver) dlog(ctx context.Context, msg string) {
name := lr.processor.name
- partitionID := lr.lease.GetPartitionID()
- epoch := lr.lease.GetEpoch()
+ lease := lr.getLease()
+ partitionID := lease.GetPartitionID()
+ epoch := lease.GetEpoch()
tab.For(ctx).Debug(fmt.Sprintf("eph %q, partition %q, epoch %d: "+msg, name, partitionID, epoch))
}
func (lr *leasedReceiver) startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
span, ctx := startConsumerSpanFromContext(ctx, operationName)
+ lease := lr.getLease()
span.AddAttributes(
tab.StringAttribute("eph.id", lr.processor.name),
- tab.StringAttribute(partitionIDTag, lr.lease.GetPartitionID()),
- tab.Int64Attribute(epochTag, lr.lease.GetEpoch()),
+ tab.StringAttribute(partitionIDTag, lease.GetPartitionID()),
+ tab.Int64Attribute(epochTag, lease.GetEpoch()),
)
return span, ctx
}
+ func (lr *leasedReceiver) getLease() LeaseMarker {
+ return lr.lease.Load().(LeaseMarker)
+ }
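
The change above is the race fix mentioned in the commit message: instead of reassigning a plain lease field while periodicallyRenewLease and the logging/tracing helpers read it, the lease now lives in a sync/atomic Value that is written with Store and read through getLease. As a rough, self-contained sketch of the same pattern (the fakeLease type and the loop counts are made up; only the LeaseMarker method names mirror the real interface), the following program runs cleanly under go run -race:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// LeaseMarker mirrors the read side of the interface stored by leasedReceiver.
type LeaseMarker interface {
	GetPartitionID() string
	GetEpoch() int64
}

// fakeLease is a hypothetical concrete type used only to exercise the pattern.
type fakeLease struct {
	partitionID string
	epoch       int64
}

func (l *fakeLease) GetPartitionID() string { return l.partitionID }
func (l *fakeLease) GetEpoch() int64        { return l.epoch }

func main() {
	// atomic.Value requires every Store to use the same concrete type (*fakeLease here).
	var lease atomic.Value
	lease.Store(&fakeLease{partitionID: "0", epoch: 1})

	var wg sync.WaitGroup
	wg.Add(2)

	// Writer: swaps in renewed leases, like tryRenew doing lr.lease.Store(lease).
	go func() {
		defer wg.Done()
		for epoch := int64(2); epoch <= 1000; epoch++ {
			lease.Store(&fakeLease{partitionID: "0", epoch: epoch})
		}
	}()

	// Reader: always sees a complete lease value, like getLease().
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			l := lease.Load().(LeaseMarker)
			_ = l.GetPartitionID()
			_ = l.GetEpoch()
		}
	}()

	wg.Wait()
	fmt.Println("final epoch:", lease.Load().(LeaseMarker).GetEpoch())
}

Compared with guarding the field with a mutex, atomic.Value keeps the read path lock-free, which suits the frequent reads in dlog and startConsumerSpanFromContext.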

View file

@@ -190,7 +190,8 @@ func (s *scheduler) Stop(ctx context.Context) error {
if err := lr.Close(ctx); err != nil {
lastErr = err
}
- _, _ = s.processor.leaser.ReleaseLease(ctx, lr.lease.GetPartitionID())
+ _, _ = s.processor.leaser.ReleaseLease(ctx, lr.getLease().GetPartitionID())
}
return lastErr

View file

@@ -630,6 +630,12 @@ func (sl *LeaserCheckpointer) createOrGetLease(ctx context.Context, partitionID
})
if err != nil {
+ if storageErr := azblobvendor.StorageError(nil); errors.As(err, &storageErr) &&
+ (storageErr.Response().StatusCode == http.StatusConflict || // blob exists
+ storageErr.Response().StatusCode == http.StatusPreconditionFailed) { // blob exists AND an Azure storage lease is active
+ return sl.getLease(ctx, partitionID)
+ }
return nil, err
}
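
The hunk above is the substance of the fix for #276: the blob-create call can fail even when the situation is recoverable, so the error is unwrapped with errors.As and the two "blob already exists" status codes fall back to getLease instead of bubbling up. Below is a rough standalone sketch of the same unwrap-and-branch shape, with hypothetical statusError and createLeaseBlob names standing in for the vendored azblob storage error and the real create call:

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// statusError is a hypothetical stand-in for the vendored azblob StorageError;
// only the HTTP status code matters for this check.
type statusError struct {
	statusCode int
}

func (e *statusError) Error() string {
	return fmt.Sprintf("storage request failed with status %d", e.statusCode)
}

// createLeaseBlob simulates a create call that fails because the blob already
// exists and still has an active storage lease (412 Precondition Failed).
func createLeaseBlob() error {
	return fmt.Errorf("creating lease blob: %w", &statusError{statusCode: http.StatusPreconditionFailed})
}

func main() {
	err := createLeaseBlob()

	var storageErr *statusError
	if errors.As(err, &storageErr) &&
		(storageErr.statusCode == http.StatusConflict || // blob already exists
			storageErr.statusCode == http.StatusPreconditionFailed) { // blob exists AND has an active lease
		fmt.Println("blob already exists; fetch the existing lease instead of failing")
		return
	}
	if err != nil {
		fmt.Println("unrecoverable error:", err)
	}
}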

View file

@@ -105,6 +105,10 @@ func (ts *testSuite) TestLeaserLeaseEnsure() {
lease, err := leaser.EnsureLease(ctx, partitionID)
ts.NoError(err)
ts.Equal(partitionID, lease.GetPartitionID())
+ lease, err = leaser.EnsureLease(ctx, partitionID)
+ ts.NoError(err)
+ ts.Equal(partitionID, lease.GetPartitionID())
}
}