Minor cleanups after reviewing the code in this branch

* When `skeema` exits, gracefully close all connection pools, to avoid aborted
  connection counter/logging in some versions of MySQL

* `skeema diff`: If the only differences for a dir are schema-level DDL, the
  exit code now reflects this as a difference

* applier.TargetGroupChanForDir: skipCount return value is no longer a pointer

* cmd_init.go: Remove unnecessary createOptionFile() function

* cmd_pull.go: Track skipCount by return value, rather than a pointer arg
This commit is contained in:
Evan Elias 2018-10-29 16:13:47 -04:00
Родитель 2ba95f900a
Коммит 135162a674
13 изменённых файлов: 82 добавлений и 56 удалений

2
Gopkg.lock сгенерированный
Просмотреть файл

@ -152,7 +152,7 @@
branch = "master"
name = "github.com/skeema/tengo"
packages = ["."]
revision = "316d6488ea3f4b828aeec2ca24c575edee026e20"
revision = "43fee38eac82c8a682df9c76783a7d77e4e07e63"
[[projects]]
branch = "master"

Просмотреть файл

@ -47,6 +47,7 @@ func Worker(ctx context.Context, targetGroups <-chan TargetGroup, results chan<-
if diff.SchemaDDL != "" {
printer.syncPrintf(t.Instance, "", "%s;\n", diff.SchemaDDL)
targetStmtCount++
result.Differences = true
if !dryRun {
var schemaDDLErr error
if strings.HasPrefix(diff.SchemaDDL, "CREATE DATABASE") && t.SchemaFromInstance == nil {

Просмотреть файл

@ -177,14 +177,12 @@ func targetsForIdealSchema(idealSchema *fs.IdealSchema, dir *fs.Dir, instances [
}
// TargetGroupChanForDir returns a channel for obtaining TargetGroups for this
// dir and its subdirs, and a pointer to an int that will contain the count of
// skipped directories once the channel is closed.
func TargetGroupChanForDir(dir *fs.Dir) (<-chan TargetGroup, *int) {
// dir and its subdirs, and count of directories that were skipped due to non-
// fatal errors.
func TargetGroupChanForDir(dir *fs.Dir) (<-chan TargetGroup, int) {
targets, skipCount := TargetsForDir(dir, 5)
groups := make(chan TargetGroup)
var skipCount int
go func() {
var targets []*Target
targets, skipCount = TargetsForDir(dir, 5)
byInst := make(map[string]TargetGroup)
for _, t := range targets {
key := t.Instance.String()
@ -195,5 +193,5 @@ func TargetGroupChanForDir(dir *fs.Dir) (<-chan TargetGroup, *int) {
}
close(groups)
}()
return groups, &skipCount
return groups, skipCount
}

Просмотреть файл

@ -187,7 +187,10 @@ func (s ApplierIntegrationSuite) TestTargetGroupChanForDir(t *testing.T) {
// Parent dir maps to 2 instances, and schema dir maps to 2 schemas, so expect
// 4 targets split into 2 groups (by instance)
dir := getDir(t, "../testdata/applier/multi", "")
tgchan, skipCountPtr := TargetGroupChanForDir(dir)
tgchan, skipCount := TargetGroupChanForDir(dir)
if skipCount != 0 {
t.Errorf("Expected skip count of 0, instead found %d", skipCount)
}
seen := make(map[string]bool, 2)
for tg := range tgchan {
if len(tg) != 2 || tg[0].Instance != tg[1].Instance {
@ -203,16 +206,16 @@ func (s ApplierIntegrationSuite) TestTargetGroupChanForDir(t *testing.T) {
if len(seen) != 2 {
t.Errorf("Expected to see 2 target groups, instead found %d", len(seen))
}
if *skipCountPtr != 0 {
t.Errorf("Expected skip count of 0, instead found %d", *skipCountPtr)
}
// SQL syntax error in testdata/applier/sqlerror/one/bad.sql should cause one/
// dir to be skipped entirely for both hosts, so skipCount of 2. But other
// dir two/ has no errors and should successfully yield 2 targets (1 per host,
// and put into different targetgroups)
dir = getDir(t, "../testdata/applier/sqlerror", "")
tgchan, skipCountPtr = TargetGroupChanForDir(dir)
tgchan, skipCount = TargetGroupChanForDir(dir)
if skipCount != 2 {
t.Errorf("Expected skip count of 2, instead found %d", skipCount)
}
seen = make(map[string]bool, 2)
for tg := range tgchan {
if len(tg) != 1 {
@ -228,9 +231,6 @@ func (s ApplierIntegrationSuite) TestTargetGroupChanForDir(t *testing.T) {
if len(seen) != 2 {
t.Errorf("Expected to see 2 target groups, instead found %d", len(seen))
}
if *skipCountPtr != 2 {
t.Errorf("Expected skip count of 2, instead found %d", *skipCountPtr)
}
}
func getBaseConfig(t *testing.T, cliFlags string) *mybase.Config {

Просмотреть файл

@ -129,8 +129,8 @@ func InitHandler(cfg *mybase.Config) error {
}
// Write the option file
if err := createOptionFile(hostDir.Path, hostOptionFile); err != nil {
return NewExitValue(CodeCantCreate, err.Error())
if err := hostOptionFile.Write(false); err != nil {
return NewExitValue(CodeCantCreate, "Unable to use directory %s: Unable to write to %s: %s", hostDir.Path, hostOptionFile.Path(), err)
}
verb := "Using"
@ -182,12 +182,12 @@ func PopulateSchemaDir(s *tengo.Schema, parentDir *fs.Dir, makeSubdir bool) erro
// Put a .skeema file with the schema name in it. This is placed outside of
// any named section/environment since the default assumption is that schema
// names match between environments.
optionFile := mybase.NewFile(".skeema")
optionFile := mybase.NewFile(subPath, ".skeema")
optionFile.SetOptionValue("", "schema", s.Name)
optionFile.SetOptionValue("", "default-character-set", s.CharSet)
optionFile.SetOptionValue("", "default-collation", s.Collation)
if err := createOptionFile(subPath, optionFile); err != nil {
return NewExitValue(CodeCantCreate, "Unable to use directory %s for schema %s: %s", subPath, s.Name, err)
if err := optionFile.Write(false); err != nil {
return NewExitValue(CodeCantCreate, "Unable to use directory %s for schema %s: Unable to write to %s: %s", subPath, s.Name, optionFile.Path(), err)
}
} else {
subPath = parentDir.Path
@ -269,12 +269,3 @@ func preparePath(dirPath string, globalConfig *mybase.Config) (created bool, err
return false, nil
}
func createOptionFile(dirPath string, file *mybase.File) error {
file.Dir = dirPath
err := file.Write(false)
if err == nil {
return nil
}
return fmt.Errorf("Unable to write to %s: %s", file.Path(), err)
}

Просмотреть файл

@ -43,30 +43,31 @@ func PullHandler(cfg *mybase.Config) error {
return err
}
var errCount int
if _, err = pullWalker(dir, &errCount, 5); err != nil {
var skipCount int
if _, skipCount, err = pullWalker(dir, 5); err != nil {
return err
}
if errCount == 0 {
if skipCount == 0 {
return nil
}
var plural string
if errCount > 1 {
if skipCount > 1 {
plural = "s"
}
return NewExitValue(CodePartialError, "Skipped %d operation%s due to error%s", errCount, plural, plural)
return NewExitValue(CodePartialError, "Skipped %d operation%s due to error%s", skipCount, plural, plural)
}
// pullWalker processes dir, and recursively calls itself on any subdirs. An
// error is only returned if something fatal occurs; otherwise, *errCount is
// incremented if some operation is skipped but it isn't fatal.
func pullWalker(dir *fs.Dir, errCount *int, maxDepth int) (handledSchemaNames []string, err error) {
// error is only returned if something fatal occurs. skipCount reflects the
// number of non-fatal failed operations that were skipped for dir and its
// subdirectories.
func pullWalker(dir *fs.Dir, maxDepth int) (handledSchemaNames []string, skipCount int, err error) {
var instance *tengo.Instance
if dir.Config.Changed("host") {
instance, err = dir.FirstInstance()
if err != nil {
*errCount++
return nil, nil
log.Warnf("Skipping %s: %s", dir, err)
return nil, 1, nil
}
}
@ -83,7 +84,7 @@ func pullWalker(dir *fs.Dir, errCount *int, maxDepth int) (handledSchemaNames []
var schemaNames []string
if schemaNames, err = dir.SchemaNames(instance); err != nil {
return nil, fmt.Errorf("%s: Unable to fetch schema names mapped by this dir: %s", dir, err)
return nil, skipCount, fmt.Errorf("%s: Unable to fetch schema names mapped by this dir: %s", dir, err)
}
if len(schemaNames) == 0 {
log.Warnf("Ignoring directory %s -- did not map to any schema names\n", dir)
@ -94,38 +95,39 @@ func pullWalker(dir *fs.Dir, errCount *int, maxDepth int) (handledSchemaNames []
if err == sql.ErrNoRows {
log.Infof("Deleted directory %s -- schema %s no longer exists\n", dir, handledSchemaNames[0])
// Explicitly return here to prevent later attempt at subdir traversal
return nil, dir.Delete()
return nil, skipCount, dir.Delete()
} else if err != nil {
return nil, fmt.Errorf("%s: Unable to fetch schema %s from %s: %s", dir, handledSchemaNames[0], instance, err)
return nil, skipCount, fmt.Errorf("%s: Unable to fetch schema %s from %s: %s", dir, handledSchemaNames[0], instance, err)
}
if err = pullSchemaDir(dir, instance, instSchema, idealSchema); err != nil {
return nil, err
return nil, skipCount, err
}
}
}
if subdirs, badCount, err := dir.Subdirs(); err != nil {
log.Errorf("Cannot list subdirs of %s: %s", dir, err)
*errCount++
skipCount++
} else if len(subdirs) > 0 && maxDepth <= 0 {
log.Warnf("Not walking subdirs of %s: max depth reached", dir)
*errCount += len(subdirs)
skipCount += len(subdirs)
} else {
*errCount += badCount
skipCount += badCount
allSubSchemaNames := make([]string, 0)
for _, sub := range subdirs {
subSchemaNames, walkErr := pullWalker(sub, errCount, maxDepth-1)
subSchemaNames, subSkipCount, walkErr := pullWalker(sub, maxDepth-1)
skipCount += subSkipCount
if walkErr != nil {
return nil, walkErr
return nil, skipCount, walkErr
}
allSubSchemaNames = append(allSubSchemaNames, subSchemaNames...)
}
if instance != nil && !dir.Config.Changed("schema") {
updateFlavor(dir, instance)
return nil, findNewSchemas(dir, instance, allSubSchemaNames)
return nil, skipCount, findNewSchemas(dir, instance, allSubSchemaNames)
}
}
return handledSchemaNames, nil
return handledSchemaNames, skipCount, nil
}
// pullSchemaDir performs appropriate pull logic on a dir that maps to one or

Просмотреть файл

@ -54,7 +54,7 @@ func PushHandler(cfg *mybase.Config) error {
briefMode := dir.Config.GetBool("dry-run") && dir.Config.GetBool("brief")
printer := applier.NewPrinter(briefMode)
g, ctx := errgroup.WithContext(context.Background())
tgchan, skipCountPtr := applier.TargetGroupChanForDir(dir)
tgchan, skipCount := applier.TargetGroupChanForDir(dir)
results := make(chan applier.Result)
workerCount, err := dir.Config.GetInt("concurrent-instances")
@ -85,7 +85,7 @@ func PushHandler(cfg *mybase.Config) error {
return err
}
sum := applier.SumResults(allResults)
sum.SkipCount += *skipCountPtr
sum.SkipCount += skipCount
if sum.SkipCount+sum.UnsupportedCount == 0 {
if dir.Config.GetBool("dry-run") && sum.Differences {

Просмотреть файл

@ -5,6 +5,7 @@ import (
"os"
log "github.com/sirupsen/logrus"
"github.com/skeema/skeema/util"
)
// ExitValue represents an exit code for an operation. It satisfies the Error
@ -76,5 +77,10 @@ func Exit(err error) {
}
log.Debugf("Exit code %d", exitCode)
}
// Gracefully close all connection pools, to avoid aborted connection counter/
// logging in some versions of MySQL
util.CloseCachedConnectionPools()
os.Exit(exitCode)
}

Просмотреть файл

@ -306,8 +306,8 @@ func (dir *Dir) SchemaNames(instance *tengo.Instance) ([]string, error) {
}
// HasSchema returns true if this dir maps to at least one schema, either by
// stating a "schema" option in the dir's config, and/or by having *.sql files
// that explicitly mention a schema name
// stating a "schema" option in the dir's config for the current environment,
// and/or by having *.sql files that explicitly mention a schema name.
func (dir *Dir) HasSchema() bool {
if dir.Config.Changed("schema") {
return true

4
testdata/applier/schemaerror/four/four.sql поставляемый
Просмотреть файл

@ -0,0 +1,4 @@
CREATE TABLE four (
name varchar(30) NOT NULL,
PRIMARY KEY (name)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Просмотреть файл

@ -0,0 +1,4 @@
CREATE TABLE three (
name varchar(30) NOT NULL,
PRIMARY KEY (name)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Просмотреть файл

@ -34,3 +34,11 @@ func NewInstance(driver, dsn string) (*tengo.Instance, error) {
instanceCache.instanceMap[key] = instance
return instance, nil
}
// CloseCachedConnectionPools closes all connection pools in all cached
// Instances that were created via NewInstance.
func CloseCachedConnectionPools() {
for _, inst := range instanceCache.instanceMap {
inst.CloseAll()
}
}

14
vendor/github.com/skeema/tengo/instance.go сгенерированный поставляемый
Просмотреть файл

@ -26,7 +26,7 @@ type Instance struct {
Port int
SocketPath string
defaultParams map[string]string
connectionPool map[string]*sqlx.DB // key is in format "schema?params" or just "schema" if no params
connectionPool map[string]*sqlx.DB // key is in format "schema?params"
*sync.RWMutex // protects connectionPool for concurrent operations
flavor Flavor
version [3]int
@ -182,6 +182,18 @@ func (instance *Instance) CanConnect() (bool, error) {
return err == nil, err
}
// CloseAll closes all of instance's connection pools. This can be useful for
// graceful shutdown, to avoid aborted-connection counters/logging in some
// versions of MySQL.
func (instance *Instance) CloseAll() {
instance.Lock()
for key, db := range instance.connectionPool {
db.Close()
delete(instance.connectionPool, key)
}
instance.Unlock()
}
// Flavor returns this instance's flavor value, representing the database
// distribution/fork/vendor as well as major and minor version. If this is
// unable to be determined or an error occurs, FlavorUnknown will be returned.