diff --git a/builtins/builtins.go b/builtins/builtins.go index 9379c67c88..75f9d4ce34 100644 --- a/builtins/builtins.go +++ b/builtins/builtins.go @@ -8,6 +8,7 @@ import ( "github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/dockerversion" "github.com/docker/docker/engine" + "github.com/docker/docker/events" "github.com/docker/docker/pkg/parsers/kernel" "github.com/docker/docker/registry" "github.com/docker/docker/server" @@ -20,6 +21,9 @@ func Register(eng *engine.Engine) error { if err := remote(eng); err != nil { return err } + if err := events.New().Install(eng); err != nil { + return err + } if err := eng.Register("version", dockerVersion); err != nil { return err } diff --git a/daemon/container.go b/daemon/container.go index d62eb6d2fa..bd97f00f88 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -168,6 +168,13 @@ func (container *Container) WriteHostConfig() error { return ioutil.WriteFile(pth, data, 0666) } +func (container *Container) LogEvent(action string) { + d := container.daemon + if err := d.eng.Job("log_event", action, container.ID, d.Repositories().ImageName(container.Image)).Run(); err != nil { + utils.Errorf("Error running container: %s", err) + } +} + func (container *Container) getResourcePath(path string) (string, error) { cleanPath := filepath.Join("/", path) return symlink.FollowSymlinkInScope(filepath.Join(container.basefs, cleanPath), container.basefs) @@ -508,7 +515,7 @@ func (container *Container) monitor(callback execdriver.StartCallback) error { container.stdin, container.stdinPipe = io.Pipe() } if container.daemon != nil && container.daemon.srv != nil { - container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image)) + container.LogEvent("die") } if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() { // FIXME: here is race condition between two RUN instructions in Dockerfile diff --git a/daemon/create.go b/daemon/create.go index 8d008f8ade..c3aa9ee58e 100644 --- a/daemon/create.go +++ b/daemon/create.go @@ -40,7 +40,7 @@ func (daemon *Daemon) ContainerCreate(job *engine.Job) engine.Status { if !container.Config.NetworkDisabled && daemon.SystemConfig().IPv4ForwardingDisabled { job.Errorf("IPv4 forwarding is disabled.\n") } - job.Eng.Job("log", "create", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("create") // FIXME: this is necessary because daemon.Create might return a nil container // with a non-nil error. This should not happen! Once it's fixed we // can remove this workaround. diff --git a/daemon/delete.go b/daemon/delete.go index 9c92be3fb1..ad4df69860 100644 --- a/daemon/delete.go +++ b/daemon/delete.go @@ -70,7 +70,7 @@ func (daemon *Daemon) ContainerDestroy(job *engine.Job) engine.Status { if err := daemon.Destroy(container); err != nil { return job.Errorf("Cannot destroy container %s: %s", name, err) } - job.Eng.Job("log", "destroy", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("destroy") if removeVolume { var ( diff --git a/daemon/export.go b/daemon/export.go index 204e862d5c..bc0f14a3bb 100644 --- a/daemon/export.go +++ b/daemon/export.go @@ -23,7 +23,7 @@ func (daemon *Daemon) ContainerExport(job *engine.Job) engine.Status { return job.Errorf("%s: %s", name, err) } // FIXME: factor job-specific LogEvent to engine.Job.Run() - job.Eng.Job("log", "export", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("export") return engine.StatusOK } return job.Errorf("No such container: %s", name) diff --git a/daemon/image_delete.go b/daemon/image_delete.go index 77e8f85907..dd3e584e23 100644 --- a/daemon/image_delete.go +++ b/daemon/image_delete.go @@ -93,7 +93,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine. out := &engine.Env{} out.Set("Untagged", repoName+":"+tag) imgs.Add(out) - eng.Job("log", "untag", img.ID, "").Run() + eng.Job("log_event", "untag", img.ID, "").Run() } } tags = daemon.Repositories().ByID()[img.ID] @@ -111,7 +111,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine. out := &engine.Env{} out.Set("Deleted", img.ID) imgs.Add(out) - eng.Job("log", "delete", img.ID, "").Run() + eng.Job("log_event", "delete", img.ID, "").Run() if img.Parent != "" && !noprune { err := daemon.DeleteImage(eng, img.Parent, imgs, false, force, noprune) if first { diff --git a/daemon/kill.go b/daemon/kill.go index f883495cef..f5f5897c88 100644 --- a/daemon/kill.go +++ b/daemon/kill.go @@ -44,7 +44,7 @@ func (daemon *Daemon) ContainerKill(job *engine.Job) engine.Status { if err := container.Kill(); err != nil { return job.Errorf("Cannot kill container %s: %s", name, err) } - job.Eng.Job("log", "kill", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("kill") } else { // Otherwise, just send the requested signal if err := container.KillSig(int(sig)); err != nil { diff --git a/daemon/pause.go b/daemon/pause.go index 72e5cee020..0e4323d9a8 100644 --- a/daemon/pause.go +++ b/daemon/pause.go @@ -16,7 +16,7 @@ func (daemon *Daemon) ContainerPause(job *engine.Job) engine.Status { if err := container.Pause(); err != nil { return job.Errorf("Cannot pause container %s: %s", name, err) } - job.Eng.Job("log", "pause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("pause") return engine.StatusOK } @@ -32,6 +32,6 @@ func (daemon *Daemon) ContainerUnpause(job *engine.Job) engine.Status { if err := container.Unpause(); err != nil { return job.Errorf("Cannot unpause container %s: %s", name, err) } - job.Eng.Job("log", "unpause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("unpause") return engine.StatusOK } diff --git a/daemon/restart.go b/daemon/restart.go index c0ae949a88..bcc057156d 100644 --- a/daemon/restart.go +++ b/daemon/restart.go @@ -19,7 +19,7 @@ func (daemon *Daemon) ContainerRestart(job *engine.Job) engine.Status { if err := container.Restart(int(t)); err != nil { return job.Errorf("Cannot restart container %s: %s\n", name, err) } - job.Eng.Job("log", "restart", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("restart") } else { return job.Errorf("No such container: %s\n", name) } diff --git a/daemon/server.go b/daemon/server.go index 1e06eda896..12fb0f57c8 100644 --- a/daemon/server.go +++ b/daemon/server.go @@ -1,10 +1,5 @@ package daemon -import ( - "github.com/docker/docker/utils" -) - type Server interface { - LogEvent(action, id, from string) *utils.JSONMessage IsRunning() bool // returns true if the server is currently in operation } diff --git a/daemon/start.go b/daemon/start.go index 0e64a4e916..cb6e9cb21f 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -36,8 +36,7 @@ func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status { if err := container.Start(); err != nil { return job.Errorf("Cannot start container %s: %s", name, err) } - job.Eng.Job("log", "start", container.ID, daemon.Repositories().ImageName(container.Image)).Run() - + container.LogEvent("start") return engine.StatusOK } diff --git a/daemon/stop.go b/daemon/stop.go index 5ce2e1726e..f1851291fb 100644 --- a/daemon/stop.go +++ b/daemon/stop.go @@ -22,7 +22,7 @@ func (daemon *Daemon) ContainerStop(job *engine.Job) engine.Status { if err := container.Stop(int(t)); err != nil { return job.Errorf("Cannot stop container %s: %s\n", name, err) } - job.Eng.Job("log", "stop", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + container.LogEvent("stop") } else { return job.Errorf("No such container: %s\n", name) } diff --git a/events/events.go b/events/events.go new file mode 100644 index 0000000000..01965b3e3b --- /dev/null +++ b/events/events.go @@ -0,0 +1,176 @@ +package events + +import ( + "encoding/json" + "sync" + "time" + + "github.com/docker/docker/engine" + "github.com/docker/docker/utils" +) + +const eventsLimit = 64 + +type listener chan<- *utils.JSONMessage + +type Events struct { + mu sync.RWMutex + events []*utils.JSONMessage + subscribers []listener +} + +func New() *Events { + return &Events{ + events: make([]*utils.JSONMessage, 0, eventsLimit), + } +} + +// Install installs events public api in docker engine +func (e *Events) Install(eng *engine.Engine) error { + // Here you should describe public interface + jobs := map[string]engine.Handler{ + "events": e.Get, + "log_event": e.Log, + "subscribers_count": e.SubscribersCount, + } + for name, job := range jobs { + if err := eng.Register(name, job); err != nil { + return err + } + } + return nil +} + +func (e *Events) Get(job *engine.Job) engine.Status { + var ( + since = job.GetenvInt64("since") + until = job.GetenvInt64("until") + timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) + ) + + // If no until, disable timeout + if until == 0 { + timeout.Stop() + } + + listener := make(chan *utils.JSONMessage) + e.subscribe(listener) + defer e.unsubscribe(listener) + + job.Stdout.Write(nil) + + // Resend every event in the [since, until] time interval. + if since != 0 { + if err := e.writeCurrent(job, since, until); err != nil { + return job.Error(err) + } + } + + for { + select { + case event, ok := <-listener: + if !ok { + return engine.StatusOK + } + if err := writeEvent(job, event); err != nil { + return job.Error(err) + } + case <-timeout.C: + return engine.StatusOK + } + } +} + +func (e *Events) Log(job *engine.Job) engine.Status { + if len(job.Args) != 3 { + return job.Errorf("usage: %s ACTION ID FROM", job.Name) + } + // not waiting for receivers + go e.log(job.Args[0], job.Args[1], job.Args[2]) + return engine.StatusOK +} + +func (e *Events) SubscribersCount(job *engine.Job) engine.Status { + ret := &engine.Env{} + ret.SetInt("count", e.subscribersCount()) + ret.WriteTo(job.Stdout) + return engine.StatusOK +} + +func writeEvent(job *engine.Job, event *utils.JSONMessage) error { + // When sending an event JSON serialization errors are ignored, but all + // other errors lead to the eviction of the listener. + if b, err := json.Marshal(event); err == nil { + if _, err = job.Stdout.Write(b); err != nil { + return err + } + } + return nil +} + +func (e *Events) writeCurrent(job *engine.Job, since, until int64) error { + e.mu.RLock() + for _, event := range e.events { + if event.Time >= since && (event.Time <= until || until == 0) { + if err := writeEvent(job, event); err != nil { + e.mu.RUnlock() + return err + } + } + } + e.mu.RUnlock() + return nil +} + +func (e *Events) subscribersCount() int { + e.mu.RLock() + c := len(e.subscribers) + e.mu.RUnlock() + return c +} + +func (e *Events) log(action, id, from string) { + e.mu.Lock() + now := time.Now().UTC().Unix() + jm := &utils.JSONMessage{Status: action, ID: id, From: from, Time: now} + if len(e.events) == cap(e.events) { + // discard oldest event + copy(e.events, e.events[1:]) + e.events[len(e.events)-1] = jm + } else { + e.events = append(e.events, jm) + } + for _, s := range e.subscribers { + // We give each subscriber a 100ms time window to receive the event, + // after which we move to the next. + select { + case s <- jm: + case <-time.After(100 * time.Millisecond): + } + } + e.mu.Unlock() +} + +func (e *Events) subscribe(l listener) { + e.mu.Lock() + e.subscribers = append(e.subscribers, l) + e.mu.Unlock() +} + +// unsubscribe closes and removes the specified listener from the list of +// previously registed ones. +// It returns a boolean value indicating if the listener was successfully +// found, closed and unregistered. +func (e *Events) unsubscribe(l listener) bool { + e.mu.Lock() + for i, subscriber := range e.subscribers { + if subscriber == l { + close(l) + e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...) + e.mu.Unlock() + return true + } + } + e.mu.Unlock() + return false +} diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 0000000000..d07d5d1eff --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,154 @@ +package events + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "testing" + "time" + + "github.com/docker/docker/engine" + "github.com/docker/docker/utils" +) + +func TestEventsPublish(t *testing.T) { + e := New() + l1 := make(chan *utils.JSONMessage) + l2 := make(chan *utils.JSONMessage) + e.subscribe(l1) + e.subscribe(l2) + count := e.subscribersCount() + if count != 2 { + t.Fatalf("Must be 2 subscribers, got %d", count) + } + go e.log("test", "cont", "image") + select { + case msg := <-l1: + if len(e.events) != 1 { + t.Fatalf("Must be only one event, got %d", len(e.events)) + } + if msg.Status != "test" { + t.Fatalf("Status should be test, got %s", msg.Status) + } + if msg.ID != "cont" { + t.Fatalf("ID should be cont, got %s", msg.ID) + } + if msg.From != "image" { + t.Fatalf("From should be image, got %s", msg.From) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for broadcasted message") + } + select { + case msg := <-l2: + if len(e.events) != 1 { + t.Fatalf("Must be only one event, got %d", len(e.events)) + } + if msg.Status != "test" { + t.Fatalf("Status should be test, got %s", msg.Status) + } + if msg.ID != "cont" { + t.Fatalf("ID should be cont, got %s", msg.ID) + } + if msg.From != "image" { + t.Fatalf("From should be image, got %s", msg.From) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for broadcasted message") + } +} + +func TestEventsPublishTimeout(t *testing.T) { + e := New() + l := make(chan *utils.JSONMessage) + e.subscribe(l) + + c := make(chan struct{}) + go func() { + e.log("test", "cont", "image") + close(c) + }() + + select { + case <-c: + case <-time.After(time.Second): + t.Fatal("Timeout publishing message") + } +} + +func TestLogEvents(t *testing.T) { + e := New() + eng := engine.New() + if err := e.Install(eng); err != nil { + t.Fatal(err) + } + + for i := 0; i < eventsLimit+16; i++ { + action := fmt.Sprintf("action_%d", i) + id := fmt.Sprintf("cont_%d", i) + from := fmt.Sprintf("image_%d", i) + job := eng.Job("log_event", action, id, from) + if err := job.Run(); err != nil { + t.Fatal(err) + } + } + time.Sleep(50 * time.Millisecond) + if len(e.events) != eventsLimit { + t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events)) + } + + job := eng.Job("events") + job.SetenvInt64("since", 1) + job.SetenvInt64("until", time.Now().Unix()) + buf := bytes.NewBuffer(nil) + job.Stdout.Add(buf) + if err := job.Run(); err != nil { + t.Fatal(err) + } + buf = bytes.NewBuffer(buf.Bytes()) + dec := json.NewDecoder(buf) + var msgs []utils.JSONMessage + for { + var jm utils.JSONMessage + if err := dec.Decode(&jm); err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + msgs = append(msgs, jm) + } + if len(msgs) != eventsLimit { + t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs)) + } + first := msgs[0] + if first.Status != "action_16" { + t.Fatalf("First action is %s, must be action_15", first.Status) + } + last := msgs[len(msgs)-1] + if last.Status != "action_79" { + t.Fatalf("First action is %s, must be action_79", first.Status) + } +} + +func TestEventsCountJob(t *testing.T) { + e := New() + eng := engine.New() + if err := e.Install(eng); err != nil { + t.Fatal(err) + } + l1 := make(chan *utils.JSONMessage) + l2 := make(chan *utils.JSONMessage) + e.subscribe(l1) + e.subscribe(l2) + job := eng.Job("subscribers_count") + env, _ := job.Stdout.AddEnv() + if err := job.Run(); err != nil { + t.Fatal(err) + } + count := env.GetInt("count") + if count != 2 { + t.Fatalf("There must be 2 subscribers, got %d", count) + } +} diff --git a/server/events.go b/server/events.go deleted file mode 100644 index 214dd69e04..0000000000 --- a/server/events.go +++ /dev/null @@ -1,108 +0,0 @@ -// DEPRECATION NOTICE. PLEASE DO NOT ADD ANYTHING TO THIS FILE. -// -// For additional commments see server/server.go -// -package server - -import ( - "encoding/json" - "time" - - "github.com/docker/docker/engine" - "github.com/docker/docker/utils" -) - -func (srv *Server) Events(job *engine.Job) engine.Status { - if len(job.Args) != 0 { - return job.Errorf("Usage: %s", job.Name) - } - - var ( - since = job.GetenvInt64("since") - until = job.GetenvInt64("until") - timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) - ) - - // If no until, disable timeout - if until == 0 { - timeout.Stop() - } - - listener := make(chan utils.JSONMessage) - srv.eventPublisher.Subscribe(listener) - defer srv.eventPublisher.Unsubscribe(listener) - - // When sending an event JSON serialization errors are ignored, but all - // other errors lead to the eviction of the listener. - sendEvent := func(event *utils.JSONMessage) error { - if b, err := json.Marshal(event); err == nil { - if _, err = job.Stdout.Write(b); err != nil { - return err - } - } - return nil - } - - job.Stdout.Write(nil) - - // Resend every event in the [since, until] time interval. - if since != 0 { - for _, event := range srv.GetEvents() { - if event.Time >= since && (event.Time <= until || until == 0) { - if err := sendEvent(&event); err != nil { - return job.Error(err) - } - } - } - } - - for { - select { - case event, ok := <-listener: - if !ok { - return engine.StatusOK - } - if err := sendEvent(&event); err != nil { - return job.Error(err) - } - case <-timeout.C: - return engine.StatusOK - } - } -} - -// FIXME: this is a shim to allow breaking up other parts of Server without -// dragging the sphagetti dependency along. -func (srv *Server) Log(job *engine.Job) engine.Status { - if len(job.Args) != 3 { - return job.Errorf("usage: %s ACTION ID FROM", job.Name) - } - srv.LogEvent(job.Args[0], job.Args[1], job.Args[2]) - return engine.StatusOK -} - -func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage { - now := time.Now().UTC().Unix() - jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now} - srv.AddEvent(jm) - srv.eventPublisher.Publish(jm) - return &jm -} - -func (srv *Server) AddEvent(jm utils.JSONMessage) { - srv.Lock() - if len(srv.events) == cap(srv.events) { - // discard oldest event - copy(srv.events, srv.events[1:]) - srv.events[len(srv.events)-1] = jm - } else { - srv.events = append(srv.events, jm) - } - srv.Unlock() -} - -func (srv *Server) GetEvents() []utils.JSONMessage { - srv.RLock() - defer srv.RUnlock() - return srv.events -} diff --git a/server/init.go b/server/init.go index a3866e7bef..1814e6f534 100644 --- a/server/init.go +++ b/server/init.go @@ -86,12 +86,10 @@ func InitServer(job *engine.Job) engine.Status { job.Eng.Hack_SetGlobalVar("httpapi.daemon", srv.daemon) for name, handler := range map[string]engine.Handler{ - "info": srv.DockerInfo, - "log": srv.Log, - "build": srv.Build, - "pull": srv.ImagePull, - "events": srv.Events, - "push": srv.ImagePush, + "info": srv.DockerInfo, + "build": srv.Build, + "pull": srv.ImagePull, + "push": srv.ImagePush, } { if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil { return job.Error(err) @@ -117,12 +115,10 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error) return nil, err } srv := &Server{ - Eng: eng, - daemon: daemon, - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), - events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events - eventPublisher: utils.NewJSONMessagePublisher(), + Eng: eng, + daemon: daemon, + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), } daemon.SetServer(srv) return srv, nil diff --git a/server/server.go b/server/server.go index afa4f65f3c..3c8f0708a9 100644 --- a/server/server.go +++ b/server/server.go @@ -67,6 +67,11 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status { initPath = srv.daemon.SystemInitPath() } + cjob := job.Eng.Job("subscribers_count") + env, _ := cjob.Stdout.AddEnv() + if err := cjob.Run(); err != nil { + return job.Error(err) + } v := &engine.Env{} v.SetInt("Containers", len(srv.daemon.List())) v.SetInt("Images", imgcount) @@ -79,7 +84,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status { v.SetInt("NFd", utils.GetTotalUsedFds()) v.SetInt("NGoroutines", runtime.NumGoroutine()) v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name()) - v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount()) + v.SetInt("NEventsListener", env.GetInt("count")) v.Set("KernelVersion", kernelVersion) v.Set("OperatingSystem", operatingSystem) v.Set("IndexServerAddress", registry.IndexServerAddress()) @@ -128,12 +133,10 @@ func (srv *Server) Close() error { type Server struct { sync.RWMutex - daemon *daemon.Daemon - pullingPool map[string]chan struct{} - pushingPool map[string]chan struct{} - events []utils.JSONMessage - eventPublisher *utils.JSONMessagePublisher - Eng *engine.Engine - running bool - tasks sync.WaitGroup + daemon *daemon.Daemon + pullingPool map[string]chan struct{} + pushingPool map[string]chan struct{} + Eng *engine.Engine + running bool + tasks sync.WaitGroup } diff --git a/server/server_unit_test.go b/server/server_unit_test.go index 91ca709f4f..16f06c145e 100644 --- a/server/server_unit_test.go +++ b/server/server_unit_test.go @@ -1,11 +1,6 @@ package server -import ( - "testing" - "time" - - "github.com/docker/docker/utils" -) +import "testing" func TestPools(t *testing.T) { srv := &Server{ @@ -44,55 +39,3 @@ func TestPools(t *testing.T) { t.Fatalf("Expected `Unknown pool type`") } } - -func TestLogEvent(t *testing.T) { - srv := &Server{ - events: make([]utils.JSONMessage, 0, 64), - eventPublisher: utils.NewJSONMessagePublisher(), - } - - srv.LogEvent("fakeaction", "fakeid", "fakeimage") - - listener := make(chan utils.JSONMessage) - srv.eventPublisher.Subscribe(listener) - - srv.LogEvent("fakeaction2", "fakeid", "fakeimage") - - numEvents := len(srv.GetEvents()) - if numEvents != 2 { - t.Fatalf("Expected 2 events, found %d", numEvents) - } - go func() { - time.Sleep(200 * time.Millisecond) - srv.LogEvent("fakeaction3", "fakeid", "fakeimage") - time.Sleep(200 * time.Millisecond) - srv.LogEvent("fakeaction4", "fakeid", "fakeimage") - }() - - setTimeout(t, "Listening for events timed out", 2*time.Second, func() { - for i := 2; i < 4; i++ { - event := <-listener - if event != srv.GetEvents()[i] { - t.Fatalf("Event received it different than expected") - } - } - }) -} - -// FIXME: this is duplicated from integration/commands_test.go -func setTimeout(t *testing.T, msg string, d time.Duration, f func()) { - c := make(chan bool) - - // Make sure we are not too long - go func() { - time.Sleep(d) - c <- true - }() - go func() { - f() - c <- false - }() - if <-c && msg != "" { - t.Fatal(msg) - } -} diff --git a/utils/jsonmessagepublisher.go b/utils/jsonmessagepublisher.go deleted file mode 100644 index 659e6c8304..0000000000 --- a/utils/jsonmessagepublisher.go +++ /dev/null @@ -1,61 +0,0 @@ -package utils - -import ( - "sync" - "time" -) - -func NewJSONMessagePublisher() *JSONMessagePublisher { - return &JSONMessagePublisher{} -} - -type JSONMessageListener chan<- JSONMessage - -type JSONMessagePublisher struct { - m sync.RWMutex - subscribers []JSONMessageListener -} - -func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) { - p.m.Lock() - p.subscribers = append(p.subscribers, l) - p.m.Unlock() -} - -func (p *JSONMessagePublisher) SubscribersCount() int { - p.m.RLock() - count := len(p.subscribers) - p.m.RUnlock() - return count -} - -// Unsubscribe closes and removes the specified listener from the list of -// previously registed ones. -// It returns a boolean value indicating if the listener was successfully -// found, closed and unregistered. -func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool { - p.m.Lock() - defer p.m.Unlock() - - for i, subscriber := range p.subscribers { - if subscriber == l { - close(l) - p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) - return true - } - } - return false -} - -func (p *JSONMessagePublisher) Publish(m JSONMessage) { - p.m.RLock() - for _, subscriber := range p.subscribers { - // We give each subscriber a 100ms time window to receive the event, - // after which we move to the next. - select { - case subscriber <- m: - case <-time.After(100 * time.Millisecond): - } - } - p.m.RUnlock() -} diff --git a/utils/jsonmessagepublisher_test.go b/utils/jsonmessagepublisher_test.go deleted file mode 100644 index 2e1a820ca3..0000000000 --- a/utils/jsonmessagepublisher_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package utils - -import ( - "testing" - "time" -) - -func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) { - if q.SubscribersCount() != expected { - t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount()) - } -} - -func TestJSONMessagePublisherSubscription(t *testing.T) { - q := NewJSONMessagePublisher() - l1 := make(chan JSONMessage) - l2 := make(chan JSONMessage) - - assertSubscribersCount(t, q, 0) - q.Subscribe(l1) - assertSubscribersCount(t, q, 1) - q.Subscribe(l2) - assertSubscribersCount(t, q, 2) - - q.Unsubscribe(l1) - q.Unsubscribe(l2) - assertSubscribersCount(t, q, 0) -} - -func TestJSONMessagePublisherPublish(t *testing.T) { - q := NewJSONMessagePublisher() - l1 := make(chan JSONMessage) - l2 := make(chan JSONMessage) - - go func() { - for { - select { - case <-l1: - close(l1) - l1 = nil - case <-l2: - close(l2) - l2 = nil - case <-time.After(1 * time.Second): - q.Unsubscribe(l1) - q.Unsubscribe(l2) - t.Fatal("Timeout waiting for broadcasted message") - } - } - }() - - q.Subscribe(l1) - q.Subscribe(l2) - q.Publish(JSONMessage{}) -} - -func TestJSONMessagePublishTimeout(t *testing.T) { - q := NewJSONMessagePublisher() - l := make(chan JSONMessage) - q.Subscribe(l) - - c := make(chan struct{}) - go func() { - q.Publish(JSONMessage{}) - close(c) - }() - - select { - case <-c: - case <-time.After(time.Second): - t.Fatal("Timeout publishing message") - } -}