зеркало из https://github.com/github/vitess-gh.git
vtworker: Adjust the interactive method signature to be more similar to the command line version.
Now, the interactiveCommand implementations contain less vtworker-specific logic (e.g. setting the worker, sending a redirect). This way, it's easier to pass the wrangler to Instance.setAndStartWorker(). Also, commands no longer have to duplicate the same code.
This commit is contained in:
Родитель
c09d05d131
Коммит
f00de8bd05
|
@ -7,6 +7,7 @@ package worker
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
@ -22,7 +23,7 @@ import (
|
|||
type command struct {
|
||||
Name string
|
||||
method func(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error)
|
||||
Interactive func(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request)
|
||||
Interactive func(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error)
|
||||
Params string
|
||||
Help string // if help is empty, won't list the command
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"net/http"
|
||||
|
||||
log "github.com/golang/glog"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -41,9 +42,9 @@ const subIndexHTML = `
|
|||
</body>
|
||||
`
|
||||
|
||||
func httpError(w http.ResponseWriter, format string, err error) {
|
||||
log.Errorf(format, err)
|
||||
http.Error(w, fmt.Sprintf(format, err), http.StatusInternalServerError)
|
||||
func httpError(w http.ResponseWriter, format string, args ...interface{}) {
|
||||
log.Errorf(format, args)
|
||||
http.Error(w, fmt.Sprintf(format, args), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
func mustParseTemplate(name, contents string) *template.Template {
|
||||
|
@ -81,12 +82,28 @@ func (wi *Instance) InitInteractiveMode() {
|
|||
})
|
||||
|
||||
for _, c := range cg.Commands {
|
||||
// keep a local copy of the Command pointer for the
|
||||
// closure.
|
||||
// keep a local copy of the Command pointer for the closure.
|
||||
pc := c
|
||||
http.HandleFunc("/"+cg.Name+"/"+c.Name, func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
pc.Interactive(wi, ctx, wi.Wr, w, r)
|
||||
wrk, template, data, err := pc.Interactive(wi, ctx, wi.Wr, w, r)
|
||||
if err != nil {
|
||||
httpError(w, "%s", err)
|
||||
} else if template != nil && data != nil {
|
||||
executeTemplate(w, template, data)
|
||||
return
|
||||
}
|
||||
|
||||
if wrk == nil {
|
||||
httpError(w, "Internal server error. Command: %s did not return correct response.", c.Name)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := wi.setAndStartWorker(wrk, wi.wr); err != nil {
|
||||
httpError(w, "Could not set %s worker: %s", c.Name, err)
|
||||
return
|
||||
}
|
||||
http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,9 +7,9 @@ package worker
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"net/http"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -53,31 +53,22 @@ func commandPing(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, ar
|
|||
return worker, nil
|
||||
}
|
||||
|
||||
func interactivePing(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) {
|
||||
func interactivePing(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
httpError(w, "Cannot parse form: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("Cannot parse form: %s", err)
|
||||
}
|
||||
|
||||
message := r.FormValue("message")
|
||||
if message == "" {
|
||||
result := make(map[string]interface{})
|
||||
executeTemplate(w, pingTemplate, result)
|
||||
return
|
||||
return nil, pingTemplate, result, nil
|
||||
}
|
||||
|
||||
// start the clone job
|
||||
wrk, err := NewPingWorker(wr, message)
|
||||
if err != nil {
|
||||
httpError(w, "Could not create Ping worker: %v", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("Could not create Ping worker: %v", err)
|
||||
}
|
||||
if _, err := wi.setAndStartWorker(wrk); err != nil {
|
||||
httpError(w, "Could not set Ping worker: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
|
||||
return wrk, nil, nil, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -7,13 +7,13 @@ package worker
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topotools"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
|
@ -150,10 +150,9 @@ func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler)
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func interactiveSplitClone(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) {
|
||||
func interactiveSplitClone(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
httpError(w, "cannot parse form: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse form: %s", err)
|
||||
}
|
||||
|
||||
keyspace := r.FormValue("keyspace")
|
||||
|
@ -168,9 +167,7 @@ func interactiveSplitClone(wi *Instance, ctx context.Context, wr *wrangler.Wrang
|
|||
} else {
|
||||
result["Choices"] = choices
|
||||
}
|
||||
|
||||
executeTemplate(w, splitCloneTemplate, result)
|
||||
return
|
||||
return nil, splitCloneTemplate, result, nil
|
||||
}
|
||||
|
||||
sourceReaderCountStr := r.FormValue("sourceReaderCount")
|
||||
|
@ -183,8 +180,7 @@ func interactiveSplitClone(wi *Instance, ctx context.Context, wr *wrangler.Wrang
|
|||
result["DefaultDestinationPackCount"] = fmt.Sprintf("%v", defaultDestinationPackCount)
|
||||
result["DefaultMinTableSizeForSplit"] = fmt.Sprintf("%v", defaultMinTableSizeForSplit)
|
||||
result["DefaultDestinationWriterCount"] = fmt.Sprintf("%v", defaultDestinationWriterCount)
|
||||
executeTemplate(w, splitCloneTemplate2, result)
|
||||
return
|
||||
return nil, splitCloneTemplate2, result, nil
|
||||
}
|
||||
|
||||
// get other parameters
|
||||
|
@ -197,39 +193,29 @@ func interactiveSplitClone(wi *Instance, ctx context.Context, wr *wrangler.Wrang
|
|||
strategy := r.FormValue("strategy")
|
||||
sourceReaderCount, err := strconv.ParseInt(sourceReaderCountStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse sourceReaderCount: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse sourceReaderCount: %s", err)
|
||||
}
|
||||
destinationPackCount, err := strconv.ParseInt(destinationPackCountStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse destinationPackCount: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse destinationPackCount: %s", err)
|
||||
}
|
||||
minTableSizeForSplitStr := r.FormValue("minTableSizeForSplit")
|
||||
minTableSizeForSplit, err := strconv.ParseInt(minTableSizeForSplitStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse minTableSizeForSplit: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse minTableSizeForSplit: %s", err)
|
||||
}
|
||||
destinationWriterCountStr := r.FormValue("destinationWriterCount")
|
||||
destinationWriterCount, err := strconv.ParseInt(destinationWriterCountStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse destinationWriterCount: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse destinationWriterCount: %s", err)
|
||||
}
|
||||
|
||||
// start the clone job
|
||||
wrk, err := NewSplitCloneWorker(wr, wi.cell, keyspace, shard, excludeTableArray, strategy, int(sourceReaderCount), int(destinationPackCount), uint64(minTableSizeForSplit), int(destinationWriterCount))
|
||||
if err != nil {
|
||||
httpError(w, "cannot create worker: %v", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot create worker: %v", err)
|
||||
}
|
||||
if _, err := wi.setAndStartWorker(wrk); err != nil {
|
||||
httpError(w, "cannot set worker: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
|
||||
return wrk, nil, nil, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -7,12 +7,12 @@ package worker
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -131,10 +131,9 @@ func shardsWithSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func interactiveSplitDiff(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) {
|
||||
func interactiveSplitDiff(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
httpError(w, "cannot parse form: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse form: %s", err)
|
||||
}
|
||||
keyspace := r.FormValue("keyspace")
|
||||
shard := r.FormValue("shard")
|
||||
|
@ -148,9 +147,7 @@ func interactiveSplitDiff(wi *Instance, ctx context.Context, wr *wrangler.Wrangl
|
|||
} else {
|
||||
result["Shards"] = shards
|
||||
}
|
||||
|
||||
executeTemplate(w, splitDiffTemplate, result)
|
||||
return
|
||||
return nil, splitDiffTemplate, result, nil
|
||||
}
|
||||
|
||||
submitButtonValue := r.FormValue("submit")
|
||||
|
@ -159,8 +156,7 @@ func interactiveSplitDiff(wi *Instance, ctx context.Context, wr *wrangler.Wrangl
|
|||
result := make(map[string]interface{})
|
||||
result["Keyspace"] = keyspace
|
||||
result["Shard"] = shard
|
||||
executeTemplate(w, splitDiffTemplate2, result)
|
||||
return
|
||||
return nil, splitDiffTemplate2, result, nil
|
||||
}
|
||||
|
||||
// Process input form.
|
||||
|
@ -172,12 +168,10 @@ func interactiveSplitDiff(wi *Instance, ctx context.Context, wr *wrangler.Wrangl
|
|||
|
||||
// start the diff job
|
||||
wrk := NewSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray)
|
||||
if _, err := wi.setAndStartWorker(wrk); err != nil {
|
||||
httpError(w, "cannot set worker: %s", err)
|
||||
return
|
||||
if _, err := wi.setAndStartWorker(wrk, nil); err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("cannot set worker: %s", err)
|
||||
}
|
||||
|
||||
http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
|
||||
return wrk, nil, nil, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -7,13 +7,13 @@ package worker
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -147,10 +147,9 @@ func keyspacesWithServedFrom(ctx context.Context, wr *wrangler.Wrangler) ([]stri
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func interactiveVerticalSplitClone(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) {
|
||||
func interactiveVerticalSplitClone(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
httpError(w, "cannot parse form: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse form: %s", err)
|
||||
}
|
||||
|
||||
keyspace := r.FormValue("keyspace")
|
||||
|
@ -163,9 +162,7 @@ func interactiveVerticalSplitClone(wi *Instance, ctx context.Context, wr *wrangl
|
|||
} else {
|
||||
result["Keyspaces"] = keyspaces
|
||||
}
|
||||
|
||||
executeTemplate(w, verticalSplitCloneTemplate, result)
|
||||
return
|
||||
return nil, verticalSplitCloneTemplate, result, nil
|
||||
}
|
||||
|
||||
tables := r.FormValue("tables")
|
||||
|
@ -177,8 +174,7 @@ func interactiveVerticalSplitClone(wi *Instance, ctx context.Context, wr *wrangl
|
|||
result["DefaultDestinationPackCount"] = fmt.Sprintf("%v", defaultDestinationPackCount)
|
||||
result["DefaultMinTableSizeForSplit"] = fmt.Sprintf("%v", defaultMinTableSizeForSplit)
|
||||
result["DefaultDestinationWriterCount"] = fmt.Sprintf("%v", defaultDestinationWriterCount)
|
||||
executeTemplate(w, verticalSplitCloneTemplate2, result)
|
||||
return
|
||||
return nil, verticalSplitCloneTemplate2, result, nil
|
||||
}
|
||||
tableArray := strings.Split(tables, ",")
|
||||
|
||||
|
@ -187,39 +183,30 @@ func interactiveVerticalSplitClone(wi *Instance, ctx context.Context, wr *wrangl
|
|||
sourceReaderCountStr := r.FormValue("sourceReaderCount")
|
||||
sourceReaderCount, err := strconv.ParseInt(sourceReaderCountStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse sourceReaderCount: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse sourceReaderCount: %s", err)
|
||||
}
|
||||
destinationPackCountStr := r.FormValue("destinationPackCount")
|
||||
destinationPackCount, err := strconv.ParseInt(destinationPackCountStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse destinationPackCount: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse destinationPackCount: %s", err)
|
||||
}
|
||||
minTableSizeForSplitStr := r.FormValue("minTableSizeForSplit")
|
||||
minTableSizeForSplit, err := strconv.ParseInt(minTableSizeForSplitStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse minTableSizeForSplit: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse minTableSizeForSplit: %s", err)
|
||||
}
|
||||
destinationWriterCountStr := r.FormValue("destinationWriterCount")
|
||||
destinationWriterCount, err := strconv.ParseInt(destinationWriterCountStr, 0, 64)
|
||||
if err != nil {
|
||||
httpError(w, "cannot parse destinationWriterCount: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse destinationWriterCount: %s", err)
|
||||
}
|
||||
|
||||
// start the clone job
|
||||
wrk, err := NewVerticalSplitCloneWorker(wr, wi.cell, keyspace, "0", tableArray, strategy, int(sourceReaderCount), int(destinationPackCount), uint64(minTableSizeForSplit), int(destinationWriterCount))
|
||||
if err != nil {
|
||||
httpError(w, "cannot create worker: %v", err)
|
||||
return nil, nil, nil, fmt.Errorf("cannot create worker: %v", err)
|
||||
}
|
||||
if _, err := wi.setAndStartWorker(wrk); err != nil {
|
||||
httpError(w, "cannot set worker: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
|
||||
return wrk, nil, nil, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -7,12 +7,12 @@ package worker
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -130,10 +130,9 @@ func shardsWithTablesSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func interactiveVerticalSplitDiff(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) {
|
||||
func interactiveVerticalSplitDiff(wi *Instance, ctx context.Context, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) {
|
||||
if err := r.ParseForm(); err != nil {
|
||||
httpError(w, "cannot parse form: %s", err)
|
||||
return
|
||||
return nil, nil, nil, fmt.Errorf("cannot parse form: %s", err)
|
||||
}
|
||||
keyspace := r.FormValue("keyspace")
|
||||
shard := r.FormValue("shard")
|
||||
|
@ -147,9 +146,7 @@ func interactiveVerticalSplitDiff(wi *Instance, ctx context.Context, wr *wrangle
|
|||
} else {
|
||||
result["Shards"] = shards
|
||||
}
|
||||
|
||||
executeTemplate(w, verticalSplitDiffTemplate, result)
|
||||
return
|
||||
return nil, verticalSplitDiffTemplate, result, nil
|
||||
}
|
||||
|
||||
submitButtonValue := r.FormValue("submit")
|
||||
|
@ -158,8 +155,7 @@ func interactiveVerticalSplitDiff(wi *Instance, ctx context.Context, wr *wrangle
|
|||
result := make(map[string]interface{})
|
||||
result["Keyspace"] = keyspace
|
||||
result["Shard"] = shard
|
||||
executeTemplate(w, verticalSplitDiffTemplate2, result)
|
||||
return
|
||||
return nil, verticalSplitDiffTemplate2, result, nil
|
||||
}
|
||||
|
||||
// Process input form.
|
||||
|
@ -171,12 +167,7 @@ func interactiveVerticalSplitDiff(wi *Instance, ctx context.Context, wr *wrangle
|
|||
|
||||
// start the diff job
|
||||
wrk := NewVerticalSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray)
|
||||
if _, err := wi.setAndStartWorker(wrk); err != nil {
|
||||
httpError(w, "cannot set worker: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
http.Redirect(w, r, servenv.StatusURLPath(), http.StatusTemporaryRedirect)
|
||||
return wrk, nil, nil, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
Загрузка…
Ссылка в новой задаче