simplify listener and callback registration
make test deterministic and fast
This commit is contained in:
Mike Solomon 2012-07-08 00:08:33 -07:00
Родитель a2837e2224
Коммит d23aadd5b4
3 изменённых файлов: 105 добавлений и 128 удалений

Просмотреть файл

@ -5,11 +5,12 @@
package umgmt
import (
"code.google.com/p/vitess/go/relog"
"errors"
"io"
"net"
"net/rpc"
"code.google.com/p/vitess/go/relog"
)
type Client struct {
@ -43,10 +44,6 @@ func (client *Client) Ping() (string, error) {
relog.Error("rpc err: %v", err)
return "ERROR", err
}
if reply.ErrorCode != 0 {
relog.Error("Ping err: %v %v", reply.ErrorCode, reply.Message)
return "ERROR", errors.New(reply.Message)
}
return reply.Message, nil
}
@ -56,7 +53,7 @@ func (client *Client) CloseListeners() error {
reply := new(Reply)
err := client.Call("UmgmtService.CloseListeners", request, reply)
if err != nil {
relog.Error("rpc err: %v", err)
relog.Error("CloseListeners err: %v", err)
return err
}
if reply.ErrorCode != 0 {
@ -74,9 +71,5 @@ func (client *Client) GracefulShutdown() error {
relog.Error("rpc err: %v", err)
return err
}
if reply.ErrorCode != 0 {
relog.Error("GracefulShutdown err: %v %v", reply.ErrorCode, reply.Message)
return errors.New(reply.Message)
}
return nil
}

Просмотреть файл

@ -11,25 +11,22 @@ file descriptor passing.
The underlying unix socket acts as a guard for starting up a server.
Once that socket has be acquired it is assumed that previously bound sockets will be
released and startup can continue. You end up delegating execution of your server
released and startup can continue. You must delegate execution of your server
initialization to this module via AddStartupCallback().
*/
package umgmt
import (
"code.google.com/p/vitess/go/relog"
"container/list"
"errors"
"fmt"
"io"
"net"
"net/rpc"
"os"
"reflect"
"sync"
"syscall"
"time"
"code.google.com/p/vitess/go/relog"
)
const (
@ -46,47 +43,59 @@ type Reply struct {
Message string
}
type UmgmtListener interface {
Close() error
Addr() net.Addr
}
type UmgmtCallback func()
type UmgmtService struct {
mutex sync.Mutex
// FIXME(msolomon) maybe this is just better as map[*net.Listener] bool
listeners list.List
startupCallbacks list.List
shutdownCallbacks list.List
closeCallbacks []StartupCallback
mutex sync.Mutex
listeners []UmgmtListener
startupCallbacks []UmgmtCallback
shutdownCallbacks []UmgmtCallback
closeCallbacks []UmgmtCallback
done chan bool
}
func newService() *UmgmtService {
return &UmgmtService{
listeners: make([]UmgmtListener, 0, 8),
startupCallbacks: make([]UmgmtCallback, 0, 8),
shutdownCallbacks: make([]UmgmtCallback, 0, 8),
closeCallbacks: make([]UmgmtCallback, 0, 8),
done: make(chan bool, 1)}
}
// FIXME(msolomon) seems like RPC should really be registering an interface and something
// that happens to implement it. This might help client-side type safety too.
// type UmgmtService2 interface {
// Ping(request *Request, reply *Reply) os.Error
// }
func (service *UmgmtService) addListener(l io.Closer) {
func (service *UmgmtService) addListener(l UmgmtListener) {
service.mutex.Lock()
defer service.mutex.Unlock()
service.listeners.PushBack(l)
service.listeners = append(service.listeners, l)
}
type StartupCallback func()
type ShutdownCallback func() error
func (service *UmgmtService) addStartupCallback(f StartupCallback) {
func (service *UmgmtService) addStartupCallback(f UmgmtCallback) {
service.mutex.Lock()
defer service.mutex.Unlock()
service.startupCallbacks.PushBack(f)
service.startupCallbacks = append(service.startupCallbacks, f)
}
func (service *UmgmtService) addCloseCallback(f StartupCallback) {
func (service *UmgmtService) addCloseCallback(f UmgmtCallback) {
service.mutex.Lock()
defer service.mutex.Unlock()
service.closeCallbacks = append(service.closeCallbacks, f)
}
func (service *UmgmtService) addShutdownCallback(f ShutdownCallback) {
func (service *UmgmtService) addShutdownCallback(f UmgmtCallback) {
service.mutex.Lock()
defer service.mutex.Unlock()
service.shutdownCallbacks.PushBack(f)
service.shutdownCallbacks = append(service.shutdownCallbacks, f)
}
func (service *UmgmtService) Ping(request *Request, reply *Reply) error {
@ -98,11 +107,33 @@ func (service *UmgmtService) Ping(request *Request, reply *Reply) error {
func (service *UmgmtService) CloseListeners(request *Request, reply *Reply) (err error) {
// NOTE(msolomon) block this method because we assume that when it returns to the client
// that there is a very high chance that the listeners have actually closed.
// FIXME(msolomon) use normal error handling
closeErr := service.closeListeners()
if closeErr != nil {
reply.ErrorCode = CloseFailed
reply.Message = closeErr.Error()
}
return closeErr
}
func (service *UmgmtService) closeListeners() (err error) {
service.mutex.Lock()
defer service.mutex.Unlock()
for _, l := range service.listeners {
addr := l.Addr()
closeErr := l.Close()
if closeErr != nil {
err := fmt.Errorf("failed to close listener on %v err:%v", addr, closeErr)
// just return that at least one error happened, the log will reveal the rest
relog.Error("%s", err)
}
relog.Info("closed listener %v", addr)
}
for _, f := range service.closeCallbacks {
go f()
}
// Prevent duplicate execution.
service.listeners = service.listeners[:0]
return
}
@ -113,45 +144,15 @@ func (service *UmgmtService) GracefulShutdown(request *Request, reply *Reply) (e
return
}
func (service *UmgmtService) closeListeners() (err error) {
service.mutex.Lock()
defer service.mutex.Unlock()
for e := service.listeners.Front(); e != nil; e = e.Next() {
// NOTE(msolomon) we don't need the whole Listener interface, just Closer
//relog.Info("closeListeners %T %v", _listener, _listener)
if listener, ok := e.Value.(io.Closer); ok {
closeErr := listener.Close()
if closeErr != nil {
errMsg := fmt.Sprintf("failed to close listener:%v err:%v", listener, closeErr)
// just return that at least one error happened, the log will reveal the rest
err = errors.New(errMsg)
relog.Error("%s", errMsg)
}
// FIXME(msolomon) add a meaningful message telling what listener was closed
} else {
relog.Error("bad listener %T %v", listener, listener)
}
}
for _, f := range service.closeCallbacks {
go f()
}
return
}
func (service *UmgmtService) gracefulShutdown() {
service.mutex.Lock()
defer func() { service.done <- true }()
defer service.mutex.Unlock()
for e := service.shutdownCallbacks.Front(); e != nil; e = e.Next() {
if callback, ok := e.Value.(ShutdownCallback); ok {
callbackErr := callback()
if callbackErr != nil {
relog.Error("failed running shutdown callback:%v err:%v", callback, callbackErr)
}
} else {
relog.Error("bad callback %T %v", callback, callback)
}
for _, f := range service.shutdownCallbacks {
f()
}
// Prevent duplicate execution.
service.shutdownCallbacks = service.shutdownCallbacks[:0]
}
func SetLameDuckPeriod(f float32) {
@ -169,6 +170,8 @@ func SigTermHandler(signal os.Signal) {
defaultService.gracefulShutdown()
}
type UmgmtServer struct {
sync.Mutex
quit bool
@ -209,6 +212,10 @@ func (server *UmgmtServer) Serve() error {
return nil
}
func (server *UmgmtServer) Addr() net.Addr {
return server.listener.Addr()
}
func (server *UmgmtServer) Close() (err error) {
server.Lock()
defer server.Unlock()
@ -236,18 +243,12 @@ func (server *UmgmtServer) handleGracefulShutdown() error {
return nil
}
var defaultService UmgmtService
var DefaultServer *UmgmtServer
func init() {
defaultService.done = make(chan bool, 1)
}
var defaultService = newService()
func ListenAndServe(addr string) error {
rpc.Register(&defaultService)
DefaultServer = new(UmgmtServer)
DefaultServer.connMap = make(map[net.Conn]bool)
defer DefaultServer.Close()
rpc.Register(defaultService)
server := &UmgmtServer{connMap: make(map[net.Conn]bool)}
defer server.Close()
var umgmtClient *Client
@ -283,28 +284,24 @@ func ListenAndServe(addr string) error {
return e
}
} else {
DefaultServer.listener = l
server.listener = l
break
}
}
if DefaultServer.listener == nil {
if server.listener == nil {
panic("unable to rebind umgmt socket")
}
// register the umgmt server itself for dropping - this seems like
// the common case. i can't see when you *wouldn't* want to drop yourself
defaultService.addListener(DefaultServer)
defaultService.addShutdownCallback(func() error {
return DefaultServer.handleGracefulShutdown()
defaultService.addListener(server)
defaultService.addShutdownCallback(func() {
server.handleGracefulShutdown()
})
// fire off the startup callbacks. if these bind ports, they should
// call AddListener.
for e := defaultService.startupCallbacks.Front(); e != nil; e = e.Next() {
if startupCallback, ok := e.Value.(StartupCallback); ok {
startupCallback()
} else {
relog.Error("bad callback %T %v", e.Value, e.Value)
}
for _, f := range defaultService.startupCallbacks {
f()
}
if umgmtClient != nil {
@ -314,7 +311,7 @@ func ListenAndServe(addr string) error {
umgmtClient.Close()
}()
}
err := DefaultServer.Serve()
err := server.Serve()
// If we exitted gracefully, wait for the service to finish callbacks.
if err == nil {
<-defaultService.done
@ -322,50 +319,33 @@ func ListenAndServe(addr string) error {
return err
}
func AddListener(listener io.Closer) {
func AddListener(listener UmgmtListener) {
defaultService.addListener(listener)
}
func AddShutdownCallback(f ShutdownCallback) {
func AddShutdownCallback(f UmgmtCallback) {
defaultService.addShutdownCallback(f)
}
func AddStartupCallback(f StartupCallback) {
func AddStartupCallback(f UmgmtCallback) {
defaultService.addStartupCallback(f)
}
func AddCloseCallback(f StartupCallback) {
func AddCloseCallback(f UmgmtCallback) {
defaultService.addCloseCallback(f)
}
type WrappedError interface {
Err() error
}
// this is a temporary hack around a few different ways of wrapping
// error codes coming out of the system libraries
func checkError(err, testErr error) bool {
//relog.Error("checkError %T(%v) == %T(%v)", err, err, testErr, testErr)
if wrappedError, ok := err.(WrappedError); ok {
return checkError(wrappedError.Err(), testErr)
if err == testErr {
return true
}
errVal := getField(err, "Err")
if errVal != nil {
if osErr, ok := errVal.(error); ok {
return checkError(osErr, testErr)
}
}
return err == testErr
}
func getField(o interface{}, fieldName string) interface{} {
val := reflect.Indirect(reflect.ValueOf(o))
if val.Kind() == reflect.Struct {
fieldVal := reflect.Indirect(val.FieldByName(fieldName))
if !fieldVal.IsValid() {
return nil
}
return fieldVal.Interface()
if opErr, ok := err.(*net.OpError); ok {
return opErr.Err == testErr
}
return nil
return false
}

Просмотреть файл

@ -5,46 +5,50 @@
package umgmt
import (
"code.google.com/p/vitess/go/relog"
"testing"
"time"
)
func serve() {
AddShutdownCallback(ShutdownCallback(func() error { relog.Error("testserver GracefulShutdown callback"); return nil }))
var ready = make(chan bool)
func serve(t *testing.T) {
AddStartupCallback(func() { ready <- true })
AddShutdownCallback(func() { t.Log("test server GracefulShutdown callback") })
err := ListenAndServe("/tmp/test-sock")
if err != nil {
relog.Fatal("listen err:%v", err)
t.Fatalf("listen err: %v", err)
}
relog.Info("test server finished")
t.Log("test server finished")
}
func TestUmgmt(t *testing.T) {
go serve()
time.Sleep(1e9)
go serve(t)
<-ready
client, err := Dial("/tmp/test-sock")
if err != nil {
t.Fatalf("can't connect %v", err)
}
request := new(Request)
reply := new(Reply)
reply := new(Reply)
callErr := client.Call("UmgmtService.Ping", request, reply)
if callErr != nil {
t.Fatalf("callErr: %v", callErr)
}
relog.Info("reply: %v", reply.Message)
t.Logf("Ping reply: %v", reply.Message)
reply = new(Reply)
callErr = client.Call("UmgmtService.CloseListeners", reply, reply)
if callErr != nil {
t.Fatalf("callErr: %v", callErr)
}
relog.Info("reply: %v", reply.Message)
time.Sleep(5e9)
t.Logf("CloseListeners reply: %v", reply.Message)
reply = new(Reply)
callErr = client.Call("UmgmtService.GracefulShutdown", reply, reply)
if callErr != nil {
t.Fatalf("callErr: %v", callErr)
}
relog.Info("reply: %v", reply.Message)
t.Logf("GracefulShutdown reply: %v", reply.Message)
}