diff --git a/go/umgmt/client.go b/go/umgmt/client.go index 2136464c89..851b3f8642 100644 --- a/go/umgmt/client.go +++ b/go/umgmt/client.go @@ -5,11 +5,12 @@ package umgmt import ( - "code.google.com/p/vitess/go/relog" "errors" "io" "net" "net/rpc" + + "code.google.com/p/vitess/go/relog" ) type Client struct { @@ -43,10 +44,6 @@ func (client *Client) Ping() (string, error) { relog.Error("rpc err: %v", err) return "ERROR", err } - if reply.ErrorCode != 0 { - relog.Error("Ping err: %v %v", reply.ErrorCode, reply.Message) - return "ERROR", errors.New(reply.Message) - } return reply.Message, nil } @@ -56,7 +53,7 @@ func (client *Client) CloseListeners() error { reply := new(Reply) err := client.Call("UmgmtService.CloseListeners", request, reply) if err != nil { - relog.Error("rpc err: %v", err) + relog.Error("CloseListeners err: %v", err) return err } if reply.ErrorCode != 0 { @@ -74,9 +71,5 @@ func (client *Client) GracefulShutdown() error { relog.Error("rpc err: %v", err) return err } - if reply.ErrorCode != 0 { - relog.Error("GracefulShutdown err: %v %v", reply.ErrorCode, reply.Message) - return errors.New(reply.Message) - } return nil } diff --git a/go/umgmt/server.go b/go/umgmt/server.go index 0dac2ca35b..a09cb1226c 100644 --- a/go/umgmt/server.go +++ b/go/umgmt/server.go @@ -11,25 +11,22 @@ file descriptor passing. The underlying unix socket acts as a guard for starting up a server. Once that socket has be acquired it is assumed that previously bound sockets will be -released and startup can continue. You end up delegating execution of your server +released and startup can continue. You must delegate execution of your server initialization to this module via AddStartupCallback(). */ package umgmt import ( - "code.google.com/p/vitess/go/relog" - "container/list" - "errors" "fmt" - "io" "net" "net/rpc" "os" - "reflect" "sync" "syscall" "time" + + "code.google.com/p/vitess/go/relog" ) const ( @@ -46,47 +43,59 @@ type Reply struct { Message string } +type UmgmtListener interface { + Close() error + Addr() net.Addr +} + +type UmgmtCallback func() + type UmgmtService struct { - mutex sync.Mutex - // FIXME(msolomon) maybe this is just better as map[*net.Listener] bool - listeners list.List - startupCallbacks list.List - shutdownCallbacks list.List - closeCallbacks []StartupCallback + mutex sync.Mutex + listeners []UmgmtListener + startupCallbacks []UmgmtCallback + shutdownCallbacks []UmgmtCallback + closeCallbacks []UmgmtCallback done chan bool } +func newService() *UmgmtService { + return &UmgmtService{ + listeners: make([]UmgmtListener, 0, 8), + startupCallbacks: make([]UmgmtCallback, 0, 8), + shutdownCallbacks: make([]UmgmtCallback, 0, 8), + closeCallbacks: make([]UmgmtCallback, 0, 8), + done: make(chan bool, 1)} +} + // FIXME(msolomon) seems like RPC should really be registering an interface and something // that happens to implement it. This might help client-side type safety too. // type UmgmtService2 interface { // Ping(request *Request, reply *Reply) os.Error // } -func (service *UmgmtService) addListener(l io.Closer) { +func (service *UmgmtService) addListener(l UmgmtListener) { service.mutex.Lock() defer service.mutex.Unlock() - service.listeners.PushBack(l) + service.listeners = append(service.listeners, l) } -type StartupCallback func() -type ShutdownCallback func() error - -func (service *UmgmtService) addStartupCallback(f StartupCallback) { +func (service *UmgmtService) addStartupCallback(f UmgmtCallback) { service.mutex.Lock() defer service.mutex.Unlock() - service.startupCallbacks.PushBack(f) + service.startupCallbacks = append(service.startupCallbacks, f) } -func (service *UmgmtService) addCloseCallback(f StartupCallback) { +func (service *UmgmtService) addCloseCallback(f UmgmtCallback) { service.mutex.Lock() defer service.mutex.Unlock() service.closeCallbacks = append(service.closeCallbacks, f) } -func (service *UmgmtService) addShutdownCallback(f ShutdownCallback) { +func (service *UmgmtService) addShutdownCallback(f UmgmtCallback) { service.mutex.Lock() defer service.mutex.Unlock() - service.shutdownCallbacks.PushBack(f) + service.shutdownCallbacks = append(service.shutdownCallbacks, f) } func (service *UmgmtService) Ping(request *Request, reply *Reply) error { @@ -98,11 +107,33 @@ func (service *UmgmtService) Ping(request *Request, reply *Reply) error { func (service *UmgmtService) CloseListeners(request *Request, reply *Reply) (err error) { // NOTE(msolomon) block this method because we assume that when it returns to the client // that there is a very high chance that the listeners have actually closed. + // FIXME(msolomon) use normal error handling closeErr := service.closeListeners() if closeErr != nil { reply.ErrorCode = CloseFailed reply.Message = closeErr.Error() } + return closeErr +} + +func (service *UmgmtService) closeListeners() (err error) { + service.mutex.Lock() + defer service.mutex.Unlock() + for _, l := range service.listeners { + addr := l.Addr() + closeErr := l.Close() + if closeErr != nil { + err := fmt.Errorf("failed to close listener on %v err:%v", addr, closeErr) + // just return that at least one error happened, the log will reveal the rest + relog.Error("%s", err) + } + relog.Info("closed listener %v", addr) + } + for _, f := range service.closeCallbacks { + go f() + } + // Prevent duplicate execution. + service.listeners = service.listeners[:0] return } @@ -113,45 +144,15 @@ func (service *UmgmtService) GracefulShutdown(request *Request, reply *Reply) (e return } -func (service *UmgmtService) closeListeners() (err error) { - service.mutex.Lock() - defer service.mutex.Unlock() - for e := service.listeners.Front(); e != nil; e = e.Next() { - // NOTE(msolomon) we don't need the whole Listener interface, just Closer - //relog.Info("closeListeners %T %v", _listener, _listener) - if listener, ok := e.Value.(io.Closer); ok { - closeErr := listener.Close() - if closeErr != nil { - errMsg := fmt.Sprintf("failed to close listener:%v err:%v", listener, closeErr) - // just return that at least one error happened, the log will reveal the rest - err = errors.New(errMsg) - relog.Error("%s", errMsg) - } - // FIXME(msolomon) add a meaningful message telling what listener was closed - } else { - relog.Error("bad listener %T %v", listener, listener) - } - } - for _, f := range service.closeCallbacks { - go f() - } - return -} - func (service *UmgmtService) gracefulShutdown() { service.mutex.Lock() defer func() { service.done <- true }() defer service.mutex.Unlock() - for e := service.shutdownCallbacks.Front(); e != nil; e = e.Next() { - if callback, ok := e.Value.(ShutdownCallback); ok { - callbackErr := callback() - if callbackErr != nil { - relog.Error("failed running shutdown callback:%v err:%v", callback, callbackErr) - } - } else { - relog.Error("bad callback %T %v", callback, callback) - } + for _, f := range service.shutdownCallbacks { + f() } + // Prevent duplicate execution. + service.shutdownCallbacks = service.shutdownCallbacks[:0] } func SetLameDuckPeriod(f float32) { @@ -169,6 +170,8 @@ func SigTermHandler(signal os.Signal) { defaultService.gracefulShutdown() } + + type UmgmtServer struct { sync.Mutex quit bool @@ -209,6 +212,10 @@ func (server *UmgmtServer) Serve() error { return nil } +func (server *UmgmtServer) Addr() net.Addr { + return server.listener.Addr() +} + func (server *UmgmtServer) Close() (err error) { server.Lock() defer server.Unlock() @@ -236,18 +243,12 @@ func (server *UmgmtServer) handleGracefulShutdown() error { return nil } -var defaultService UmgmtService -var DefaultServer *UmgmtServer - -func init() { - defaultService.done = make(chan bool, 1) -} +var defaultService = newService() func ListenAndServe(addr string) error { - rpc.Register(&defaultService) - DefaultServer = new(UmgmtServer) - DefaultServer.connMap = make(map[net.Conn]bool) - defer DefaultServer.Close() + rpc.Register(defaultService) + server := &UmgmtServer{connMap: make(map[net.Conn]bool)} + defer server.Close() var umgmtClient *Client @@ -283,28 +284,24 @@ func ListenAndServe(addr string) error { return e } } else { - DefaultServer.listener = l + server.listener = l break } } - if DefaultServer.listener == nil { + if server.listener == nil { panic("unable to rebind umgmt socket") } // register the umgmt server itself for dropping - this seems like // the common case. i can't see when you *wouldn't* want to drop yourself - defaultService.addListener(DefaultServer) - defaultService.addShutdownCallback(func() error { - return DefaultServer.handleGracefulShutdown() + defaultService.addListener(server) + defaultService.addShutdownCallback(func() { + server.handleGracefulShutdown() }) // fire off the startup callbacks. if these bind ports, they should // call AddListener. - for e := defaultService.startupCallbacks.Front(); e != nil; e = e.Next() { - if startupCallback, ok := e.Value.(StartupCallback); ok { - startupCallback() - } else { - relog.Error("bad callback %T %v", e.Value, e.Value) - } + for _, f := range defaultService.startupCallbacks { + f() } if umgmtClient != nil { @@ -314,7 +311,7 @@ func ListenAndServe(addr string) error { umgmtClient.Close() }() } - err := DefaultServer.Serve() + err := server.Serve() // If we exitted gracefully, wait for the service to finish callbacks. if err == nil { <-defaultService.done @@ -322,50 +319,33 @@ func ListenAndServe(addr string) error { return err } -func AddListener(listener io.Closer) { +func AddListener(listener UmgmtListener) { defaultService.addListener(listener) } -func AddShutdownCallback(f ShutdownCallback) { +func AddShutdownCallback(f UmgmtCallback) { defaultService.addShutdownCallback(f) } -func AddStartupCallback(f StartupCallback) { +func AddStartupCallback(f UmgmtCallback) { defaultService.addStartupCallback(f) } -func AddCloseCallback(f StartupCallback) { +func AddCloseCallback(f UmgmtCallback) { defaultService.addCloseCallback(f) } -type WrappedError interface { - Err() error -} - // this is a temporary hack around a few different ways of wrapping // error codes coming out of the system libraries func checkError(err, testErr error) bool { //relog.Error("checkError %T(%v) == %T(%v)", err, err, testErr, testErr) - if wrappedError, ok := err.(WrappedError); ok { - return checkError(wrappedError.Err(), testErr) + if err == testErr { + return true } - errVal := getField(err, "Err") - if errVal != nil { - if osErr, ok := errVal.(error); ok { - return checkError(osErr, testErr) - } - } - return err == testErr -} -func getField(o interface{}, fieldName string) interface{} { - val := reflect.Indirect(reflect.ValueOf(o)) - if val.Kind() == reflect.Struct { - fieldVal := reflect.Indirect(val.FieldByName(fieldName)) - if !fieldVal.IsValid() { - return nil - } - return fieldVal.Interface() + if opErr, ok := err.(*net.OpError); ok { + return opErr.Err == testErr } - return nil + + return false } diff --git a/go/umgmt/umgmt_test.go b/go/umgmt/umgmt_test.go index 9c18ed4d25..6f2464e9af 100644 --- a/go/umgmt/umgmt_test.go +++ b/go/umgmt/umgmt_test.go @@ -5,46 +5,50 @@ package umgmt import ( - "code.google.com/p/vitess/go/relog" "testing" - "time" ) -func serve() { - AddShutdownCallback(ShutdownCallback(func() error { relog.Error("testserver GracefulShutdown callback"); return nil })) +var ready = make(chan bool) + +func serve(t *testing.T) { + AddStartupCallback(func() { ready <- true }) + AddShutdownCallback(func() { t.Log("test server GracefulShutdown callback") }) err := ListenAndServe("/tmp/test-sock") if err != nil { - relog.Fatal("listen err:%v", err) + t.Fatalf("listen err: %v", err) } - relog.Info("test server finished") + t.Log("test server finished") } func TestUmgmt(t *testing.T) { - go serve() - time.Sleep(1e9) + go serve(t) + <-ready client, err := Dial("/tmp/test-sock") if err != nil { t.Fatalf("can't connect %v", err) } request := new(Request) - reply := new(Reply) + reply := new(Reply) callErr := client.Call("UmgmtService.Ping", request, reply) if callErr != nil { t.Fatalf("callErr: %v", callErr) } - relog.Info("reply: %v", reply.Message) + t.Logf("Ping reply: %v", reply.Message) + reply = new(Reply) callErr = client.Call("UmgmtService.CloseListeners", reply, reply) if callErr != nil { t.Fatalf("callErr: %v", callErr) } - relog.Info("reply: %v", reply.Message) - time.Sleep(5e9) + t.Logf("CloseListeners reply: %v", reply.Message) + + + reply = new(Reply) callErr = client.Call("UmgmtService.GracefulShutdown", reply, reply) if callErr != nil { t.Fatalf("callErr: %v", callErr) } - relog.Info("reply: %v", reply.Message) + t.Logf("GracefulShutdown reply: %v", reply.Message) }