* Events subsystem merged from `server/events.go` and
  `utils/jsonmessagepublisher.go` and moved to `events/events.go`
* Only public interface for this subsystem is engine jobs
* There is two new engine jobs - `log_event` and `subscribers_count`
* There is auxiliary function `container.LogEvent` for logging events for
  containers

Docker-DCO-1.1-Signed-off-by: Alexandr Morozov <lk4d4math@gmail.com> (github: LK4D4)
[solomon@docker.com: resolve merge conflicts]
Signed-off-by: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Alexandr Morozov 2014-07-31 15:50:59 +04:00 коммит произвёл Solomon Hykes
Родитель af0781974d
Коммит 8d056423f8
20 изменённых файлов: 374 добавлений и 339 удалений

Просмотреть файл

@ -8,6 +8,7 @@ import (
"github.com/docker/docker/daemon/networkdriver/bridge"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/engine"
"github.com/docker/docker/events"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/docker/registry"
"github.com/docker/docker/server"
@ -20,6 +21,9 @@ func Register(eng *engine.Engine) error {
if err := remote(eng); err != nil {
return err
}
if err := events.New().Install(eng); err != nil {
return err
}
if err := eng.Register("version", dockerVersion); err != nil {
return err
}

Просмотреть файл

@ -168,6 +168,13 @@ func (container *Container) WriteHostConfig() error {
return ioutil.WriteFile(pth, data, 0666)
}
func (container *Container) LogEvent(action string) {
d := container.daemon
if err := d.eng.Job("log_event", action, container.ID, d.Repositories().ImageName(container.Image)).Run(); err != nil {
utils.Errorf("Error running container: %s", err)
}
}
func (container *Container) getResourcePath(path string) (string, error) {
cleanPath := filepath.Join("/", path)
return symlink.FollowSymlinkInScope(filepath.Join(container.basefs, cleanPath), container.basefs)
@ -508,7 +515,7 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
container.stdin, container.stdinPipe = io.Pipe()
}
if container.daemon != nil && container.daemon.srv != nil {
container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
container.LogEvent("die")
}
if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
// FIXME: here is race condition between two RUN instructions in Dockerfile

Просмотреть файл

@ -40,7 +40,7 @@ func (daemon *Daemon) ContainerCreate(job *engine.Job) engine.Status {
if !container.Config.NetworkDisabled && daemon.SystemConfig().IPv4ForwardingDisabled {
job.Errorf("IPv4 forwarding is disabled.\n")
}
job.Eng.Job("log", "create", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("create")
// FIXME: this is necessary because daemon.Create might return a nil container
// with a non-nil error. This should not happen! Once it's fixed we
// can remove this workaround.

Просмотреть файл

@ -70,7 +70,7 @@ func (daemon *Daemon) ContainerDestroy(job *engine.Job) engine.Status {
if err := daemon.Destroy(container); err != nil {
return job.Errorf("Cannot destroy container %s: %s", name, err)
}
job.Eng.Job("log", "destroy", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("destroy")
if removeVolume {
var (

Просмотреть файл

@ -23,7 +23,7 @@ func (daemon *Daemon) ContainerExport(job *engine.Job) engine.Status {
return job.Errorf("%s: %s", name, err)
}
// FIXME: factor job-specific LogEvent to engine.Job.Run()
job.Eng.Job("log", "export", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("export")
return engine.StatusOK
}
return job.Errorf("No such container: %s", name)

Просмотреть файл

@ -93,7 +93,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine.
out := &engine.Env{}
out.Set("Untagged", repoName+":"+tag)
imgs.Add(out)
eng.Job("log", "untag", img.ID, "").Run()
eng.Job("log_event", "untag", img.ID, "").Run()
}
}
tags = daemon.Repositories().ByID()[img.ID]
@ -111,7 +111,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, imgs *engine.
out := &engine.Env{}
out.Set("Deleted", img.ID)
imgs.Add(out)
eng.Job("log", "delete", img.ID, "").Run()
eng.Job("log_event", "delete", img.ID, "").Run()
if img.Parent != "" && !noprune {
err := daemon.DeleteImage(eng, img.Parent, imgs, false, force, noprune)
if first {

Просмотреть файл

@ -44,7 +44,7 @@ func (daemon *Daemon) ContainerKill(job *engine.Job) engine.Status {
if err := container.Kill(); err != nil {
return job.Errorf("Cannot kill container %s: %s", name, err)
}
job.Eng.Job("log", "kill", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("kill")
} else {
// Otherwise, just send the requested signal
if err := container.KillSig(int(sig)); err != nil {

Просмотреть файл

@ -16,7 +16,7 @@ func (daemon *Daemon) ContainerPause(job *engine.Job) engine.Status {
if err := container.Pause(); err != nil {
return job.Errorf("Cannot pause container %s: %s", name, err)
}
job.Eng.Job("log", "pause", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("pause")
return engine.StatusOK
}
@ -32,6 +32,6 @@ func (daemon *Daemon) ContainerUnpause(job *engine.Job) engine.Status {
if err := container.Unpause(); err != nil {
return job.Errorf("Cannot unpause container %s: %s", name, err)
}
job.Eng.Job("log", "unpause", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("unpause")
return engine.StatusOK
}

Просмотреть файл

@ -19,7 +19,7 @@ func (daemon *Daemon) ContainerRestart(job *engine.Job) engine.Status {
if err := container.Restart(int(t)); err != nil {
return job.Errorf("Cannot restart container %s: %s\n", name, err)
}
job.Eng.Job("log", "restart", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("restart")
} else {
return job.Errorf("No such container: %s\n", name)
}

Просмотреть файл

@ -1,10 +1,5 @@
package daemon
import (
"github.com/docker/docker/utils"
)
type Server interface {
LogEvent(action, id, from string) *utils.JSONMessage
IsRunning() bool // returns true if the server is currently in operation
}

Просмотреть файл

@ -36,8 +36,7 @@ func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status {
if err := container.Start(); err != nil {
return job.Errorf("Cannot start container %s: %s", name, err)
}
job.Eng.Job("log", "start", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("start")
return engine.StatusOK
}

Просмотреть файл

@ -22,7 +22,7 @@ func (daemon *Daemon) ContainerStop(job *engine.Job) engine.Status {
if err := container.Stop(int(t)); err != nil {
return job.Errorf("Cannot stop container %s: %s\n", name, err)
}
job.Eng.Job("log", "stop", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
container.LogEvent("stop")
} else {
return job.Errorf("No such container: %s\n", name)
}

176
events/events.go Normal file
Просмотреть файл

@ -0,0 +1,176 @@
package events
import (
"encoding/json"
"sync"
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
const eventsLimit = 64
type listener chan<- *utils.JSONMessage
type Events struct {
mu sync.RWMutex
events []*utils.JSONMessage
subscribers []listener
}
func New() *Events {
return &Events{
events: make([]*utils.JSONMessage, 0, eventsLimit),
}
}
// Install installs events public api in docker engine
func (e *Events) Install(eng *engine.Engine) error {
// Here you should describe public interface
jobs := map[string]engine.Handler{
"events": e.Get,
"log_event": e.Log,
"subscribers_count": e.SubscribersCount,
}
for name, job := range jobs {
if err := eng.Register(name, job); err != nil {
return err
}
}
return nil
}
func (e *Events) Get(job *engine.Job) engine.Status {
var (
since = job.GetenvInt64("since")
until = job.GetenvInt64("until")
timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
)
// If no until, disable timeout
if until == 0 {
timeout.Stop()
}
listener := make(chan *utils.JSONMessage)
e.subscribe(listener)
defer e.unsubscribe(listener)
job.Stdout.Write(nil)
// Resend every event in the [since, until] time interval.
if since != 0 {
if err := e.writeCurrent(job, since, until); err != nil {
return job.Error(err)
}
}
for {
select {
case event, ok := <-listener:
if !ok {
return engine.StatusOK
}
if err := writeEvent(job, event); err != nil {
return job.Error(err)
}
case <-timeout.C:
return engine.StatusOK
}
}
}
func (e *Events) Log(job *engine.Job) engine.Status {
if len(job.Args) != 3 {
return job.Errorf("usage: %s ACTION ID FROM", job.Name)
}
// not waiting for receivers
go e.log(job.Args[0], job.Args[1], job.Args[2])
return engine.StatusOK
}
func (e *Events) SubscribersCount(job *engine.Job) engine.Status {
ret := &engine.Env{}
ret.SetInt("count", e.subscribersCount())
ret.WriteTo(job.Stdout)
return engine.StatusOK
}
func writeEvent(job *engine.Job, event *utils.JSONMessage) error {
// When sending an event JSON serialization errors are ignored, but all
// other errors lead to the eviction of the listener.
if b, err := json.Marshal(event); err == nil {
if _, err = job.Stdout.Write(b); err != nil {
return err
}
}
return nil
}
func (e *Events) writeCurrent(job *engine.Job, since, until int64) error {
e.mu.RLock()
for _, event := range e.events {
if event.Time >= since && (event.Time <= until || until == 0) {
if err := writeEvent(job, event); err != nil {
e.mu.RUnlock()
return err
}
}
}
e.mu.RUnlock()
return nil
}
func (e *Events) subscribersCount() int {
e.mu.RLock()
c := len(e.subscribers)
e.mu.RUnlock()
return c
}
func (e *Events) log(action, id, from string) {
e.mu.Lock()
now := time.Now().UTC().Unix()
jm := &utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
for _, s := range e.subscribers {
// We give each subscriber a 100ms time window to receive the event,
// after which we move to the next.
select {
case s <- jm:
case <-time.After(100 * time.Millisecond):
}
}
e.mu.Unlock()
}
func (e *Events) subscribe(l listener) {
e.mu.Lock()
e.subscribers = append(e.subscribers, l)
e.mu.Unlock()
}
// unsubscribe closes and removes the specified listener from the list of
// previously registed ones.
// It returns a boolean value indicating if the listener was successfully
// found, closed and unregistered.
func (e *Events) unsubscribe(l listener) bool {
e.mu.Lock()
for i, subscriber := range e.subscribers {
if subscriber == l {
close(l)
e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...)
e.mu.Unlock()
return true
}
}
e.mu.Unlock()
return false
}

154
events/events_test.go Normal file
Просмотреть файл

@ -0,0 +1,154 @@
package events
import (
"bytes"
"encoding/json"
"fmt"
"io"
"testing"
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
func TestEventsPublish(t *testing.T) {
e := New()
l1 := make(chan *utils.JSONMessage)
l2 := make(chan *utils.JSONMessage)
e.subscribe(l1)
e.subscribe(l2)
count := e.subscribersCount()
if count != 2 {
t.Fatalf("Must be 2 subscribers, got %d", count)
}
go e.log("test", "cont", "image")
select {
case msg := <-l1:
if len(e.events) != 1 {
t.Fatalf("Must be only one event, got %d", len(e.events))
}
if msg.Status != "test" {
t.Fatalf("Status should be test, got %s", msg.Status)
}
if msg.ID != "cont" {
t.Fatalf("ID should be cont, got %s", msg.ID)
}
if msg.From != "image" {
t.Fatalf("From should be image, got %s", msg.From)
}
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for broadcasted message")
}
select {
case msg := <-l2:
if len(e.events) != 1 {
t.Fatalf("Must be only one event, got %d", len(e.events))
}
if msg.Status != "test" {
t.Fatalf("Status should be test, got %s", msg.Status)
}
if msg.ID != "cont" {
t.Fatalf("ID should be cont, got %s", msg.ID)
}
if msg.From != "image" {
t.Fatalf("From should be image, got %s", msg.From)
}
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for broadcasted message")
}
}
func TestEventsPublishTimeout(t *testing.T) {
e := New()
l := make(chan *utils.JSONMessage)
e.subscribe(l)
c := make(chan struct{})
go func() {
e.log("test", "cont", "image")
close(c)
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Timeout publishing message")
}
}
func TestLogEvents(t *testing.T) {
e := New()
eng := engine.New()
if err := e.Install(eng); err != nil {
t.Fatal(err)
}
for i := 0; i < eventsLimit+16; i++ {
action := fmt.Sprintf("action_%d", i)
id := fmt.Sprintf("cont_%d", i)
from := fmt.Sprintf("image_%d", i)
job := eng.Job("log_event", action, id, from)
if err := job.Run(); err != nil {
t.Fatal(err)
}
}
time.Sleep(50 * time.Millisecond)
if len(e.events) != eventsLimit {
t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
}
job := eng.Job("events")
job.SetenvInt64("since", 1)
job.SetenvInt64("until", time.Now().Unix())
buf := bytes.NewBuffer(nil)
job.Stdout.Add(buf)
if err := job.Run(); err != nil {
t.Fatal(err)
}
buf = bytes.NewBuffer(buf.Bytes())
dec := json.NewDecoder(buf)
var msgs []utils.JSONMessage
for {
var jm utils.JSONMessage
if err := dec.Decode(&jm); err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
msgs = append(msgs, jm)
}
if len(msgs) != eventsLimit {
t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs))
}
first := msgs[0]
if first.Status != "action_16" {
t.Fatalf("First action is %s, must be action_15", first.Status)
}
last := msgs[len(msgs)-1]
if last.Status != "action_79" {
t.Fatalf("First action is %s, must be action_79", first.Status)
}
}
func TestEventsCountJob(t *testing.T) {
e := New()
eng := engine.New()
if err := e.Install(eng); err != nil {
t.Fatal(err)
}
l1 := make(chan *utils.JSONMessage)
l2 := make(chan *utils.JSONMessage)
e.subscribe(l1)
e.subscribe(l2)
job := eng.Job("subscribers_count")
env, _ := job.Stdout.AddEnv()
if err := job.Run(); err != nil {
t.Fatal(err)
}
count := env.GetInt("count")
if count != 2 {
t.Fatalf("There must be 2 subscribers, got %d", count)
}
}

Просмотреть файл

@ -1,108 +0,0 @@
// DEPRECATION NOTICE. PLEASE DO NOT ADD ANYTHING TO THIS FILE.
//
// For additional commments see server/server.go
//
package server
import (
"encoding/json"
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
func (srv *Server) Events(job *engine.Job) engine.Status {
if len(job.Args) != 0 {
return job.Errorf("Usage: %s", job.Name)
}
var (
since = job.GetenvInt64("since")
until = job.GetenvInt64("until")
timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
)
// If no until, disable timeout
if until == 0 {
timeout.Stop()
}
listener := make(chan utils.JSONMessage)
srv.eventPublisher.Subscribe(listener)
defer srv.eventPublisher.Unsubscribe(listener)
// When sending an event JSON serialization errors are ignored, but all
// other errors lead to the eviction of the listener.
sendEvent := func(event *utils.JSONMessage) error {
if b, err := json.Marshal(event); err == nil {
if _, err = job.Stdout.Write(b); err != nil {
return err
}
}
return nil
}
job.Stdout.Write(nil)
// Resend every event in the [since, until] time interval.
if since != 0 {
for _, event := range srv.GetEvents() {
if event.Time >= since && (event.Time <= until || until == 0) {
if err := sendEvent(&event); err != nil {
return job.Error(err)
}
}
}
}
for {
select {
case event, ok := <-listener:
if !ok {
return engine.StatusOK
}
if err := sendEvent(&event); err != nil {
return job.Error(err)
}
case <-timeout.C:
return engine.StatusOK
}
}
}
// FIXME: this is a shim to allow breaking up other parts of Server without
// dragging the sphagetti dependency along.
func (srv *Server) Log(job *engine.Job) engine.Status {
if len(job.Args) != 3 {
return job.Errorf("usage: %s ACTION ID FROM", job.Name)
}
srv.LogEvent(job.Args[0], job.Args[1], job.Args[2])
return engine.StatusOK
}
func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
now := time.Now().UTC().Unix()
jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
srv.AddEvent(jm)
srv.eventPublisher.Publish(jm)
return &jm
}
func (srv *Server) AddEvent(jm utils.JSONMessage) {
srv.Lock()
if len(srv.events) == cap(srv.events) {
// discard oldest event
copy(srv.events, srv.events[1:])
srv.events[len(srv.events)-1] = jm
} else {
srv.events = append(srv.events, jm)
}
srv.Unlock()
}
func (srv *Server) GetEvents() []utils.JSONMessage {
srv.RLock()
defer srv.RUnlock()
return srv.events
}

Просмотреть файл

@ -87,10 +87,8 @@ func InitServer(job *engine.Job) engine.Status {
for name, handler := range map[string]engine.Handler{
"info": srv.DockerInfo,
"log": srv.Log,
"build": srv.Build,
"pull": srv.ImagePull,
"events": srv.Events,
"push": srv.ImagePush,
} {
if err := job.Eng.Register(name, srv.handlerWrap(handler)); err != nil {
@ -121,8 +119,6 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
daemon: daemon,
pullingPool: make(map[string]chan struct{}),
pushingPool: make(map[string]chan struct{}),
events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
eventPublisher: utils.NewJSONMessagePublisher(),
}
daemon.SetServer(srv)
return srv, nil

Просмотреть файл

@ -67,6 +67,11 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
initPath = srv.daemon.SystemInitPath()
}
cjob := job.Eng.Job("subscribers_count")
env, _ := cjob.Stdout.AddEnv()
if err := cjob.Run(); err != nil {
return job.Error(err)
}
v := &engine.Env{}
v.SetInt("Containers", len(srv.daemon.List()))
v.SetInt("Images", imgcount)
@ -79,7 +84,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
v.SetInt("NFd", utils.GetTotalUsedFds())
v.SetInt("NGoroutines", runtime.NumGoroutine())
v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name())
v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount())
v.SetInt("NEventsListener", env.GetInt("count"))
v.Set("KernelVersion", kernelVersion)
v.Set("OperatingSystem", operatingSystem)
v.Set("IndexServerAddress", registry.IndexServerAddress())
@ -131,8 +136,6 @@ type Server struct {
daemon *daemon.Daemon
pullingPool map[string]chan struct{}
pushingPool map[string]chan struct{}
events []utils.JSONMessage
eventPublisher *utils.JSONMessagePublisher
Eng *engine.Engine
running bool
tasks sync.WaitGroup

Просмотреть файл

@ -1,11 +1,6 @@
package server
import (
"testing"
"time"
"github.com/docker/docker/utils"
)
import "testing"
func TestPools(t *testing.T) {
srv := &Server{
@ -44,55 +39,3 @@ func TestPools(t *testing.T) {
t.Fatalf("Expected `Unknown pool type`")
}
}
func TestLogEvent(t *testing.T) {
srv := &Server{
events: make([]utils.JSONMessage, 0, 64),
eventPublisher: utils.NewJSONMessagePublisher(),
}
srv.LogEvent("fakeaction", "fakeid", "fakeimage")
listener := make(chan utils.JSONMessage)
srv.eventPublisher.Subscribe(listener)
srv.LogEvent("fakeaction2", "fakeid", "fakeimage")
numEvents := len(srv.GetEvents())
if numEvents != 2 {
t.Fatalf("Expected 2 events, found %d", numEvents)
}
go func() {
time.Sleep(200 * time.Millisecond)
srv.LogEvent("fakeaction3", "fakeid", "fakeimage")
time.Sleep(200 * time.Millisecond)
srv.LogEvent("fakeaction4", "fakeid", "fakeimage")
}()
setTimeout(t, "Listening for events timed out", 2*time.Second, func() {
for i := 2; i < 4; i++ {
event := <-listener
if event != srv.GetEvents()[i] {
t.Fatalf("Event received it different than expected")
}
}
})
}
// FIXME: this is duplicated from integration/commands_test.go
func setTimeout(t *testing.T, msg string, d time.Duration, f func()) {
c := make(chan bool)
// Make sure we are not too long
go func() {
time.Sleep(d)
c <- true
}()
go func() {
f()
c <- false
}()
if <-c && msg != "" {
t.Fatal(msg)
}
}

Просмотреть файл

@ -1,61 +0,0 @@
package utils
import (
"sync"
"time"
)
func NewJSONMessagePublisher() *JSONMessagePublisher {
return &JSONMessagePublisher{}
}
type JSONMessageListener chan<- JSONMessage
type JSONMessagePublisher struct {
m sync.RWMutex
subscribers []JSONMessageListener
}
func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) {
p.m.Lock()
p.subscribers = append(p.subscribers, l)
p.m.Unlock()
}
func (p *JSONMessagePublisher) SubscribersCount() int {
p.m.RLock()
count := len(p.subscribers)
p.m.RUnlock()
return count
}
// Unsubscribe closes and removes the specified listener from the list of
// previously registed ones.
// It returns a boolean value indicating if the listener was successfully
// found, closed and unregistered.
func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool {
p.m.Lock()
defer p.m.Unlock()
for i, subscriber := range p.subscribers {
if subscriber == l {
close(l)
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
return true
}
}
return false
}
func (p *JSONMessagePublisher) Publish(m JSONMessage) {
p.m.RLock()
for _, subscriber := range p.subscribers {
// We give each subscriber a 100ms time window to receive the event,
// after which we move to the next.
select {
case subscriber <- m:
case <-time.After(100 * time.Millisecond):
}
}
p.m.RUnlock()
}

Просмотреть файл

@ -1,73 +0,0 @@
package utils
import (
"testing"
"time"
)
func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) {
if q.SubscribersCount() != expected {
t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount())
}
}
func TestJSONMessagePublisherSubscription(t *testing.T) {
q := NewJSONMessagePublisher()
l1 := make(chan JSONMessage)
l2 := make(chan JSONMessage)
assertSubscribersCount(t, q, 0)
q.Subscribe(l1)
assertSubscribersCount(t, q, 1)
q.Subscribe(l2)
assertSubscribersCount(t, q, 2)
q.Unsubscribe(l1)
q.Unsubscribe(l2)
assertSubscribersCount(t, q, 0)
}
func TestJSONMessagePublisherPublish(t *testing.T) {
q := NewJSONMessagePublisher()
l1 := make(chan JSONMessage)
l2 := make(chan JSONMessage)
go func() {
for {
select {
case <-l1:
close(l1)
l1 = nil
case <-l2:
close(l2)
l2 = nil
case <-time.After(1 * time.Second):
q.Unsubscribe(l1)
q.Unsubscribe(l2)
t.Fatal("Timeout waiting for broadcasted message")
}
}
}()
q.Subscribe(l1)
q.Subscribe(l2)
q.Publish(JSONMessage{})
}
func TestJSONMessagePublishTimeout(t *testing.T) {
q := NewJSONMessagePublisher()
l := make(chan JSONMessage)
q.Subscribe(l)
c := make(chan struct{})
go func() {
q.Publish(JSONMessage{})
close(c)
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Timeout publishing message")
}
}