keep a consistent view of containers rendered

Replicate relevant mutations to the in-memory ACID store. Readers will
then be able to query container state without locking.
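
For illustration, a minimal sketch of the pattern, assuming a go-memdb-backed table: writers mutate the Container while holding its lock, take an immutable snapshot, and commit that snapshot in a single write transaction; readers query the last committed snapshot through lock-free read transactions. The ViewDB and Snapshot types and the "containers" table below are hypothetical stand-ins for illustration, not the daemon's actual container.MemDB API.

package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// Snapshot is an illustrative, read-only copy of container state
// (stand-in for whatever fields the real snapshot carries).
type Snapshot struct {
	ID    string
	State string
}

// ViewDB replicates container mutations into an in-memory ACID store.
type ViewDB struct {
	db *memdb.MemDB
}

func NewViewDB() (*ViewDB, error) {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"containers": {
				Name: "containers",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		return nil, err
	}
	return &ViewDB{db: db}, nil
}

// Save commits a snapshot in a single write transaction. The caller
// holds the container lock, so the replica never sees a half-applied
// mutation.
func (v *ViewDB) Save(snapshot *Snapshot) error {
	txn := v.db.Txn(true) // write transaction
	if err := txn.Insert("containers", snapshot); err != nil {
		txn.Abort()
		return err
	}
	txn.Commit()
	return nil
}

// Get returns the last committed snapshot without taking any container
// lock; go-memdb read transactions never block writers.
func (v *ViewDB) Get(id string) (*Snapshot, error) {
	txn := v.db.Txn(false) // lock-free read transaction
	raw, err := txn.First("containers", "id", id)
	if err != nil || raw == nil {
		return nil, err
	}
	return raw.(*Snapshot), nil
}

func main() {
	views, err := NewViewDB()
	if err != nil {
		panic(err)
	}
	if err := views.Save(&Snapshot{ID: "abc123", State: "running"}); err != nil {
		panic(err)
	}
	s, _ := views.Get("abc123")
	fmt.Println(s.ID, s.State) // abc123 running
}

Because go-memdb stores records in immutable radix trees, each read transaction observes a consistent point-in-time view and never blocks writers; that property is what lets readers query container state without taking the per-container lock.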

Signed-off-by: Fabio Kung <fabio.kung@gmail.com>
Fabio Kung 2017-02-22 14:02:20 -08:00
Parent 054728b1f5
Commit eed4c7b73f
13 changed files with 107 additions and 22 deletions

View file

@@ -262,11 +262,8 @@ func (container *Container) ConfigMounts() []Mount {
 	return mounts
 }
 
-// UpdateContainer updates configuration of a container.
+// UpdateContainer updates configuration of a container. Callers must hold a Lock on the Container.
 func (container *Container) UpdateContainer(hostConfig *containertypes.HostConfig) error {
-	container.Lock()
-	defer container.Unlock()
-
 	// update resources of container
 	resources := hostConfig.Resources
 	cResources := &container.HostConfig.Resources

View file

@@ -126,11 +126,8 @@ func (container *Container) TmpfsMounts() ([]Mount, error) {
 	return mounts, nil
 }
 
-// UpdateContainer updates configuration of a container
+// UpdateContainer updates configuration of a container. Callers must hold a Lock on the Container.
 func (container *Container) UpdateContainer(hostConfig *containertypes.HostConfig) error {
-	container.Lock()
-	defer container.Unlock()
-
 	resources := hostConfig.Resources
 	if resources.CPUShares != 0 ||
 		resources.Memory != 0 ||

View file

@@ -99,7 +99,7 @@ func (daemon *Daemon) load(id string) (*container.Container, error) {
 }
 
 // Register makes a container object usable by the daemon as <container.ID>
-func (daemon *Daemon) Register(c *container.Container) {
+func (daemon *Daemon) Register(c *container.Container) error {
 	// Attach to stdout and stderr
 	if c.Config.OpenStdin {
 		c.StreamConfig.NewInputPipes()
@@ -107,8 +107,14 @@ func (daemon *Daemon) Register(c *container.Container) {
 		c.StreamConfig.NewNopInputPipe()
 	}
 
+	// once in the memory store it is visible to other goroutines
+	// grab a Lock until it has been replicated to avoid races
+	c.Lock()
+	defer c.Unlock()
 	daemon.containers.Add(c.ID, c)
 	daemon.idIndex.Add(c.ID)
+
+	return daemon.containersReplica.Save(c.Snapshot())
 }
 
 func (daemon *Daemon) newContainer(name string, platform string, config *containertypes.Config, hostConfig *containertypes.HostConfig, imgID image.ID, managed bool) (*container.Container, error) {
@@ -212,6 +218,9 @@ func (daemon *Daemon) setHostConfig(container *container.Container, hostConfig *
 	runconfig.SetDefaultNetModeIfBlank(hostConfig)
 	container.HostConfig = hostConfig
+	if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
+		return err
+	}
 	return container.ToDisk()
 }

View file

@@ -44,6 +44,19 @@ func (daemon *Daemon) getDNSSearchSettings(container *container.Container) []str
 	return nil
 }
 
+func (daemon *Daemon) saveAndReplicate(container *container.Container) error {
+	container.Lock()
+	defer container.Unlock()
+
+	if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
+		return fmt.Errorf("Error replicating container state: %v", err)
+	}
+	if err := container.ToDisk(); err != nil {
+		return fmt.Errorf("Error saving container to disk: %v", err)
+	}
+	return nil
+}
+
 func (daemon *Daemon) buildSandboxOptions(container *container.Container) ([]libnetwork.SandboxOption, error) {
 	var (
 		sboxOptions []libnetwork.SandboxOption
@@ -1005,7 +1018,7 @@ func (daemon *Daemon) ConnectToNetwork(container *container.Container, idOrName
 			return err
 		}
 	}
-	if err := container.ToDisk(); err != nil {
+	if err := daemon.saveAndReplicate(container); err != nil {
 		return fmt.Errorf("Error saving container to disk: %v", err)
 	}
 	return nil
@@ -1044,16 +1057,16 @@ func (daemon *Daemon) DisconnectFromNetwork(container *container.Container, netw
 		return err
 	}
 
-	if err := container.ToDisk(); err != nil {
+	if err := daemon.saveAndReplicate(container); err != nil {
 		return fmt.Errorf("Error saving container to disk: %v", err)
 	}
 
 	if n != nil {
-		attributes := map[string]string{
+		daemon.LogNetworkEventWithAttributes(n, "disconnect", map[string]string{
 			"container": container.ID,
-		}
-		daemon.LogNetworkEventWithAttributes(n, "disconnect", attributes)
+		})
 	}
 
 	return nil
 }

View file

@@ -172,7 +172,9 @@ func (daemon *Daemon) create(params types.ContainerCreateConfig, managed bool) (
 		logrus.Errorf("Error saving new container to disk: %v", err)
 		return nil, err
 	}
-	daemon.Register(container)
+	if err := daemon.Register(container); err != nil {
+		return nil, err
+	}
 	stateCtr.set(container.ID, "stopped")
 	daemon.LogContainerEvent(container, "create")
 	return container, nil

View file

@@ -83,6 +83,7 @@ type Daemon struct {
 	ID                string
 	repository        string
 	containers        container.Store
+	containersReplica *container.MemDB
 	execCommands      *exec.Store
 	downloadManager   *xfer.LayerDownloadManager
 	uploadManager     *xfer.LayerUploadManager
@@ -182,11 +183,15 @@ func (daemon *Daemon) restore() error {
 	activeSandboxes := make(map[string]interface{})
 	for id, c := range containers {
 		if err := daemon.registerName(c); err != nil {
 			logrus.Errorf("Failed to register container name %s: %s", c.ID, err)
 			delete(containers, id)
 			continue
 		}
-		daemon.Register(c)
+		if err := daemon.Register(c); err != nil {
+			logrus.Errorf("Failed to register container %s: %s", c.ID, err)
+			delete(containers, id)
+			continue
+		}
 
 		// verify that all volumes valid and have been migrated from the pre-1.7 layout
 		if err := daemon.verifyVolumesInfo(c); err != nil {
@@ -757,6 +762,9 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
 	d.ID = trustKey.PublicKey().KeyID()
 	d.repository = daemonRepo
 	d.containers = container.NewMemoryStore()
+	if d.containersReplica, err = container.NewMemDB(); err != nil {
+		return nil, err
+	}
 	d.execCommands = exec.NewStore()
 	d.trustKey = trustKey
 	d.idIndex = truncindex.NewTruncIndex([]string{})

View file

@@ -103,14 +103,20 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
 	}
 
 	// Mark container dead. We don't want anybody to be restarting it.
-	container.SetDead()
+	container.Lock()
+	container.Dead = true
+	if err = daemon.containersReplica.Save(container.Snapshot()); err != nil {
+		container.Unlock()
+		return err
+	}
 
 	// Save container state to disk. So that if error happens before
 	// container meta file got removed from disk, then a restart of
 	// docker should not make a dead container alive.
-	if err := container.ToDiskLocking(); err != nil && !os.IsNotExist(err) {
+	if err := container.ToDisk(); err != nil && !os.IsNotExist(err) {
 		logrus.Errorf("Error saving dying container to disk: %v", err)
 	}
+	container.Unlock()
 
 	// When container creation fails and `RWLayer` has not been created yet, we
 	// do not call `ReleaseRWLayer`
@@ -131,6 +137,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
 	selinuxFreeLxcContexts(container.ProcessLabel)
 	daemon.idIndex.Delete(container.ID)
 	daemon.containers.Delete(container.ID)
+	daemon.containersReplica.Delete(container.ID)
 	if e := daemon.removeMountPoints(container, removeVolume); e != nil {
 		logrus.Error(e)
 	}

View file

@@ -167,6 +167,13 @@ func handleProbeResult(d *Daemon, c *container.Container, result *types.Healthch
 		// Else we're starting or healthy. Stay in that state.
 	}
 
+	// replicate Health status changes
+	if err := d.containersReplica.Save(c.Snapshot()); err != nil {
+		// queries will be inconsistent until the next probe runs or other state mutations
+		// trigger a replication
+		logrus.Errorf("Error replicating health state for container %s: %v", c.ID, err)
+	}
+
 	if oldStatus != h.Status {
 		d.LogContainerEvent(c, "health_status: "+h.Status)
 	}

View file

@@ -29,7 +29,13 @@ func TestNoneHealthcheck(t *testing.T) {
 		},
 		State: &container.State{},
 	}
-	daemon := &Daemon{}
+	store, err := container.NewMemDB()
+	if err != nil {
+		t.Fatal(err)
+	}
+	daemon := &Daemon{
+		containersReplica: store,
+	}
 
 	daemon.initHealthMonitor(c)
 	if c.State.Health != nil {
@@ -62,8 +68,15 @@ func TestHealthStates(t *testing.T) {
 			Image: "image_name",
 		},
 	}
+
+	store, err := container.NewMemDB()
+	if err != nil {
+		t.Fatal(err)
+	}
 	daemon := &Daemon{
-		EventsService: e,
+		EventsService:     e,
+		containersReplica: store,
 	}
+
 	c.Config.Healthcheck = &containertypes.HealthConfig{

View file

@@ -90,6 +90,9 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
 		daemon.setStateCounter(c)
 
 		defer c.Unlock()
+		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
+			return err
+		}
 		if err := c.ToDisk(); err != nil {
 			return err
 		}
@@ -119,6 +122,10 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
 		c.HasBeenStartedBefore = true
 		daemon.setStateCounter(c)
+		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
+			c.Reset(false)
+			return err
+		}
 		if err := c.ToDisk(); err != nil {
 			c.Reset(false)
 			return err
@@ -130,6 +137,9 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
 		// Container is already locked in this case
 		c.Paused = true
 		daemon.setStateCounter(c)
+		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
+			return err
+		}
 		if err := c.ToDisk(); err != nil {
 			return err
 		}
@@ -139,6 +149,9 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
 		// Container is already locked in this case
 		c.Paused = false
 		daemon.setStateCounter(c)
+		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
+			return err
+		}
 		if err := c.ToDisk(); err != nil {
 			return err
 		}

View file

@@ -82,6 +82,9 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error {
 		daemon.nameIndex.Release(oldName + k)
 	}
 	daemon.releaseName(oldName)
+	if err = daemon.containersReplica.Save(container.Snapshot()); err != nil {
+		return err
+	}
 	if err = container.ToDisk(); err != nil {
 		return err
 	}
@@ -99,6 +102,9 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error {
 		if err != nil {
 			container.Name = oldName
 			container.NetworkSettings.IsAnonymousEndpoint = oldIsAnonymousEndpoint
+			if e := daemon.containersReplica.Save(container.Snapshot()); e != nil {
+				logrus.Errorf("%s: Failed in replicating state on rename failure: %v", container.ID, e)
+			}
 			if e := container.ToDisk(); e != nil {
 				logrus.Errorf("%s: Failed in writing to Disk on rename failure: %v", container.ID, e)
 			}

View file

@@ -117,8 +117,12 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
 			if container.ExitCode() == 0 {
 				container.SetExitCode(128)
 			}
-			container.ToDisk()
+			if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
+				logrus.Errorf("%s: failed replicating state on start failure: %v", container.ID, err)
+			}
+			if err := container.ToDisk(); err != nil {
+				logrus.Errorf("%s: failed writing to disk on start failure: %v", container.ID, err)
+			}
 			container.Reset(false)
 
 			daemon.Cleanup(container)

View file

@@ -38,6 +38,7 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
 		if restoreConfig {
 			container.Lock()
 			container.HostConfig = &backupHostConfig
+			daemon.containersReplica.Save(container.Snapshot())
 			container.ToDisk()
 			container.Unlock()
 		}
@@ -47,10 +48,18 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
 		return errCannotUpdate(container.ID, fmt.Errorf("Container is marked for removal and cannot be \"update\"."))
 	}
 
+	container.Lock()
 	if err := container.UpdateContainer(hostConfig); err != nil {
 		restoreConfig = true
+		container.Unlock()
 		return errCannotUpdate(container.ID, err)
 	}
+	if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
+		restoreConfig = true
+		container.Unlock()
+		return errCannotUpdate(container.ID, err)
+	}
+	container.Unlock()
 
 	// if Restart Policy changed, we need to update container monitor
 	if hostConfig.RestartPolicy.Name != "" {