internal/worker: remove fetch timeout and detached context

Now that we are using basic scaling, we can use a longer timeout for
all requests, including fetches. So the special fetch timeout, and the
detached context and tracing that went with it, are no longer necessary.

We still enforce a timeout on requests via middleware.Timeout, using a value
that comes from GO_DISCOVERY_WORKER_TIMEOUT_MINUTES.

Updates b/141406364.

Change-Id: I39266a3f08b1f7e2fea3fdbec05bf092ea974d9b
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/750119
Reviewed-by: Julie Qiu <julieqiu@google.com>
This commit is contained in:
Jonathan Amsterdam 2020-05-18 11:40:09 -04:00
Родитель ecc8a1931e
Коммит ed005bec92
2 изменённых файлов: 8 добавлений и 25 удалений

Просмотреть файл

@ -23,13 +23,8 @@ import (
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/xcontext"
)
// fetchTimeout bounds the time allowed for fetching a single module. It is
// mutable for testing purposes.
var fetchTimeout = 2 * config.StatementTimeout
const (
// Indicates that although we have a valid module, some packages could not be processed.
hasIncompletePackagesCode = 290
@ -65,7 +60,7 @@ func FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion strin
dbErr := updateVersionMapAndDeleteModulesWithErrors(ctx, db, ft)
if dbErr != nil {
log.Error(ctx, dbErr)
ft.Error = err
ft.Error = dbErr
ft.Status = http.StatusInternalServerError
}
if !semver.IsValid(ft.ResolvedVersion) {
@ -102,8 +97,7 @@ func FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion strin
// The given parentCtx is used for tracing, but fetches actually execute in a
// detached context with fixed timeout, so that fetches are allowed to complete
// even for short-lived requests.
func fetchAndInsertModule(parentCtx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) *fetchTask {
func fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) *fetchTask {
ft := &fetchTask{
FetchResult: fetch.FetchResult{
ModulePath: modulePath,
@ -120,12 +114,12 @@ func fetchAndInsertModule(parentCtx context.Context, modulePath, requestedVersio
}()
if ProxyRemoved[modulePath+"@"+requestedVersion] {
log.Infof(parentCtx, "not fetching %s@%s because it is on the ProxyRemoved list", modulePath, requestedVersion)
log.Infof(ctx, "not fetching %s@%s because it is on the ProxyRemoved list", modulePath, requestedVersion)
ft.Error = derrors.Excluded
return ft
}
exc, err := db.IsExcluded(parentCtx, modulePath)
exc, err := db.IsExcluded(ctx, modulePath)
if err != nil {
ft.Error = err
return ft
@ -135,15 +129,6 @@ func fetchAndInsertModule(parentCtx context.Context, modulePath, requestedVersio
return ft
}
parentSpan := trace.FromContext(parentCtx)
// A fixed timeout for FetchAndInsertModule to allow module processing to
// succeed even for extremely short lived requests.
ctx, cancel := context.WithTimeout(xcontext.Detach(parentCtx), fetchTimeout)
defer cancel()
ctx, span := trace.StartSpanWithRemoteParent(ctx, "worker.fetchAndInsertModule", parentSpan.SpanContext())
defer span.End()
start := time.Now()
fr := fetch.FetchModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient)
if fr == nil {

Просмотреть файл

@ -1077,11 +1077,6 @@ func TestFetchAndInsertModule(t *testing.T) {
func TestFetchAndInsertModuleTimeout(t *testing.T) {
defer postgres.ResetTestDB(testDB, t)
defer func(oldTimeout time.Duration) {
fetchTimeout = oldTimeout
}(fetchTimeout)
fetchTimeout = 0
proxyClient, teardownProxy := proxy.SetupTestProxy(t, nil)
defer teardownProxy()
sourceClient := source.NewClient(sourceTimeout)
@ -1089,7 +1084,10 @@ func TestFetchAndInsertModuleTimeout(t *testing.T) {
name := "my.mod/version"
version := "v1.0.0"
wantErrString := "deadline exceeded"
_, err := FetchAndUpdateState(context.Background(), name, version, proxyClient, sourceClient, testDB)
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()
_, err := FetchAndUpdateState(ctx, name, version, proxyClient, sourceClient, testDB)
if err == nil || !strings.Contains(err.Error(), wantErrString) {
t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v) returned error %v, want error containing %q",
name, version, proxyClient, sourceClient, testDB, err, wantErrString)