Retiring logutil.LoggerEvent in favor of logutilpb.Event.

This commit is contained in:
Alain Jobart 2015-11-16 08:08:18 -08:00
Родитель 7bd36d95b5
Коммит 6389f7bc2e
28 изменённых файлов: 178 добавлений и 203 удалений

Просмотреть файл

@ -15,6 +15,8 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// The default values used by these flags cannot be taken from wrangler and
@ -33,16 +35,16 @@ func main() {
err := vtctlclient.RunCommandAndWait(
context.Background(), *server, flag.Args(),
*dialTimeout, *actionTimeout,
func(e *logutil.LoggerEvent) {
func(e *logutilpb.Event) {
switch e.Level {
case logutil.LOGGER_INFO:
log.Info(e.String())
case logutil.LOGGER_WARNING:
log.Warning(e.String())
case logutil.LOGGER_ERROR:
log.Error(e.String())
case logutil.LOGGER_CONSOLE:
fmt.Print(e.String())
case logutilpb.Level_INFO:
log.Info(logutil.EventString(e))
case logutilpb.Level_WARNING:
log.Warning(logutil.EventString(e))
case logutilpb.Level_ERROR:
log.Error(logutil.EventString(e))
case logutilpb.Level_CONSOLE:
fmt.Print(logutil.EventString(e))
}
})
if err != nil {

Просмотреть файл

@ -14,6 +14,8 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/worker/vtworkerclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// The default values used by these flags cannot be taken from wrangler and
@ -31,16 +33,16 @@ func main() {
err := vtworkerclient.RunCommandAndWait(
ctx, *server, flag.Args(),
func(e *logutil.LoggerEvent) {
func(e *logutilpb.Event) {
switch e.Level {
case logutil.LOGGER_INFO:
log.Info(e.String())
case logutil.LOGGER_WARNING:
log.Warning(e.String())
case logutil.LOGGER_ERROR:
log.Error(e.String())
case logutil.LOGGER_CONSOLE:
fmt.Print(e.String())
case logutilpb.Level_INFO:
log.Info(logutil.EventString(e))
case logutilpb.Level_WARNING:
log.Warning(logutil.EventString(e))
case logutilpb.Level_ERROR:
log.Error(logutil.EventString(e))
case logutilpb.Level_CONSOLE:
fmt.Print(logutil.EventString(e))
}
})
if err != nil {

Просмотреть файл

@ -13,9 +13,12 @@ import (
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// ExecuteVtctl runs vtctl using vtctlclient. The stream of LoggerEvent messages is concatenated into one output string.
// ExecuteVtctl runs vtctl using vtctlclient. The stream of Event
// messages is concatenated into one output string.
func ExecuteVtctl(ctx context.Context, server string, args []string) (string, error) {
var output bytes.Buffer
@ -33,9 +36,9 @@ func ExecuteVtctl(ctx context.Context, server string, args []string) (string, er
// CreateLoggerEventToBufferFunction returns a function to add LoggerEvent
// structs to a given buffer, one line per event.
// The buffer can be used to return a multi-line string with all events.
func CreateLoggerEventToBufferFunction(output *bytes.Buffer) func(*logutil.LoggerEvent) {
return func(e *logutil.LoggerEvent) {
e.ToBuffer(output)
func CreateLoggerEventToBufferFunction(output *bytes.Buffer) func(*logutilpb.Event) {
return func(e *logutilpb.Event) {
logutil.EventToBuffer(e, output)
output.WriteRune('\n')
}
}

Просмотреть файл

@ -8,6 +8,8 @@ import (
"strings"
"sync"
"time"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// Logger defines the interface to use for our logging interface.
@ -24,48 +26,28 @@ type Logger interface {
Printf(format string, v ...interface{})
}
// The logger levels are used to store individual logging events
const (
// the usual logging levels
LOGGER_INFO = iota
LOGGER_WARNING
LOGGER_ERROR
// for messages that may contains non-logging events
LOGGER_CONSOLE
)
// LoggerEvent is used to manage individual logging events. It is used
// by ChannelLogger and MemoryLogger.
type LoggerEvent struct {
Time time.Time
Level int
File string
Line int
Value string
}
// ToBuffer formats an individual LoggerEvent into a buffer, without the
// EventToBuffer formats an individual Event into a buffer, without the
// final '\n'
func (event *LoggerEvent) ToBuffer(buf *bytes.Buffer) {
func EventToBuffer(event *logutilpb.Event, buf *bytes.Buffer) {
// Avoid Fprintf, for speed. The format is so simple that we
// can do it quickly by hand. It's worth about 3X. Fprintf is hard.
// Lmmdd hh:mm:ss.uuuuuu file:line]
switch event.Level {
case LOGGER_INFO:
case logutilpb.Level_INFO:
buf.WriteByte('I')
case LOGGER_WARNING:
case logutilpb.Level_WARNING:
buf.WriteByte('W')
case LOGGER_ERROR:
case logutilpb.Level_ERROR:
buf.WriteByte('E')
case LOGGER_CONSOLE:
case logutilpb.Level_CONSOLE:
buf.WriteString(event.Value)
return
}
_, month, day := event.Time.Date()
hour, minute, second := event.Time.Clock()
t := ProtoToTime(event.Time)
_, month, day := t.Date()
hour, minute, second := t.Clock()
twoDigits(buf, int(month))
twoDigits(buf, day)
buf.WriteByte(' ')
@ -75,7 +57,7 @@ func (event *LoggerEvent) ToBuffer(buf *bytes.Buffer) {
buf.WriteByte(':')
twoDigits(buf, second)
buf.WriteByte('.')
nDigits(buf, 6, event.Time.Nanosecond()/1000, '0')
nDigits(buf, 6, t.Nanosecond()/1000, '0')
buf.WriteByte(' ')
buf.WriteString(event.File)
buf.WriteByte(':')
@ -85,28 +67,28 @@ func (event *LoggerEvent) ToBuffer(buf *bytes.Buffer) {
buf.WriteString(event.Value)
}
// String returns the line in one string
func (event *LoggerEvent) String() string {
// EventString returns the line in one string
func EventString(event *logutilpb.Event) string {
buf := new(bytes.Buffer)
event.ToBuffer(buf)
EventToBuffer(event, buf)
return buf.String()
}
// ChannelLogger is a Logger that sends the logging events through a channel for
// consumption.
type ChannelLogger chan LoggerEvent
type ChannelLogger chan *logutilpb.Event
// NewChannelLogger returns a ChannelLogger fo the given size
func NewChannelLogger(size int) ChannelLogger {
return make(chan LoggerEvent, size)
return make(chan *logutilpb.Event, size)
}
// Infof is part of the Logger interface
func (cl ChannelLogger) Infof(format string, v ...interface{}) {
file, line := fileAndLine(2)
(chan LoggerEvent)(cl) <- LoggerEvent{
Time: time.Now(),
Level: LOGGER_INFO,
(chan *logutilpb.Event)(cl) <- &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_INFO,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
@ -116,9 +98,9 @@ func (cl ChannelLogger) Infof(format string, v ...interface{}) {
// Warningf is part of the Logger interface
func (cl ChannelLogger) Warningf(format string, v ...interface{}) {
file, line := fileAndLine(2)
(chan LoggerEvent)(cl) <- LoggerEvent{
Time: time.Now(),
Level: LOGGER_WARNING,
(chan *logutilpb.Event)(cl) <- &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_WARNING,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
@ -128,21 +110,21 @@ func (cl ChannelLogger) Warningf(format string, v ...interface{}) {
// Errorf is part of the Logger interface
func (cl ChannelLogger) Errorf(format string, v ...interface{}) {
file, line := fileAndLine(2)
(chan LoggerEvent)(cl) <- LoggerEvent{
Time: time.Now(),
Level: LOGGER_ERROR,
(chan *logutilpb.Event)(cl) <- &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_ERROR,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
}
}
// Errorf is part of the Logger interface
// Printf is part of the Logger interface
func (cl ChannelLogger) Printf(format string, v ...interface{}) {
file, line := fileAndLine(2)
(chan LoggerEvent)(cl) <- LoggerEvent{
Time: time.Now(),
Level: LOGGER_CONSOLE,
(chan *logutilpb.Event)(cl) <- &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_CONSOLE,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
@ -153,7 +135,7 @@ func (cl ChannelLogger) Printf(format string, v ...interface{}) {
// All protected by a mutex.
type MemoryLogger struct {
mu sync.Mutex
Events []LoggerEvent
Events []*logutilpb.Event
}
// NewMemoryLogger returns a new MemoryLogger
@ -166,9 +148,9 @@ func (ml *MemoryLogger) Infof(format string, v ...interface{}) {
file, line := fileAndLine(2)
ml.mu.Lock()
defer ml.mu.Unlock()
ml.Events = append(ml.Events, LoggerEvent{
Time: time.Now(),
Level: LOGGER_INFO,
ml.Events = append(ml.Events, &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_INFO,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
@ -180,9 +162,9 @@ func (ml *MemoryLogger) Warningf(format string, v ...interface{}) {
file, line := fileAndLine(2)
ml.mu.Lock()
defer ml.mu.Unlock()
ml.Events = append(ml.Events, LoggerEvent{
Time: time.Now(),
Level: LOGGER_WARNING,
ml.Events = append(ml.Events, &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_WARNING,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
@ -194,9 +176,9 @@ func (ml *MemoryLogger) Errorf(format string, v ...interface{}) {
file, line := fileAndLine(2)
ml.mu.Lock()
defer ml.mu.Unlock()
ml.Events = append(ml.Events, LoggerEvent{
Time: time.Now(),
Level: LOGGER_ERROR,
ml.Events = append(ml.Events, &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_ERROR,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
@ -208,9 +190,9 @@ func (ml *MemoryLogger) Printf(format string, v ...interface{}) {
file, line := fileAndLine(2)
ml.mu.Lock()
defer ml.mu.Unlock()
ml.Events = append(ml.Events, LoggerEvent{
Time: time.Now(),
Level: LOGGER_CONSOLE,
ml.Events = append(ml.Events, &logutilpb.Event{
Time: TimeToProto(time.Now()),
Level: logutilpb.Level_CONSOLE,
File: file,
Line: line,
Value: fmt.Sprintf(format, v...),
@ -223,7 +205,7 @@ func (ml *MemoryLogger) String() string {
ml.mu.Lock()
defer ml.mu.Unlock()
for _, event := range ml.Events {
event.ToBuffer(buf)
EventToBuffer(event, buf)
buf.WriteByte('\n')
}
return buf.String()
@ -255,6 +237,7 @@ type TeeLogger struct {
One, Two Logger
}
// NewTeeLogger returns a logger that sends its logs to both loggers
func NewTeeLogger(one, two Logger) *TeeLogger {
return &TeeLogger{
One: one,
@ -262,21 +245,25 @@ func NewTeeLogger(one, two Logger) *TeeLogger {
}
}
// Infof is part of the Logger interface
func (tl *TeeLogger) Infof(format string, v ...interface{}) {
tl.One.Infof(format, v...)
tl.Two.Infof(format, v...)
}
// Warningf is part of the Logger interface
func (tl *TeeLogger) Warningf(format string, v ...interface{}) {
tl.One.Warningf(format, v...)
tl.Two.Warningf(format, v...)
}
// Errorf is part of the Logger interface
func (tl *TeeLogger) Errorf(format string, v ...interface{}) {
tl.One.Errorf(format, v...)
tl.Two.Errorf(format, v...)
}
// Printf is part of the Logger interface
func (tl *TeeLogger) Printf(format string, v ...interface{}) {
tl.One.Printf(format, v...)
tl.Two.Printf(format, v...)
@ -308,7 +295,7 @@ func nDigits(buf *bytes.Buffer, n, d int, pad byte) {
}
// someDigits adds a zero-prefixed variable-width integer to buf
func someDigits(buf *bytes.Buffer, d int) {
func someDigits(buf *bytes.Buffer, d int64) {
// Print into the top, then copy down.
tmp := make([]byte, 10)
j := 10
@ -324,7 +311,7 @@ func someDigits(buf *bytes.Buffer, d int) {
}
// fileAndLine returns the caller's file and line 2 levels above
func fileAndLine(depth int) (string, int) {
func fileAndLine(depth int) (string, int64) {
_, file, line, ok := runtime.Caller(depth)
if !ok {
return "???", 1
@ -334,5 +321,5 @@ func fileAndLine(depth int) (string, int) {
if slash >= 0 {
file = file[slash+1:]
}
return file, line
return file, int64(line)
}

Просмотреть файл

@ -3,17 +3,19 @@ package logutil
import (
"testing"
"time"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
func TestLoggerEventFormat(t *testing.T) {
testValues := []struct {
event LoggerEvent
event *logutilpb.Event
expected string
}{
{
event: LoggerEvent{
Time: time.Date(2014, time.November, 10, 23, 30, 12, 123456000, time.UTC),
Level: LOGGER_INFO,
event: &logutilpb.Event{
Time: TimeToProto(time.Date(2014, time.November, 10, 23, 30, 12, 123456000, time.UTC)),
Level: logutilpb.Level_INFO,
File: "file.go",
Line: 123,
Value: "message",
@ -21,9 +23,9 @@ func TestLoggerEventFormat(t *testing.T) {
expected: "I1110 23:30:12.123456 file.go:123] message",
},
{
event: LoggerEvent{
Time: time.Date(2014, time.January, 20, 23, 30, 12, 0, time.UTC),
Level: LOGGER_WARNING,
event: &logutilpb.Event{
Time: TimeToProto(time.Date(2014, time.January, 20, 23, 30, 12, 0, time.UTC)),
Level: logutilpb.Level_WARNING,
File: "file2.go",
Line: 567,
Value: "message %v %v",
@ -31,9 +33,9 @@ func TestLoggerEventFormat(t *testing.T) {
expected: "W0120 23:30:12.000000 file2.go:567] message %v %v",
},
{
event: LoggerEvent{
Time: time.Date(2014, time.January, 20, 23, 30, 12, 0, time.UTC),
Level: LOGGER_CONSOLE,
event: &logutilpb.Event{
Time: TimeToProto(time.Date(2014, time.January, 20, 23, 30, 12, 0, time.UTC)),
Level: logutilpb.Level_CONSOLE,
File: "file2.go",
Line: 567,
Value: "message %v %v",
@ -42,7 +44,7 @@ func TestLoggerEventFormat(t *testing.T) {
},
}
for _, testValue := range testValues {
got := testValue.event.String()
got := EventString(testValue.event)
if testValue.expected != got {
t.Errorf("invalid printing of %v: expected '%v' got '%v'", testValue.event, testValue.expected, got)
}
@ -92,25 +94,25 @@ func TestTeeLogger(t *testing.T) {
if ml.Events[0].Value != "test infof 1 2" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[0].Value)
}
if ml.Events[0].Level != LOGGER_INFO {
if ml.Events[0].Level != logutilpb.Level_INFO {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[0].Level)
}
if ml.Events[1].Value != "test warningf 2 3" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[1].Value)
}
if ml.Events[1].Level != LOGGER_WARNING {
if ml.Events[1].Level != logutilpb.Level_WARNING {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[1].Level)
}
if ml.Events[2].Value != "test errorf 3 4" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[2].Value)
}
if ml.Events[2].Level != LOGGER_ERROR {
if ml.Events[2].Level != logutilpb.Level_ERROR {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[2].Level)
}
if ml.Events[3].Value != "test printf 4 5" {
t.Errorf("Invalid ml%v[0]: %v", i+1, ml.Events[3].Value)
}
if ml.Events[3].Level != LOGGER_CONSOLE {
if ml.Events[3].Level != logutilpb.Level_CONSOLE {
t.Errorf("Invalid ml%v[0].level: %v", i+1, ml.Events[3].Level)
}
}

Просмотреть файл

@ -34,25 +34,3 @@ func TimeToProto(t time.Time) *logutilpb.Time {
Nanoseconds: int32(nanos),
}
}
// LoggerEventToProto converts a LoggerEvent to proto
func LoggerEventToProto(e *LoggerEvent) *logutilpb.Event {
return &logutilpb.Event{
Time: TimeToProto(e.Time),
Level: logutilpb.Level(e.Level),
File: e.File,
Line: int64(e.Line),
Value: e.Value,
}
}
// ProtoToLoggerEvent converts a proto into a LoggerEvent
func ProtoToLoggerEvent(e *logutilpb.Event) *LoggerEvent {
return &LoggerEvent{
Time: ProtoToTime(e.Time),
Level: int(e.Level),
File: e.File,
Line: int(e.Line),
Value: e.Value,
}
}

Просмотреть файл

@ -22,6 +22,7 @@ import (
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
@ -79,7 +80,7 @@ func logStuff(logger logutil.Logger, count int) {
}
}
func compareLoggedStuff(t *testing.T, name string, logChannel <-chan *logutil.LoggerEvent, count int) {
func compareLoggedStuff(t *testing.T, name string, logChannel <-chan *logutilpb.Event, count int) {
for i := 0; i < count; i++ {
le, ok := <-logChannel
if !ok {

Просмотреть файл

@ -15,12 +15,12 @@ import (
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
@ -267,8 +267,8 @@ func (client *FakeTabletManagerClient) PromoteSlave(ctx context.Context, tablet
//
// Backup is part of the tmclient.TabletManagerClient interface
func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutil.LoggerEvent, tmclient.ErrFunc, error) {
logstream := make(chan *logutil.LoggerEvent, 10)
func (client *FakeTabletManagerClient) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, tmclient.ErrFunc, error) {
logstream := make(chan *logutilpb.Event, 10)
return logstream, func() error {
return nil
}, nil

Просмотреть файл

@ -14,13 +14,13 @@ import (
"github.com/youtube/vitess/go/netutil"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
@ -628,13 +628,13 @@ func (client *Client) PromoteSlave(ctx context.Context, tablet *topo.TabletInfo)
//
// Backup is part of the tmclient.TabletManagerClient interface
func (client *Client) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutil.LoggerEvent, tmclient.ErrFunc, error) {
func (client *Client) Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, tmclient.ErrFunc, error) {
cc, c, err := client.dial(ctx, tablet)
if err != nil {
return nil, nil, err
}
logstream := make(chan *logutil.LoggerEvent, 10)
logstream := make(chan *logutilpb.Event, 10)
stream, err := c.Backup(ctx, &tabletmanagerdatapb.BackupRequest{
Concurrency: int64(concurrency),
})
@ -654,7 +654,7 @@ func (client *Client) Backup(ctx context.Context, tablet *topo.TabletInfo, concu
close(logstream)
return
}
logstream <- logutil.ProtoToLoggerEvent(br.Event)
logstream <- br.Event
}
}()
return logstream, func() error {

Просмотреть файл

@ -451,7 +451,7 @@ func (s *server) Backup(request *tabletmanagerdatapb.BackupRequest, stream table
// has been broken. We'll just keep trying
// to send.
stream.Send(&tabletmanagerdatapb.BackupResponse{
Event: logutil.LoggerEventToProto(&e),
Event: e,
})
}

Просмотреть файл

@ -10,12 +10,12 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/hook"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/mysqlctl/tmutils"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
querypb "github.com/youtube/vitess/go/vt/proto/query"
replicationdatapb "github.com/youtube/vitess/go/vt/proto/replicationdata"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
@ -183,7 +183,7 @@ type TabletManagerClient interface {
//
// Backup creates a database backup
Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutil.LoggerEvent, ErrFunc, error)
Backup(ctx context.Context, tablet *topo.TabletInfo, concurrency int) (<-chan *logutilpb.Event, ErrFunc, error)
//
// RPC related methods

Просмотреть файл

@ -10,6 +10,8 @@ import (
"time"
"github.com/youtube/vitess/go/vt/logutil"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// FakeLoggerEventStreamingClient is the base for the fakes for the vtctlclient and vtworkerclient.
@ -50,19 +52,19 @@ func (f *FakeLoggerEventStreamingClient) RegisterResult(args []string, output st
}
// StreamResult returns a channel which streams back a registered result as logging events.
func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *logutil.LoggerEvent, func() error, error) {
func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *logutilpb.Event, func() error, error) {
result, ok := f.results[fromSlice(args)]
if !ok {
return nil, nil, fmt.Errorf("No response was registered for args: %v", args)
}
stream := make(chan *logutil.LoggerEvent)
stream := make(chan *logutilpb.Event)
go func() {
// Each line of the multi-line string "output" is streamed as console text.
for _, line := range strings.Split(result.output, "\n") {
stream <- &logutil.LoggerEvent{
Time: time.Now(),
Level: logutil.LOGGER_CONSOLE,
stream <- &logutilpb.Event{
Time: logutil.TimeToProto(time.Now()),
Level: logutilpb.Level_CONSOLE,
File: "fakevtctlclient",
Line: -1,
Value: line,

Просмотреть файл

@ -8,9 +8,10 @@ package fakevtctlclient
import (
"time"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// FakeVtctlClient is a fake which implements the vtctlclient interface.
@ -31,7 +32,7 @@ func (f *FakeVtctlClient) FakeVtctlClientFactory(addr string, dialTimeout time.D
}
// ExecuteVtctlCommand is part of the vtctlclient interface.
func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutilpb.Event, vtctlclient.ErrFunc, error) {
return f.FakeLoggerEventStreamingClient.StreamResult(args)
}

Просмотреть файл

@ -11,10 +11,11 @@ import (
rpc "github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/vtctl/gorpcproto"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
type goRPCVtctlClient struct {
@ -34,12 +35,12 @@ func goRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclien
// ExecuteVtctlCommand is part of the VtctlClient interface.
// Note the bson rpc version doesn't honor timeouts in the context
// (but the server side will honor the actionTimeout)
func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutilpb.Event, vtctlclient.ErrFunc, error) {
req := &gorpcproto.ExecuteVtctlCommandArgs{
Args: args,
ActionTimeout: actionTimeout,
}
sr := make(chan *logutil.LoggerEvent, 10)
sr := make(chan *logutilpb.Event, 10)
c := client.rpcClient.StreamGo("VtctlServer.ExecuteVtctlCommand", req, sr)
return sr, func() error { return c.Error }, nil
}

Просмотреть файл

@ -44,7 +44,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(ctx context.Context, query *gorpcproto
// we still need to flush and finish the
// command, even if the channel to the client
// has been broken. We'll just keep trying.
sendReply(&e)
sendReply(e)
}
wg.Done()
}()

Просмотреть файл

@ -9,18 +9,18 @@ import (
"io"
"time"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "github.com/youtube/vitess/go/vt/proto/vtctldata"
pbs "github.com/youtube/vitess/go/vt/proto/vtctlservice"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
vtctldatapb "github.com/youtube/vitess/go/vt/proto/vtctldata"
vtctlservicepb "github.com/youtube/vitess/go/vt/proto/vtctlservice"
)
type gRPCVtctlClient struct {
cc *grpc.ClientConn
c pbs.VtctlClient
c vtctlservicepb.VtctlClient
}
func gRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclient.VtctlClient, error) {
@ -29,7 +29,7 @@ func gRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclient
if err != nil {
return nil, err
}
c := pbs.NewVtctlClient(cc)
c := vtctlservicepb.NewVtctlClient(cc)
return &gRPCVtctlClient{
cc: cc,
@ -38,8 +38,8 @@ func gRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclient
}
// ExecuteVtctlCommand is part of the VtctlClient interface
func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
query := &pb.ExecuteVtctlCommandRequest{
func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutilpb.Event, vtctlclient.ErrFunc, error) {
query := &vtctldatapb.ExecuteVtctlCommandRequest{
Args: args,
ActionTimeout: int64(actionTimeout.Nanoseconds()),
}
@ -49,7 +49,7 @@ func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []s
return nil, nil, err
}
results := make(chan *logutil.LoggerEvent, 1)
results := make(chan *logutilpb.Event, 1)
var finalError error
go func() {
for {
@ -61,7 +61,7 @@ func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []s
close(results)
return
}
results <- logutil.ProtoToLoggerEvent(le.Event)
results <- le.Event
}
}()
return results, func() error {

Просмотреть файл

@ -52,7 +52,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(args *pb.ExecuteVtctlCommandRequest, s
// command, even if the channel to the client
// has been broken. We'll just keep trying.
stream.Send(&pb.ExecuteVtctlCommandResponse{
Event: logutil.LoggerEventToProto(&e),
Event: e,
})
}
wg.Done()

Просмотреть файл

@ -11,8 +11,9 @@ import (
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/logutil"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// vtctlClientProtocol specifices which RPC client implementation should be used.
@ -25,7 +26,7 @@ type ErrFunc func() error
type VtctlClient interface {
// ExecuteVtctlCommand will execute the command remotely
// NOTE: ErrFunc should only be checked after the returned channel was closed to avoid races.
ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutil.LoggerEvent, ErrFunc, error)
ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout time.Duration) (<-chan *logutilpb.Event, ErrFunc, error)
// Close will terminate the connection. This object won't be
// used after this.

Просмотреть файл

@ -9,15 +9,17 @@ import (
"fmt"
"time"
"github.com/youtube/vitess/go/vt/logutil"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// RunCommandAndWait executes a single command on a given vtctld and blocks until the command did return or timed out.
// Output from vtctld is streamed as logutil.LoggerEvent messages which have to be consumed by the caller who has to specify a "recv" function.
func RunCommandAndWait(ctx context.Context, server string, args []string, dialTimeout, actionTimeout time.Duration, recv func(*logutil.LoggerEvent)) error {
// Output from vtctld is streamed as logutilpb.Event messages which
// have to be consumed by the caller who has to specify a "recv" function.
func RunCommandAndWait(ctx context.Context, server string, args []string, dialTimeout, actionTimeout time.Duration, recv func(*logutilpb.Event)) error {
if recv == nil {
return errors.New("No function closure for LoggerEvent stream specified")
return errors.New("No function closure for Event stream specified")
}
// create the client
client, err := New(server, dialTimeout)

Просмотреть файл

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtctl/vtctlclient"
@ -64,7 +65,7 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) {
count := 0
for e := range logs {
expected := "cell1-0000000001 test_keyspace <null> master localhost:3333 localhost:3334 [tag: \"value\"]\n"
if e.String() != expected {
if logutil.EventString(e) != expected {
t.Errorf("Got unexpected log line '%v' expected '%v'", e.String(), expected)
}
count++

Просмотреть файл

@ -8,10 +8,12 @@ package fakevtworkerclient
import (
"time"
"github.com/youtube/vitess/go/vt/logutil"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/vtctl/fakevtctlclient"
"github.com/youtube/vitess/go/vt/worker/vtworkerclient"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// FakeVtworkerClient is a fake which implements the vtworkerclient interface.
@ -32,7 +34,7 @@ func (f *FakeVtworkerClient) FakeVtworkerClientFactory(addr string, dialTimeout
}
// ExecuteVtworkerCommand is part of the vtworkerclient interface.
func (f *FakeVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc, error) {
func (f *FakeVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutilpb.Event, vtworkerclient.ErrFunc, error) {
return f.FakeLoggerEventStreamingClient.StreamResult(args)
}

Просмотреть файл

@ -9,18 +9,18 @@ import (
"io"
"time"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/worker/vtworkerclient"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "github.com/youtube/vitess/go/vt/proto/vtworkerdata"
pbs "github.com/youtube/vitess/go/vt/proto/vtworkerservice"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
vtworkerdatapb "github.com/youtube/vitess/go/vt/proto/vtworkerdata"
vtworkerservicepb "github.com/youtube/vitess/go/vt/proto/vtworkerservice"
)
type gRPCVtworkerClient struct {
cc *grpc.ClientConn
c pbs.VtworkerClient
c vtworkerservicepb.VtworkerClient
}
func gRPCVtworkerClientFactory(addr string, dialTimeout time.Duration) (vtworkerclient.VtworkerClient, error) {
@ -29,7 +29,7 @@ func gRPCVtworkerClientFactory(addr string, dialTimeout time.Duration) (vtworker
if err != nil {
return nil, err
}
c := pbs.NewVtworkerClient(cc)
c := vtworkerservicepb.NewVtworkerClient(cc)
return &gRPCVtworkerClient{
cc: cc,
@ -38,8 +38,8 @@ func gRPCVtworkerClientFactory(addr string, dialTimeout time.Duration) (vtworker
}
// ExecuteVtworkerCommand is part of the VtworkerClient interface.
func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc, error) {
query := &pb.ExecuteVtworkerCommandRequest{
func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutilpb.Event, vtworkerclient.ErrFunc, error) {
query := &vtworkerdatapb.ExecuteVtworkerCommandRequest{
Args: args,
}
@ -48,7 +48,7 @@ func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, ar
return nil, nil, err
}
results := make(chan *logutil.LoggerEvent, 1)
results := make(chan *logutilpb.Event, 1)
var finalError error
go func() {
for {
@ -60,13 +60,7 @@ func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, ar
close(results)
return
}
results <- &logutil.LoggerEvent{
Time: time.Unix(le.Event.Time.Seconds, int64(le.Event.Time.Nanoseconds)),
Level: int(le.Event.Level),
File: le.Event.File,
Line: int(le.Event.Line),
Value: le.Event.Value,
}
results <- le.Event
}
}()
return results, func() error {

Просмотреть файл

@ -17,7 +17,6 @@ import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/worker"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
pb "github.com/youtube/vitess/go/vt/proto/vtworkerdata"
pbs "github.com/youtube/vitess/go/vt/proto/vtworkerservice"
)
@ -53,16 +52,7 @@ func (s *VtworkerServer) ExecuteVtworkerCommand(args *pb.ExecuteVtworkerCommandR
// command, even if the channel to the client
// has been broken. We'll just keep trying.
stream.Send(&pb.ExecuteVtworkerCommandResponse{
Event: &logutilpb.Event{
Time: &logutilpb.Time{
Seconds: e.Time.Unix(),
Nanoseconds: int32(e.Time.Nanosecond()),
},
Level: logutilpb.Level(e.Level),
File: e.File,
Line: int64(e.Line),
Value: e.Value,
},
Event: e,
})
}
wg.Done()

Просмотреть файл

@ -12,6 +12,7 @@ import (
"testing"
"time"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/worker"
"github.com/youtube/vitess/go/vt/zktopo"
@ -51,7 +52,7 @@ func commandSucceeds(t *testing.T, client VtworkerClient) {
count := 0
for e := range logs {
expected := "Ping command was called with message: 'pong'.\n"
if e.String() != expected {
if logutil.EventString(e) != expected {
t.Errorf("Got unexpected log line '%v' expected '%v'", e.String(), expected)
}
count++

Просмотреть файл

@ -11,8 +11,9 @@ import (
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/logutil"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// VtworkerClientProtocol specifices which RPC client implementation should be used.
@ -25,7 +26,7 @@ type ErrFunc func() error
type VtworkerClient interface {
// ExecuteVtworkerCommand will execute the command remotely.
// NOTE: ErrFunc should only be checked after the returned channel was closed to avoid races.
ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, ErrFunc, error)
ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutilpb.Event, ErrFunc, error)
// Close will terminate the connection. This object won't be
// used after this.

Просмотреть файл

@ -9,15 +9,17 @@ import (
"fmt"
"time"
"github.com/youtube/vitess/go/vt/logutil"
"golang.org/x/net/context"
logutilpb "github.com/youtube/vitess/go/vt/proto/logutil"
)
// RunCommandAndWait executes a single command on a given vtworker and blocks until the command did return or timed out.
// Output from vtworker is streamed as logutil.LoggerEvent messages which have to be consumed by the caller who has to specify a "recv" function.
func RunCommandAndWait(ctx context.Context, server string, args []string, recv func(*logutil.LoggerEvent)) error {
// Output from vtworker is streamed as logutil.Event messages which
// have to be consumed by the caller who has to specify a "recv" function.
func RunCommandAndWait(ctx context.Context, server string, args []string, recv func(*logutilpb.Event)) error {
if recv == nil {
return errors.New("No function closure for LoggerEvent stream specified")
return errors.New("No function closure for Event stream specified")
}
// create the client
// TODO(mberlin): vtctlclient exposes dialTimeout as flag. If there are no use cases, remove it there as well to be consistent?

Просмотреть файл

@ -4,6 +4,8 @@
"""This is the go rpc client implementation of the vtctl client interface.
"""
import datetime
from net import bsonrpc
from vtctl import vtctl_client
@ -49,6 +51,6 @@ class GoRpcVtctlClient(vtctl_client.VtctlClient):
e = self.client.stream_next()
if e is None:
break
yield vtctl_client.Event(e.reply['Time'], e.reply['Level'],
e.reply['File'], e.reply['Line'],
e.reply['Value'])
t = datetime.datetime.utcfromtimestamp(e.reply['Time']['Seconds'])
yield vtctl_client.Event(t, e.reply['Level'], e.reply['File'],
e.reply['Line'], e.reply['Value'])

Просмотреть файл

@ -106,7 +106,7 @@ class TestVtctld(unittest.TestCase):
def _check_all_tablets(self, result):
lines = result.splitlines()
self.assertEqual(len(lines), len(tablets))
self.assertEqual(len(lines), len(tablets), 'got lines:\n%s' % lines)
line_map = {}
for line in lines:
parts = line.split()