internal/postgres: use poller to update excluded list

Change-Id: I7f2218b3eb035973d3f6bc00a8e5e6a1df2db571
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/261819
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
This commit is contained in:
Jonathan Amsterdam 2020-10-13 12:18:10 -04:00
Родитель cb9361e611
Коммит 40c2f64cf3
4 изменённых файлов: 41 добавлений и 55 удалений

Просмотреть файл

@ -8,9 +8,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"strings" "strings"
"sync"
"time"
"golang.org/x/pkgsite/internal/database"
"golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/log" "golang.org/x/pkgsite/internal/log"
) )
@ -19,13 +18,8 @@ import (
func (db *DB) IsExcluded(ctx context.Context, path string) (_ bool, err error) { func (db *DB) IsExcluded(ctx context.Context, path string) (_ bool, err error) {
defer derrors.Wrap(&err, "DB.IsExcluded(ctx, %q)", path) defer derrors.Wrap(&err, "DB.IsExcluded(ctx, %q)", path)
db.ensureExcludedPrefixes(ctx) eps := db.expoller.Current().([]string)
excludedPrefixes.mu.Lock() for _, prefix := range eps {
defer excludedPrefixes.mu.Unlock()
if excludedPrefixes.err != nil {
return false, excludedPrefixes.err
}
for _, prefix := range excludedPrefixes.prefixes {
if strings.HasPrefix(path, prefix) { if strings.HasPrefix(path, prefix) {
log.Infof(ctx, "path %q matched excluded prefix %q", path, prefix) log.Infof(ctx, "path %q matched excluded prefix %q", path, prefix)
return true, nil return true, nil
@ -44,53 +38,20 @@ func (db *DB) InsertExcludedPrefix(ctx context.Context, prefix, user, reason str
_, err = db.db.Exec(ctx, "INSERT INTO excluded_prefixes (prefix, created_by, reason) VALUES ($1, $2, $3)", _, err = db.db.Exec(ctx, "INSERT INTO excluded_prefixes (prefix, created_by, reason) VALUES ($1, $2, $3)",
prefix, user, reason) prefix, user, reason)
if err != nil { if err == nil {
// Arrange to re-read the excluded_prefixes table on the next call to IsExcluded. db.expoller.Poll(ctx)
setExcludedPrefixesLastFetched(time.Time{})
} }
return err return err
} }
// In-memory copy of excluded_prefixes.
var excludedPrefixes struct {
mu sync.Mutex
prefixes []string
err error
lastFetched time.Time
}
func setExcludedPrefixesLastFetched(t time.Time) {
excludedPrefixes.mu.Lock()
excludedPrefixes.lastFetched = t
excludedPrefixes.mu.Unlock()
}
const excludedPrefixesExpiration = time.Minute
// ensureExcludedPrefixes makes sure the in-memory copy of the
// excluded_prefixes table is up to date.
func (db *DB) ensureExcludedPrefixes(ctx context.Context) {
excludedPrefixes.mu.Lock()
lastFetched := excludedPrefixes.lastFetched
excludedPrefixes.mu.Unlock()
if time.Since(lastFetched) < excludedPrefixesExpiration {
return
}
prefixes, err := db.GetExcludedPrefixes(ctx)
excludedPrefixes.mu.Lock()
defer excludedPrefixes.mu.Unlock()
excludedPrefixes.lastFetched = time.Now()
excludedPrefixes.prefixes = prefixes
excludedPrefixes.err = err
if err != nil {
log.Errorf(ctx, "reading excluded_prefixes: %v", err)
}
}
// GetExcludedPrefixes reads all the excluded prefixes from the database. // GetExcludedPrefixes reads all the excluded prefixes from the database.
func (db *DB) GetExcludedPrefixes(ctx context.Context) ([]string, error) { func (db *DB) GetExcludedPrefixes(ctx context.Context) ([]string, error) {
return getExcludedPrefixes(ctx, db.db)
}
func getExcludedPrefixes(ctx context.Context, db *database.DB) ([]string, error) {
var eps []string var eps []string
err := db.db.RunQuery(ctx, `SELECT prefix FROM excluded_prefixes`, func(rows *sql.Rows) error { err := db.RunQuery(ctx, `SELECT prefix FROM excluded_prefixes`, func(rows *sql.Rows) error {
var ep string var ep string
if err := rows.Scan(&ep); err != nil { if err := rows.Scan(&ep); err != nil {
return err return err

Просмотреть файл

@ -14,10 +14,9 @@ func TestIsExcluded(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout) ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel() defer cancel()
if _, err := testDB.db.Exec(ctx, "INSERT INTO excluded_prefixes (prefix, created_by, reason) VALUES ('bad', 'someone', 'because')"); err != nil { if err := testDB.InsertExcludedPrefix(ctx, "bad", "someone", "because"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
for _, test := range []struct { for _, test := range []struct {
path string path string
want bool want bool

Просмотреть файл

@ -7,28 +7,55 @@
package postgres package postgres
import ( import (
"context"
"time"
"golang.org/x/pkgsite/internal/database" "golang.org/x/pkgsite/internal/database"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/poller"
) )
type DB struct { type DB struct {
db *database.DB db *database.DB
bypassLicenseCheck bool bypassLicenseCheck bool
expoller *poller.Poller
cancel func()
} }
// New returns a new postgres DB. // New returns a new postgres DB.
func New(db *database.DB) *DB { func New(db *database.DB) *DB {
return &DB{db, false} return newdb(db, false)
} }
// NewBypassingLicenseCheck returns a new postgres DB that bypasses license // NewBypassingLicenseCheck returns a new postgres DB that bypasses license
// checks. That means all data will be inserted and returned for // checks. That means all data will be inserted and returned for
// non-redistributable modules, packages and directories. // non-redistributable modules, packages and directories.
func NewBypassingLicenseCheck(db *database.DB) *DB { func NewBypassingLicenseCheck(db *database.DB) *DB {
return &DB{db, true} return newdb(db, true)
}
func newdb(db *database.DB, bypass bool) *DB {
p := poller.New(
[]string(nil),
func(ctx context.Context) (interface{}, error) {
return getExcludedPrefixes(ctx, db)
},
func(err error) {
log.Errorf(context.Background(), "getting excluded prefixes: %v", err)
})
ctx, cancel := context.WithCancel(context.Background())
p.Start(ctx, time.Minute)
return &DB{
db: db,
bypassLicenseCheck: bypass,
expoller: p,
cancel: cancel,
}
} }
// Close closes a DB. // Close closes a DB.
func (db *DB) Close() error { func (db *DB) Close() error {
db.cancel()
return db.db.Close() return db.db.Close()
} }

Просмотреть файл

@ -13,7 +13,6 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4"
"golang.org/x/pkgsite/internal/database" "golang.org/x/pkgsite/internal/database"
@ -115,11 +114,11 @@ func ResetTestDB(db *DB, t *testing.T) {
if _, err := tx.Exec(ctx, `TRUNCATE excluded_prefixes;`); err != nil { if _, err := tx.Exec(ctx, `TRUNCATE excluded_prefixes;`); err != nil {
return err return err
} }
setExcludedPrefixesLastFetched(time.Time{})
return nil return nil
}); err != nil { }); err != nil {
t.Fatalf("error resetting test DB: %v", err) t.Fatalf("error resetting test DB: %v", err)
} }
db.expoller.Poll(ctx) // clear excluded prefixes
} }
// RunDBTests is a wrapper that runs the given testing suite in a test database // RunDBTests is a wrapper that runs the given testing suite in a test database