Add ServiceContext to isolate internal and external ServiceManager APIs.

This commit is contained in:
Anthony Yeh 2014-08-13 14:21:46 -07:00
Родитель 1414dc22f2
Коммит 6b8d690a7c
9 изменённых файлов: 97 добавлений и 82 удалений

Просмотреть файл

@ -39,14 +39,14 @@ func TestHTTP(t *testing.T) {
lastValue := sync2.AtomicString{}
svm := sync2.ServiceManager{}
svm.Go(func(_ *sync2.ServiceManager) {
svm.Go(func(svc *sync2.ServiceContext) {
resp, err := http.Get(fmt.Sprintf("http://%s/log", addr))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
buf := make([]byte, 100)
for svm.State() == sync2.SERVICE_RUNNING {
for svc.IsRunning() {
n, err := resp.Body.Read(buf)
if err != nil {
t.Fatal(err)
@ -91,10 +91,10 @@ func TestChannel(t *testing.T) {
lastValue := sync2.AtomicString{}
svm := sync2.ServiceManager{}
svm.Go(func(_ *sync2.ServiceManager) {
svm.Go(func(svc *sync2.ServiceContext) {
ch := logger.Subscribe()
defer logger.Unsubscribe(ch)
for svm.State() == sync2.SERVICE_RUNNING {
for svc.IsRunning() {
lastValue.Set((<-ch).(*logMessage).Format(nil))
}
})

Просмотреть файл

@ -32,13 +32,17 @@ type ServiceManager struct {
}
// Go tries to change the state from SERVICE_STOPPED to SERVICE_RUNNING.
// If the current state is not SERVICE_STOPPED (already running),
// it returns false immediately.
// On successful transition, it launches the service as a goroutine and returns true.
// The service func is required to regularly check the state of the service manager.
// If the state is not SERVICE_RUNNING, it must treat it as end of service and return.
//
// If the current state is not SERVICE_STOPPED (already running), it returns
// false immediately.
//
// On successful transition, it launches the service as a goroutine and returns
// true. The service function is responsible for returning on its own when
// requested, either by regularly checking svc.IsRunning(), or by waiting for
// the svc.ShuttingDown channel to be closed.
//
// When the service func returns, the state is reverted to SERVICE_STOPPED.
func (svm *ServiceManager) Go(service func(svm *ServiceManager)) bool {
func (svm *ServiceManager) Go(service func(svc *ServiceContext)) bool {
svm.mu.Lock()
defer svm.mu.Unlock()
if !svm.state.CompareAndSwap(SERVICE_STOPPED, SERVICE_RUNNING) {
@ -47,7 +51,7 @@ func (svm *ServiceManager) Go(service func(svm *ServiceManager)) bool {
svm.wg.Add(1)
svm.shutdown = make(chan struct{})
go func() {
service(svm)
service(&ServiceContext{ShuttingDown: svm.shutdown})
svm.state.Set(SERVICE_STOPPED)
svm.wg.Done()
}()
@ -57,7 +61,7 @@ func (svm *ServiceManager) Go(service func(svm *ServiceManager)) bool {
// Stop tries to change the state from SERVICE_RUNNING to SERVICE_SHUTTING_DOWN.
// If the current state is not SERVICE_RUNNING, it returns false immediately.
// On successul transition, it waits for the service to finish, and returns true.
// You are allowed to 'Go' again after a Stop.
// You are allowed to Go() again after a Stop().
func (svm *ServiceManager) Stop() bool {
svm.mu.Lock()
defer svm.mu.Unlock()
@ -66,23 +70,11 @@ func (svm *ServiceManager) Stop() bool {
}
// Signal the service that we've transitioned to SERVICE_SHUTTING_DOWN.
close(svm.shutdown)
svm.wg.Wait()
svm.shutdown = nil
svm.wg.Wait()
return true
}
// ShuttingDown returns a channel that the service can select on to be notified
// when it should shut down. The channel is closed when the state transitions
// from SERVICE_RUNNING to SERVICE_SHUTTING_DOWN.
func (svm *ServiceManager) ShuttingDown() chan struct{} {
return svm.shutdown
}
// IsRunning returns true if the state is SERVICE_RUNNING.
func (svm *ServiceManager) IsRunning() bool {
return svm.state.Get() == SERVICE_RUNNING
}
// Wait waits for the service to terminate if it's currently running.
func (svm *ServiceManager) Wait() {
svm.wg.Wait()
@ -98,3 +90,23 @@ func (svm *ServiceManager) State() int64 {
func (svm *ServiceManager) StateName() string {
return stateNames[svm.State()]
}
// ServiceContext is passed into the service function to give it access to
// information about the running service.
type ServiceContext struct {
// ShuttingDown is a channel that the service can select on to be notified
// when it should shut down. The channel is closed when the state transitions
// from SERVICE_RUNNING to SERVICE_SHUTTING_DOWN.
ShuttingDown chan struct{}
}
// IsRunning returns true if the ServiceContext.ShuttingDown channel has not
// been closed yet.
func (svc *ServiceContext) IsRunning() bool {
select {
case <-svc.ShuttingDown:
return false
default:
return true
}
}

Просмотреть файл

@ -14,11 +14,11 @@ type testService struct {
t *testing.T
}
func (ts *testService) service(svm *ServiceManager) {
func (ts *testService) service(svc *ServiceContext) {
if !ts.activated.CompareAndSwap(0, 1) {
ts.t.Fatalf("service called more than once")
}
for svm.IsRunning() {
for svc.IsRunning() {
time.Sleep(10 * time.Millisecond)
}
@ -27,16 +27,16 @@ func (ts *testService) service(svm *ServiceManager) {
}
}
func (ts *testService) selectService(svm *ServiceManager) {
func (ts *testService) selectService(svc *ServiceContext) {
if !ts.activated.CompareAndSwap(0, 1) {
ts.t.Fatalf("service called more than once")
}
serviceLoop:
for svm.IsRunning() {
for svc.IsRunning() {
select {
case <-time.After(1 * time.Second):
ts.t.Errorf("service didn't stop when shutdown channel was closed")
case <-svm.ShuttingDown():
case <-svc.ShuttingDown:
break serviceLoop
}
}

Просмотреть файл

@ -47,11 +47,11 @@ func (bls *binlogConnStreamer) Stream(gtid myproto.GTID, sendTransaction sendTra
bls.startPos = gtid
// Launch using service manager so we can stop this as needed.
bls.svm.Go(func(svm *sync2.ServiceManager) {
bls.svm.Go(func(svc *sync2.ServiceContext) {
var events <-chan proto.BinlogEvent
// Keep reconnecting and restarting stream unless we've been told to stop.
for svm.IsRunning() {
for svc.IsRunning() {
if bls.conn, err = mysqlctl.NewSlaveConnection(bls.mysqld); err != nil {
return
}
@ -60,7 +60,9 @@ func (bls *binlogConnStreamer) Stream(gtid myproto.GTID, sendTransaction sendTra
bls.conn.Close()
return
}
if err = bls.parseEvents(events, sendTransaction); err != nil {
// parseEvents will loop until the events channel is closed, the
// service enters the SHUTTING_DOWN state, or an error occurs.
if err = bls.parseEvents(svc, events, sendTransaction); err != nil {
bls.conn.Close()
return
}
@ -87,8 +89,9 @@ func (bls *binlogConnStreamer) Stop() {
}
// parseEvents processes the raw binlog dump stream from the server, one event
// at a time, and groups them into transactions.
func (bls *binlogConnStreamer) parseEvents(events <-chan proto.BinlogEvent, sendTransaction sendTransactionFunc) (err error) {
// at a time, and groups them into transactions. It is called from within the
// service function launched by Stream().
func (bls *binlogConnStreamer) parseEvents(svc *sync2.ServiceContext, events <-chan proto.BinlogEvent, sendTransaction sendTransactionFunc) (err error) {
var statements []proto.Statement
var timestamp int64
var format proto.BinlogFormat
@ -112,7 +115,7 @@ func (bls *binlogConnStreamer) parseEvents(events <-chan proto.BinlogEvent, send
}
// Parse events.
for bls.svm.IsRunning() {
for svc.IsRunning() {
var ev proto.BinlogEvent
var ok bool
@ -122,7 +125,7 @@ func (bls *binlogConnStreamer) parseEvents(events <-chan proto.BinlogEvent, send
log.Infof("reached end of binlog event stream")
return nil
}
case <-bls.svm.ShuttingDown():
case <-svc.ShuttingDown:
log.Infof("stopping early due to BinlogStreamer service shutdown")
return nil
}

Просмотреть файл

@ -68,8 +68,8 @@ func TestBinlogConnStreamerParseEventsXID(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -111,8 +111,8 @@ func TestBinlogConnStreamerParseEventsCommit(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -133,8 +133,8 @@ func TestBinlogConnStreamerStop(t *testing.T) {
}
// Start parseEvents(), but don't send it anything, so it just waits.
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -171,8 +171,8 @@ func TestBinlogConnStreamerParseEventsSendEOF(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return
@ -202,8 +202,8 @@ func TestBinlogConnStreamerParseEventsSendErrorXID(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return
@ -233,8 +233,8 @@ func TestBinlogConnStreamerParseEventsSendErrorCommit(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return
@ -266,8 +266,8 @@ func TestBinlogConnStreamerParseEventsInvalid(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return
@ -301,8 +301,8 @@ func TestBinlogConnStreamerParseEventsInvalidFormat(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return
@ -332,8 +332,8 @@ func TestBinlogConnStreamerParseEventsNoFormat(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return
@ -367,8 +367,8 @@ func TestBinlogConnStreamerParseEventsInvalidQuery(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return
@ -414,8 +414,8 @@ func TestBinlogConnStreamerParseEventsRollback(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -467,8 +467,8 @@ func TestBinlogConnStreamerParseEventsCreate(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -512,8 +512,8 @@ func TestBinlogConnStreamerParseEventsSetInsertID(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -548,8 +548,8 @@ func TestBinlogConnStreamerParseEventsInvalidIntVar(t *testing.T) {
}
go sendTestEvents(events, input)
bls.svm.Go(func(svm *sync2.ServiceManager) {
err := bls.parseEvents(events, sendTransaction)
bls.svm.Go(func(svc *sync2.ServiceContext) {
err := bls.parseEvents(svc, events, sendTransaction)
if err == nil {
t.Errorf("expected error, got none")
return

Просмотреть файл

@ -95,12 +95,12 @@ func (bls *binlogFileStreamer) streamFilePos(file string, pos int64, sendTransac
defer bls.file.Close()
// Launch using service manager so we can stop this as needed.
bls.svm.Go(func(_ *sync2.ServiceManager) {
bls.svm.Go(func(svc *sync2.ServiceContext) {
for {
if err = bls.run(sendTransaction); err != nil {
if err = bls.run(svc, sendTransaction); err != nil {
return
}
if err = bls.file.WaitForChange(&bls.svm); err != nil {
if err = bls.file.WaitForChange(svc); err != nil {
return
}
}
@ -134,14 +134,14 @@ func (bls *binlogFileStreamer) Stop() {
// run launches mysqlbinlog and starts the stream. It takes care of
// cleaning up the process when streaming returns.
func (bls *binlogFileStreamer) run(sendTransaction sendTransactionFunc) (err error) {
func (bls *binlogFileStreamer) run(svc *sync2.ServiceContext, sendTransaction sendTransactionFunc) (err error) {
mbl := &MysqlBinlog{}
reader, err := mbl.Launch(bls.dbname, bls.file.name, bls.file.pos)
if err != nil {
return fmt.Errorf("launch error: %v", err)
}
defer reader.Close()
err = bls.parseEvents(sendTransaction, reader)
err = bls.parseEvents(svc, sendTransaction, reader)
// Always kill because we don't read from reader all the way to EOF.
// If we wait, we may deadlock.
mbl.Kill()
@ -149,13 +149,13 @@ func (bls *binlogFileStreamer) run(sendTransaction sendTransactionFunc) (err err
}
// parseEvents parses events and transmits them as transactions for the current mysqlbinlog stream.
func (bls *binlogFileStreamer) parseEvents(sendTransaction sendTransactionFunc, reader io.Reader) (err error) {
func (bls *binlogFileStreamer) parseEvents(svc *sync2.ServiceContext, sendTransaction sendTransactionFunc, reader io.Reader) (err error) {
bls.delim = DEFAULT_DELIM
bufReader := bufio.NewReader(reader)
var statements []proto.Statement
var timestamp int64
for {
sql, err := bls.nextStatement(bufReader)
sql, err := bls.nextStatement(svc, bufReader)
if sql == nil {
return err
}
@ -203,11 +203,11 @@ func (bls *binlogFileStreamer) parseEvents(sendTransaction sendTransactionFunc,
// positional comments, it updates the binlogFileStreamer state. It also ignores events that
// are not material. If it returns nil, it's the end of stream. If err is also nil, then
// it was due to a normal termination.
func (bls *binlogFileStreamer) nextStatement(bufReader *bufio.Reader) (stmt []byte, err error) {
func (bls *binlogFileStreamer) nextStatement(svc *sync2.ServiceContext, bufReader *bufio.Reader) (stmt []byte, err error) {
eventLoop:
for {
// Stop processing if we're shutting down
if bls.svm.State() != sync2.SERVICE_RUNNING {
if !svc.IsRunning() {
return nil, io.EOF
}
event, err := bls.readEvent(bufReader)
@ -345,10 +345,10 @@ func (f *fileInfo) Save() {
}
}
func (f *fileInfo) WaitForChange(svm *sync2.ServiceManager) error {
func (f *fileInfo) WaitForChange(svc *sync2.ServiceContext) error {
for {
// Stop waiting if we're shutting down
if svm.State() != sync2.SERVICE_RUNNING {
if !svc.IsRunning() {
return io.EOF
}
time.Sleep(100 * time.Millisecond)

Просмотреть файл

@ -85,9 +85,9 @@ func TestFileInfo(t *testing.T) {
}
ch := make(chan []byte, 10)
var svm = sync2.ServiceManager{}
svm.Go(func(_ *sync2.ServiceManager) {
svm.Go(func(svc *sync2.ServiceContext) {
for svm.State() == sync2.SERVICE_RUNNING {
file.WaitForChange(&svm)
file.WaitForChange(svc)
b := make([]byte, 128)
n, err := file.handle.Read(b)
if err != nil {

Просмотреть файл

@ -74,10 +74,10 @@ func (sc *SlaveConnection) StartBinlogDump(startPos proto.GTID) (<-chan blproto.
eventChan := make(chan blproto.BinlogEvent)
// Start reading events.
sc.svm.Go(func(svm *sync2.ServiceManager) {
sc.svm.Go(func(svc *sync2.ServiceContext) {
defer close(eventChan)
for svm.IsRunning() {
for svc.IsRunning() {
buf, err := sc.Connection.ReadPacket()
if err != nil || len(buf) == 0 {
// This is not necessarily an error. It could just be that we closed
@ -95,7 +95,7 @@ func (sc *SlaveConnection) StartBinlogDump(startPos proto.GTID) (<-chan blproto.
select {
// Skip the first byte because it's only used for signaling EOF.
case eventChan <- sc.mysqld.flavor.MakeBinlogEvent(buf[1:]):
case <-svm.ShuttingDown():
case <-svc.ShuttingDown:
return
}
}

Просмотреть файл

@ -79,7 +79,7 @@ func (rci *RowcacheInvalidator) Open(dbname string, mysqld *mysqlctl.Mysqld) {
panic(NewTabletError(FATAL, "Rowcache invalidator aborting: binlog path not specified"))
}
ok := rci.svm.Go(func(_ *sync2.ServiceManager) {
ok := rci.svm.Go(func(_ *sync2.ServiceContext) {
rci.mu.Lock()
rci.dbname = dbname
rci.mysqld = mysqld