internal/postgres: move latest-version insertions last

Reorganize the code of saveModule so that all changes that assume
we have the latest version happen at the end.

Change-Id: I53b261f1fc24bd48001542b4f0e1d6bbc5f75d29
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/760099
CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
This commit is contained in:
Jonathan Amsterdam 2020-06-02 07:45:36 -04:00
Родитель e66341ffa9
Коммит 079d64eb20
1 изменённых файлов: 48 добавлений и 36 удалений

Просмотреть файл

@ -74,6 +74,7 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) {
if experiment.IsActive(ctx, internal.ExperimentInsertSerializable) {
iso = sql.LevelSerializable
}
return db.db.Transact(ctx, iso, func(tx *database.DB) error {
moduleID, err := insertModule(ctx, tx, m)
if err != nil {
@ -83,11 +84,7 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) {
return err
}
isLatest, err := isLatestVersion(ctx, tx, m.ModulePath, m.Version)
if err != nil {
return err
}
if err := insertPackages(ctx, tx, m, isLatest); err != nil {
if err := insertPackages(ctx, tx, m); err != nil {
return err
}
if experiment.IsActive(ctx, internal.ExperimentInsertDirectories) {
@ -96,6 +93,20 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) {
}
}
// We only insert into imports_unique and search_documents if this is
// the latest version of the module.
isLatest, err := isLatestVersion(ctx, tx, m.ModulePath, m.Version)
if err != nil {
return err
}
if !isLatest {
return nil
}
if err := insertImportsUnique(ctx, tx, m); err != nil {
return err
}
// If there is a more recent version of this module that has an alternative
// module path, then do not insert its packages into search_documents. This
// happens when a module that initially does not have a go.mod file is
@ -125,11 +136,8 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) {
log.Infof(ctx, "%s@%s: not inserting into search documents", m.ModulePath, m.Version)
return err
}
if isLatest {
// Insert the module's packages into search_documents.
return UpsertSearchDocuments(ctx, tx, m)
}
return nil
// Insert the module's packages into search_documents.
return UpsertSearchDocuments(ctx, tx, m)
})
}
@ -211,21 +219,10 @@ func insertLicenses(ctx context.Context, db *database.DB, m *internal.Module, mo
return nil
}
func insertPackages(ctx context.Context, db *database.DB, m *internal.Module, isLatest bool) (err error) {
func insertPackages(ctx context.Context, db *database.DB, m *internal.Module) (err error) {
ctx, span := trace.StartSpan(ctx, "insertPackages")
defer span.End()
defer derrors.Wrap(&err, "insertPackages(ctx, %q, %q)", m.ModulePath, m.Version)
// We only insert into imports_unique if this is the latest
// version of the module.
if isLatest {
// Remove the previous rows for this module. We'll replace them with
// new ones below.
if _, err := db.Exec(ctx,
`DELETE FROM imports_unique WHERE from_module_path = $1`,
m.ModulePath); err != nil {
return err
}
}
// Sort to ensure proper lock ordering, avoiding deadlocks. See
// b/141164828#comment8. The only deadlocks we've actually seen are on
@ -242,7 +239,7 @@ func insertPackages(ctx context.Context, db *database.DB, m *internal.Module, is
for _, p := range m.Packages {
sort.Strings(p.Imports)
}
var pkgValues, importValues, importUniqueValues []interface{}
var pkgValues, importValues []interface{}
for _, p := range m.Packages {
if p.DocumentationHTML == internal.StringFieldMissing {
return errors.New("saveModule: package missing DocumentationHTML")
@ -280,9 +277,6 @@ func insertPackages(ctx context.Context, db *database.DB, m *internal.Module, is
)
for _, i := range p.Imports {
importValues = append(importValues, p.Path, m.ModulePath, m.Version, i)
if isLatest {
importUniqueValues = append(importUniqueValues, p.Path, m.ModulePath, i)
}
}
}
if len(pkgValues) > 0 {
@ -316,20 +310,38 @@ func insertPackages(ctx context.Context, db *database.DB, m *internal.Module, is
if err := db.BulkInsert(ctx, "imports", importCols, importValues, database.OnConflictDoNothing); err != nil {
return err
}
if len(importUniqueValues) > 0 {
importUniqueCols := []string{
"from_path",
"from_module_path",
"to_path",
}
if err := db.BulkInsert(ctx, "imports_unique", importUniqueCols, importUniqueValues, database.OnConflictDoNothing); err != nil {
return err
}
}
}
return nil
}
// insertImportsUnique inserts and removes rows from the imports_unique table. It should only
// be called if the given module's version is the latest.
func insertImportsUnique(ctx context.Context, tx *database.DB, m *internal.Module) (err error) {
ctx, span := trace.StartSpan(ctx, "insertImportsUnique")
defer span.End()
defer derrors.Wrap(&err, "insertImportsUnique(%q, %q)", m.ModulePath, m.Version)
// Remove the previous rows for this module. We'll replace them with
// new ones below.
if _, err := tx.Exec(ctx,
`DELETE FROM imports_unique WHERE from_module_path = $1`,
m.ModulePath); err != nil {
return err
}
var values []interface{}
for _, p := range m.Packages {
for _, i := range p.Imports {
values = append(values, p.Path, m.ModulePath, i)
}
}
if len(values) == 0 {
return nil
}
cols := []string{"from_path", "from_module_path", "to_path"}
return tx.BulkInsert(ctx, "imports_unique", cols, values, database.OnConflictDoNothing)
}
func insertDirectories(ctx context.Context, db *database.DB, m *internal.Module, moduleID int) (err error) {
defer derrors.Wrap(&err, "insertDirectories(ctx, tx, %q, %q)", m.ModulePath, m.Version)
ctx, span := trace.StartSpan(ctx, "insertDirectories")