зеркало из https://github.com/github/gh-ost.git
`golang-ci`: fix `staticcheck` linter warnings
This commit is contained in:
Родитель
b751499091
Коммит
500fbefdb9
|
@ -5,10 +5,7 @@ run:
|
||||||
linters:
|
linters:
|
||||||
disable:
|
disable:
|
||||||
- errcheck
|
- errcheck
|
||||||
- staticcheck
|
|
||||||
enable:
|
enable:
|
||||||
- gosimple
|
|
||||||
- govet
|
|
||||||
- noctx
|
- noctx
|
||||||
- rowserrcheck
|
- rowserrcheck
|
||||||
- sqlclosecheck
|
- sqlclosecheck
|
||||||
|
|
|
@ -846,30 +846,30 @@ func (this *MigrationContext) ReadConfigFile() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Section("client").Haskey("user") {
|
if cfg.Section("client").HasKey("user") {
|
||||||
this.config.Client.User = cfg.Section("client").Key("user").String()
|
this.config.Client.User = cfg.Section("client").Key("user").String()
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Section("client").Haskey("password") {
|
if cfg.Section("client").HasKey("password") {
|
||||||
this.config.Client.Password = cfg.Section("client").Key("password").String()
|
this.config.Client.Password = cfg.Section("client").Key("password").String()
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Section("osc").Haskey("chunk_size") {
|
if cfg.Section("osc").HasKey("chunk_size") {
|
||||||
this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64()
|
this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to read osc chunk size: %s", err.Error())
|
return fmt.Errorf("Unable to read osc chunk size: %s", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Section("osc").Haskey("max_load") {
|
if cfg.Section("osc").HasKey("max_load") {
|
||||||
this.config.Osc.Max_Load = cfg.Section("osc").Key("max_load").String()
|
this.config.Osc.Max_Load = cfg.Section("osc").Key("max_load").String()
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Section("osc").Haskey("replication_lag_query") {
|
if cfg.Section("osc").HasKey("replication_lag_query") {
|
||||||
this.config.Osc.Replication_Lag_Query = cfg.Section("osc").Key("replication_lag_query").String()
|
this.config.Osc.Replication_Lag_Query = cfg.Section("osc").Key("replication_lag_query").String()
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Section("osc").Haskey("max_lag_millis") {
|
if cfg.Section("osc").HasKey("max_lag_millis") {
|
||||||
this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64()
|
this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Unable to read max lag millis: %s", err.Error())
|
return fmt.Errorf("Unable to read max lag millis: %s", err.Error())
|
||||||
|
|
|
@ -69,7 +69,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
extraPortQuery := `select @@global.extra_port`
|
extraPortQuery := `select @@global.extra_port`
|
||||||
if err := db.QueryRow(extraPortQuery).Scan(&extraPort); err != nil {
|
if err := db.QueryRow(extraPortQuery).Scan(&extraPort); err != nil { // nolint:staticcheck
|
||||||
// swallow this error. not all servers support extra_port
|
// swallow this error. not all servers support extra_port
|
||||||
}
|
}
|
||||||
// AliyunRDS set users port to "NULL", replace it by gh-ost param
|
// AliyunRDS set users port to "NULL", replace it by gh-ost param
|
||||||
|
|
|
@ -19,7 +19,8 @@ import (
|
||||||
_ "github.com/go-sql-driver/mysql"
|
_ "github.com/go-sql-driver/mysql"
|
||||||
"github.com/openark/golib/log"
|
"github.com/openark/golib/log"
|
||||||
|
|
||||||
"golang.org/x/crypto/ssh/terminal"
|
// TODO: move to golang.org/x/term
|
||||||
|
"golang.org/x/crypto/ssh/terminal" // nolint:staticcheck
|
||||||
)
|
)
|
||||||
|
|
||||||
var AppVersion string
|
var AppVersion string
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
Copyright 2021 GitHub Inc.
|
Copyright 2022 GitHub Inc.
|
||||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -71,7 +71,6 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Applier) InitDBConnections() (err error) {
|
func (this *Applier) InitDBConnections() (err error) {
|
||||||
|
|
||||||
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
|
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -344,8 +343,10 @@ func (this *Applier) InitiateHeartbeat() {
|
||||||
}
|
}
|
||||||
injectHeartbeat()
|
injectHeartbeat()
|
||||||
|
|
||||||
heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
|
ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
|
||||||
for range heartbeatTick {
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
<-ticker.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,8 @@ type ChangelogState string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
GhostTableMigrated ChangelogState = "GhostTableMigrated"
|
GhostTableMigrated ChangelogState = "GhostTableMigrated"
|
||||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed"
|
||||||
ReadMigrationRangeValues = "ReadMigrationRangeValues"
|
ReadMigrationRangeValues ChangelogState = "ReadMigrationRangeValues"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReadChangelogState(s string) ChangelogState {
|
func ReadChangelogState(s string) ChangelogState {
|
||||||
|
@ -809,17 +809,17 @@ func (this *Migrator) initiateInspector() (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// initiateStatus sets and activates the printStatus() ticker
|
// initiateStatus sets and activates the printStatus() ticker
|
||||||
func (this *Migrator) initiateStatus() error {
|
func (this *Migrator) initiateStatus() {
|
||||||
this.printStatus(ForcePrintStatusAndHintRule)
|
this.printStatus(ForcePrintStatusAndHintRule)
|
||||||
statusTick := time.Tick(1 * time.Second)
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
for range statusTick {
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
<-ticker.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
go this.printStatus(HeuristicPrintStatusRule)
|
go this.printStatus(HeuristicPrintStatusRule)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// printMigrationStatusHint prints a detailed configuration dump, that is useful
|
// printMigrationStatusHint prints a detailed configuration dump, that is useful
|
||||||
|
@ -1052,8 +1052,10 @@ func (this *Migrator) initiateStreaming() error {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ticker := time.Tick(1 * time.Second)
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
for range ticker {
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
<-ticker.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,7 +134,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
argIsQuestion := (arg == "?")
|
argIsQuestion := (arg == "?")
|
||||||
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
|
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged"
|
||||||
|
|
||||||
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
|
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
|
||||||
return NoPrintStatusRule, err
|
return NoPrintStatusRule, err
|
||||||
|
@ -282,7 +282,7 @@ help # This message
|
||||||
return NoPrintStatusRule, nil
|
return NoPrintStatusRule, nil
|
||||||
}
|
}
|
||||||
this.migrationContext.SetThrottleQuery(arg)
|
this.migrationContext.SetThrottleQuery(arg)
|
||||||
fmt.Fprintf(writer, throttleHint)
|
fmt.Fprintln(writer, throttleHint)
|
||||||
return ForcePrintStatusAndHintRule, nil
|
return ForcePrintStatusAndHintRule, nil
|
||||||
}
|
}
|
||||||
case "throttle-http":
|
case "throttle-http":
|
||||||
|
@ -292,7 +292,7 @@ help # This message
|
||||||
return NoPrintStatusRule, nil
|
return NoPrintStatusRule, nil
|
||||||
}
|
}
|
||||||
this.migrationContext.SetThrottleHTTP(arg)
|
this.migrationContext.SetThrottleHTTP(arg)
|
||||||
fmt.Fprintf(writer, throttleHint)
|
fmt.Fprintln(writer, throttleHint)
|
||||||
return ForcePrintStatusAndHintRule, nil
|
return ForcePrintStatusAndHintRule, nil
|
||||||
}
|
}
|
||||||
case "throttle-control-replicas":
|
case "throttle-control-replicas":
|
||||||
|
@ -315,7 +315,7 @@ help # This message
|
||||||
return NoPrintStatusRule, err
|
return NoPrintStatusRule, err
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
|
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
|
||||||
fmt.Fprintf(writer, throttleHint)
|
fmt.Fprintln(writer, throttleHint)
|
||||||
return ForcePrintStatusAndHintRule, nil
|
return ForcePrintStatusAndHintRule, nil
|
||||||
}
|
}
|
||||||
case "no-throttle", "unthrottle", "resume", "continue":
|
case "no-throttle", "unthrottle", "resume", "continue":
|
||||||
|
|
|
@ -87,10 +87,10 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent)
|
||||||
|
|
||||||
for _, listener := range this.listeners {
|
for _, listener := range this.listeners {
|
||||||
listener := listener
|
listener := listener
|
||||||
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
|
if !strings.EqualFold(listener.databaseName, binlogEvent.DatabaseName) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
|
if !strings.EqualFold(listener.tableName, binlogEvent.TableName) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if listener.async {
|
if listener.async {
|
||||||
|
|
|
@ -168,8 +168,10 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
|
||||||
collectFunc()
|
collectFunc()
|
||||||
firstThrottlingCollected <- true
|
firstThrottlingCollected <- true
|
||||||
|
|
||||||
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
|
ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
|
||||||
for range ticker {
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
<-ticker.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -244,12 +246,16 @@ func (this *Throttler) collectControlReplicasLag() {
|
||||||
}
|
}
|
||||||
this.migrationContext.SetControlReplicasLagResult(readControlReplicasLag())
|
this.migrationContext.SetControlReplicasLagResult(readControlReplicasLag())
|
||||||
}
|
}
|
||||||
aggressiveTicker := time.Tick(100 * time.Millisecond)
|
|
||||||
|
aggressiveTicker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer aggressiveTicker.Stop()
|
||||||
|
|
||||||
relaxedFactor := 10
|
relaxedFactor := 10
|
||||||
counter := 0
|
counter := 0
|
||||||
shouldReadLagAggressively := false
|
shouldReadLagAggressively := false
|
||||||
|
|
||||||
for range aggressiveTicker {
|
for {
|
||||||
|
<-aggressiveTicker.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -321,8 +327,10 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
|
||||||
firstThrottlingCollected <- true
|
firstThrottlingCollected <- true
|
||||||
|
|
||||||
collectInterval := time.Duration(this.migrationContext.ThrottleHTTPIntervalMillis) * time.Millisecond
|
collectInterval := time.Duration(this.migrationContext.ThrottleHTTPIntervalMillis) * time.Millisecond
|
||||||
ticker := time.Tick(collectInterval)
|
ticker := time.NewTicker(collectInterval)
|
||||||
for range ticker {
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
<-ticker.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -441,8 +449,10 @@ func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan
|
||||||
this.collectGeneralThrottleMetrics()
|
this.collectGeneralThrottleMetrics()
|
||||||
firstThrottlingCollected <- true
|
firstThrottlingCollected <- true
|
||||||
|
|
||||||
throttlerMetricsTick := time.Tick(1 * time.Second)
|
throttlerMetricsTick := time.NewTicker(1 * time.Second)
|
||||||
for range throttlerMetricsTick {
|
defer throttlerMetricsTick.Stop()
|
||||||
|
for {
|
||||||
|
<-throttlerMetricsTick.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -453,8 +463,9 @@ func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan
|
||||||
}
|
}
|
||||||
|
|
||||||
// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
|
// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
|
||||||
func (this *Throttler) initiateThrottlerChecks() error {
|
func (this *Throttler) initiateThrottlerChecks() {
|
||||||
throttlerTick := time.Tick(100 * time.Millisecond)
|
throttlerTick := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer throttlerTick.Stop()
|
||||||
|
|
||||||
throttlerFunction := func() {
|
throttlerFunction := func() {
|
||||||
alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
|
alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
|
||||||
|
@ -472,14 +483,13 @@ func (this *Throttler) initiateThrottlerChecks() error {
|
||||||
this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
|
this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
|
||||||
}
|
}
|
||||||
throttlerFunction()
|
throttlerFunction()
|
||||||
for range throttlerTick {
|
for {
|
||||||
|
<-throttlerTick.C
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
throttlerFunction()
|
throttlerFunction()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// throttle sees if throttling needs take place, and if so, continuously sleeps (blocks)
|
// throttle sees if throttling needs take place, and if so, continuously sleeps (blocks)
|
||||||
|
|
|
@ -501,6 +501,9 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
|
||||||
}
|
}
|
||||||
|
|
||||||
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
|
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
|
||||||
|
if err != nil {
|
||||||
|
return "", sharedArgs, uniqueKeyArgs, err
|
||||||
|
}
|
||||||
result = fmt.Sprintf(`
|
result = fmt.Sprintf(`
|
||||||
update /* gh-ost %s.%s */
|
update /* gh-ost %s.%s */
|
||||||
%s.%s
|
%s.%s
|
||||||
|
|
Загрузка…
Ссылка в новой задаче