From 995866d76c749185c7f2eac31644b6c69ceddf68 Mon Sep 17 00:00:00 2001
From: Dong Chen
Date: Tue, 22 Dec 2015 11:35:52 -0800
Subject: [PATCH 1/6] Improve node management.

1. Introduce pending state. Pending nodes need validation before moving to healthy state. Resolve duplicate ID and dead node drop issues.
2. Expose error and last update time in docker info.
3. Use connect success/failure to drive state transitions between healthy and unhealthy.

Signed-off-by: Dong Chen
---
 cluster/engine.go             | 180 +++++++++++++++++++++++++---------
 cluster/engine_test.go        |  28 ++++++
 cluster/mesos/driver.go       |   2 +
 cluster/swarm/cluster.go      |  83 ++++++++++++++--
 cluster/swarm/cluster_test.go |   2 +-
 5 files changed, 238 insertions(+), 57 deletions(-)

diff --git a/cluster/engine.go b/cluster/engine.go
index b62efa822d..0a6af8f47f 100644
--- a/cluster/engine.go
+++ b/cluster/engine.go
@@ -26,6 +26,27 @@ const (
 	minSupportedVersion = version.Version("1.6.0")
 )
 
+type engineState int
+
+const (
+	// pending means an engine added to cluster has not been validated
+	statePending engineState = iota
+	// unhealthy means an engine is unreachable
+	stateUnhealthy
+	// healthy means an engine is reachable
+	stateHealthy
+	// maintenance means an engine is under maintenance.
+	// There is no action to migrate a node into maintenance state yet.
+	//stateMaintenance
+)
+
+var stateText = map[engineState]string{
+	statePending:   "Pending",
+	stateUnhealthy: "Unhealthy",
+	stateHealthy:   "Healthy",
+	//stateMaintenance: "Maintenance",
+}
+
 // delayer offers a simple API to random delay within a given time range.
 type delayer struct {
 	rangeMin time.Duration
@@ -82,7 +103,9 @@ type Engine struct {
 	volumes         map[string]*Volume
 	client          dockerclient.Client
 	eventHandler    EventHandler
-	healthy         bool
+	state           engineState
+	lastError       string
+	updatedAt       time.Time
 	failureCount    int
 	overcommitRatio int64
 	opts            *EngineOpts
@@ -99,7 +122,8 @@ func NewEngine(addr string, overcommitRatio float64, opts *EngineOpts) *Engine {
 		containers:      make(map[string]*Container),
 		networks:        make(map[string]*Network),
 		volumes:         make(map[string]*Volume),
-		healthy:         true,
+		state:           statePending,
+		updatedAt:       time.Now(),
 		overcommitRatio: int64(overcommitRatio * 100),
 		opts:            opts,
 	}
@@ -153,9 +177,6 @@ func (e *Engine) ConnectWithClient(client dockerclient.Client) error {
 	e.RefreshVolumes()
 	e.RefreshNetworks()
 
-	// Start the update loop.
-	go e.refreshLoop()
-
 	e.emitEvent("engine_connect")
 
 	return nil
@@ -182,45 +203,111 @@ func (e *Engine) isConnected() bool {
 
 // IsHealthy returns true if the engine is healthy
 func (e *Engine) IsHealthy() bool {
-	return e.healthy
+	e.RLock()
+	defer e.RUnlock()
+	return e.state == stateHealthy
 }
 
-// setHealthy sets engine healthy state
-func (e *Engine) setHealthy(state bool) {
+// setState sets engine healthy state
+func (e *Engine) setState(state engineState) {
 	e.Lock()
-	e.healthy = state
+	defer e.Unlock()
+	e.state = state
 	// if engine is healthy, clear failureCount
-	if state {
+	if state == stateHealthy {
 		e.failureCount = 0
 	}
-	e.Unlock()
+}
+
+// TimeToValidate returns true if a pending node is up for validation
+func (e *Engine) TimeToValidate() bool {
+	e.Lock()
+	defer e.Unlock()
+	if e.state != statePending {
+		return false
+	}
+	sinceLastUpdate := time.Since(e.updatedAt)
+	// Increase check interval for a pending engine according to failureCount and cap it at 4 hours
+	if sinceLastUpdate > 4*time.Hour || sinceLastUpdate > time.Duration(e.failureCount)*30*time.Second {
+		return true
+	}
+	return false
+}
+
+// ValidationComplete transitions engine state from statePending to stateHealthy
+func (e *Engine) ValidationComplete() {
+	e.Lock()
+	defer e.Unlock()
+	if e.state != statePending {
+		return
+	}
+	e.state = stateHealthy
+	e.failureCount = 0
+	go e.refreshLoop()
+}
+
+// setErrMsg sets error message for the engine
+func (e *Engine) setErrMsg(errMsg string) {
+	e.Lock()
+	defer e.Unlock()
+	e.lastError = errMsg
+	e.updatedAt = time.Now()
+}
+
+// ErrMsg returns error message for the engine
+func (e *Engine) ErrMsg() string {
+	e.RLock()
+	defer e.RUnlock()
+	return e.lastError
+}
+
+// HandleIDConflict handles an ID conflict with an existing engine
+func (e *Engine) HandleIDConflict(otherAddr string) {
+	e.setErrMsg(fmt.Sprintf("ID duplicated. %s shared by this node %s and another node %s", e.ID, e.Addr, otherAddr))
 }
 
 // Status returns the health status of the Engine: Healthy or Unhealthy
 func (e *Engine) Status() string {
-	if e.healthy {
-		return "Healthy"
-	}
-	return "Unhealthy"
+	e.RLock()
+	defer e.RUnlock()
+	return stateText[e.state]
 }
 
 // incFailureCount increases engine's failure count, and set engine as unhealthy if threshold is crossed
 func (e *Engine) incFailureCount() {
 	e.Lock()
+	defer e.Unlock()
 	e.failureCount++
-	if e.healthy && e.failureCount >= e.opts.FailureRetry {
-		e.healthy = false
+	if e.state == stateHealthy && e.failureCount >= e.opts.FailureRetry {
+		e.state = stateUnhealthy
+		log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as unhealthy. Connect failed %d times", e.failureCount)
+		e.emitEvent("engine_disconnect")
 	}
-	e.Unlock()
 }
 
-// CheckConnectionErr checks error from client response and adjust engine healthy indicators
+// UpdatedAt returns the previous updatedAt time
+func (e *Engine) UpdatedAt() time.Time {
+	e.RLock()
+	defer e.RUnlock()
+	return e.updatedAt
+}
+
+// CheckConnectionErr checks error from client response and adjusts engine healthy indicators
 func (e *Engine) CheckConnectionErr(err error) {
 	if err == nil {
-		e.setHealthy(true)
+		e.setErrMsg("")
+		// If current state is unhealthy, change it to healthy
+		if e.state == stateUnhealthy {
+			log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. 
Hooray!", e.failureCount) + e.emitEvent("engine_reconnect") + e.setState(stateHealthy) + } return } + // update engine error message + e.setErrMsg(err.Error()) + // dockerclient defines ErrConnectionRefused error. but if http client is from swarm, it's not using // dockerclient. We need string matching for these cases. Remove the first character to deal with // case sensitive issue @@ -264,7 +351,19 @@ func (e *Engine) updateSpecs() error { return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade to at least %s", e.Addr, minSupportedVersion) } - e.ID = info.ID + e.Lock() + defer e.Unlock() + // Swarm/docker identifies engine by ID. Updating ID but not updating cluster + // index will put the cluster into inconsistent state. If this happens, the + // engine should be put to pending state for re-validation. + if e.ID == "" { + e.ID = info.ID + } else if e.ID != info.ID { + e.state = statePending + message := fmt.Sprintf("Engine (ID: %s, Addr: %s) shows up with another ID:%s. Please remove it from cluster, it can be added back.", e.ID, e.Addr, info.ID) + e.lastError = message + return fmt.Errorf(message) + } e.Name = info.Name e.Cpus = info.NCPU e.Memory = info.MemTotal @@ -368,9 +467,7 @@ func (e *Engine) RefreshVolumes() error { // FIXME: unexport this method after mesos scheduler stops using it directly func (e *Engine) RefreshContainers(full bool) error { containers, err := e.client.ListContainers(true, false, "") - // e.CheckConnectionErr(err) is not appropriate here because refresh loop uses - // RefreshContainers function to fail/recover an engine. Adding CheckConnectionErr - // here would result in double count + e.CheckConnectionErr(err) if err != nil { return err } @@ -389,7 +486,6 @@ func (e *Engine) RefreshContainers(full bool) error { defer e.Unlock() e.containers = merged - log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Updated engine state") return nil } @@ -469,6 +565,7 @@ func (e *Engine) updateContainer(c dockerclient.Container, containers map[string return containers, nil } +// refreshLoop periodically triggers engine refresh. func (e *Engine) refreshLoop() { for { @@ -481,33 +578,24 @@ func (e *Engine) refreshLoop() { return } + if !e.IsHealthy() { + if err = e.updateSpecs(); err != nil { + log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err) + continue + } + e.client.StopAllMonitorEvents() + e.client.StartMonitorEvents(e.handler, nil) + } + err = e.RefreshContainers(false) if err == nil { // Do not check error as older daemon don't support this call e.RefreshVolumes() e.RefreshNetworks() - err = e.RefreshImages() - } - - if err != nil { - e.failureCount++ - if e.failureCount >= e.opts.FailureRetry && e.healthy { - e.emitEvent("engine_disconnect") - e.setHealthy(false) - log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as dead. Updated state failed %d times: %v", e.failureCount, err) - } + e.RefreshImages() + log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine update succeeded") } else { - if !e.healthy { - log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. 
Hooray!", e.failureCount) - if err := e.updateSpecs(); err != nil { - log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err) - continue - } - e.client.StopAllMonitorEvents() - e.client.StartMonitorEvents(e.handler, nil) - e.emitEvent("engine_reconnect") - } - e.setHealthy(true) + log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine refresh failed") } } } diff --git a/cluster/engine_test.go b/cluster/engine_test.go index 07635360be..2757242513 100644 --- a/cluster/engine_test.go +++ b/cluster/engine_test.go @@ -38,8 +38,29 @@ var ( } ) +func TestSetEngineState(t *testing.T) { + engine := NewEngine("test", 0, engOpts) + assert.True(t, engine.state == statePending) + engine.setState(stateUnhealthy) + assert.True(t, engine.state == stateUnhealthy) + engine.incFailureCount() + assert.True(t, engine.failureCount == 1) + engine.setState(stateHealthy) + assert.True(t, engine.state == stateHealthy) + assert.True(t, engine.failureCount == 0) +} + +func TestErrMsg(t *testing.T) { + engine := NewEngine("test", 0, engOpts) + assert.True(t, len(engine.ErrMsg()) == 0) + message := "cannot connect" + engine.setErrMsg(message) + assert.True(t, engine.ErrMsg() == message) +} + func TestEngineFailureCount(t *testing.T) { engine := NewEngine("test", 0, engOpts) + engine.setState(stateHealthy) for i := 0; i < engine.opts.FailureRetry; i++ { assert.True(t, engine.IsHealthy()) engine.incFailureCount() @@ -82,6 +103,7 @@ func TestOutdatedEngine(t *testing.T) { func TestEngineCpusMemory(t *testing.T) { engine := NewEngine("test", 0, engOpts) + engine.setState(stateUnhealthy) assert.False(t, engine.isConnected()) client := mockclient.NewMockClient() @@ -105,6 +127,7 @@ func TestEngineCpusMemory(t *testing.T) { func TestEngineSpecs(t *testing.T) { engine := NewEngine("test", 0, engOpts) + engine.setState(stateUnhealthy) assert.False(t, engine.isConnected()) client := mockclient.NewMockClient() @@ -133,6 +156,7 @@ func TestEngineSpecs(t *testing.T) { func TestEngineState(t *testing.T) { engine := NewEngine("test", 0, engOpts) + engine.setState(stateUnhealthy) assert.False(t, engine.isConnected()) client := mockclient.NewMockClient() @@ -185,6 +209,7 @@ func TestCreateContainer(t *testing.T) { client = mockclient.NewMockClient() ) + engine.setState(stateUnhealthy) client.On("Info").Return(mockInfo, nil) client.On("Version").Return(mockVersion, nil) client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() @@ -242,6 +267,7 @@ func TestCreateContainer(t *testing.T) { func TestImages(t *testing.T) { engine := NewEngine("test", 0, engOpts) + engine.setState(stateHealthy) engine.images = []*Image{ {dockerclient.Image{Id: "a"}, engine}, {dockerclient.Image{Id: "b"}, engine}, @@ -279,6 +305,7 @@ func TestUsedCpus(t *testing.T) { ) engine := NewEngine("test", 0, engOpts) + engine.setState(stateHealthy) client := mockclient.NewMockClient() for _, hn := range hostNcpu { @@ -312,6 +339,7 @@ func TestContainerRemovedDuringRefresh(t *testing.T) { ) engine := NewEngine("test", 0, engOpts) + engine.setState(stateUnhealthy) assert.False(t, engine.isConnected()) // A container is removed before it can be inspected. 
diff --git a/cluster/mesos/driver.go b/cluster/mesos/driver.go
index 62c3bb37a3..b39ebde5cf 100644
--- a/cluster/mesos/driver.go
+++ b/cluster/mesos/driver.go
@@ -47,6 +47,8 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
 		if err := engine.Connect(c.TLSConfig); err != nil {
 			log.Error(err)
 		} else {
+			// Set engine state to healthy and start refresh loop
+			engine.ValidationComplete()
 			s = newAgent(agentID, engine)
 			c.agents[agentID] = s
 			if err := s.engine.RegisterEventHandler(c); err != nil {
diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go
index 461d4a2833..138923deb1 100644
--- a/cluster/swarm/cluster.go
+++ b/cluster/swarm/cluster.go
@@ -9,6 +9,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/stringid"
@@ -51,6 +52,7 @@ type Cluster struct {
 
 	eventHandler      cluster.EventHandler
 	engines           map[string]*cluster.Engine
+	pendingEngines    map[string]*cluster.Engine
 	scheduler         *scheduler.Scheduler
 	discovery         discovery.Discovery
 	pendingContainers map[string]*pendingContainer
@@ -66,6 +68,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
 
 	cluster := &Cluster{
 		engines:           make(map[string]*cluster.Engine),
+		pendingEngines:    make(map[string]*cluster.Engine),
 		scheduler:         scheduler,
 		TLSConfig:         TLSConfig,
 		discovery:         discovery,
@@ -80,6 +83,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
 
 	discoveryCh, errCh := cluster.discovery.Watch(nil)
 	go cluster.monitorDiscovery(discoveryCh, errCh)
+	go cluster.monitorPendingEngines()
 
 	return cluster, nil
 }
@@ -195,6 +199,9 @@ func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine {
 	c.RLock()
 	defer c.RUnlock()
 
+	if engine, ok := c.pendingEngines[addr]; ok {
+		return engine
+	}
 	for _, engine := range c.engines {
 		if engine.Addr == addr {
 			return engine
@@ -217,11 +224,26 @@ func (c *Cluster) addEngine(addr string) bool {
 	if err := engine.RegisterEventHandler(c); err != nil {
 		log.Error(err)
 	}
+	// Add it to pending engine map, indexed by address. This will prevent
+	// duplicates from entering the cluster.
+	c.Lock()
+	c.pendingEngines[addr] = engine
+	c.Unlock()
+	// validatePendingEngine will start a thread to validate the engine.
+	// If the engine is reachable and valid, it'll be monitored and updated in a loop.
+	// If the engine is not reachable, pending engines will be examined once in a while.
+	go c.validatePendingEngine(engine)
+
+	return true
+}
+
+// validatePendingEngine connects to the engine and, if valid, registers it with the cluster.
+func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
 
 	// Attempt a connection to the engine. Since this is slow, don't get a hold
 	// of the lock yet.
 	if err := engine.Connect(c.TLSConfig); err != nil {
-		log.Error(err)
+		log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
 		return false
 	}
 
@@ -233,16 +255,28 @@ func (c *Cluster) addEngine(addr string) bool {
 	if old, exists := c.engines[engine.ID]; exists {
 		if old.Addr != engine.Addr {
 			log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
+			// Keep this engine in pendingEngines table and show its error.
+			// If it's ID duplication from a VM clone, the user sees this message and can fix it.
+			// If the engine is rebooted and gets a new IP from DHCP, the previous address will be removed
+			// from discovery after a while.
+			// In both cases, retry may fix the problem.
+ engine.HandleIDConflict(old.Addr) } else { log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr) + engine.Disconnect() + // Remove it from pendingEngines table + delete(c.pendingEngines, engine.Addr) } - engine.Disconnect() return false } - // Finally register the engine. + // Engine validated, move from pendingEngines table to engines table + delete(c.pendingEngines, engine.Addr) + // set engine state to healthy, and start refresh loop + engine.ValidationComplete() c.engines[engine.ID] = engine - log.Infof("Registered Engine %s at %s", engine.Name, addr) + + log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr) return true } @@ -255,7 +289,12 @@ func (c *Cluster) removeEngine(addr string) bool { defer c.Unlock() engine.Disconnect() - delete(c.engines, engine.ID) + // it could be in pendingEngines or engines + if _, ok := c.pendingEngines[addr]; ok { + delete(c.pendingEngines, addr) + } else { + delete(c.engines, engine.ID) + } log.Infof("Removed Engine %s", engine.Name) return true } @@ -277,10 +316,8 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err c.removeEngine(entry.String()) } - // Since `addEngine` can be very slow (it has to connect to the - // engine), we are going to do the adds in parallel. for _, entry := range added { - go c.addEngine(entry.String()) + c.addEngine(entry.String()) } case err := <-errCh: log.Errorf("Discovery error: %v", err) @@ -288,6 +325,26 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err } } +// monitorPendingEngines checks if some previous unreachable/invalid engines have been fixed +func (c *Cluster) monitorPendingEngines() { + for { + // Don't need to do it frequently + time.Sleep(30 * time.Second) + // Get the list of pendingEngines + c.RLock() + pEngines := make([]*cluster.Engine, 0, len(c.pendingEngines)) + for _, e := range c.pendingEngines { + pEngines = append(pEngines, e) + } + c.RUnlock() + for _, e := range pEngines { + if e.TimeToValidate() { + go c.validatePendingEngine(e) + } + } + } +} + // Images returns all the images in the cluster. func (c *Cluster) Images() cluster.Images { c.RLock() @@ -672,7 +729,7 @@ func (c *Cluster) Volume(name string) *cluster.Volume { return nil } -// listNodes returns all the engines in the cluster. +// listNodes returns all validated engines in the cluster, excluding pendingEngines. func (c *Cluster) listNodes() []*node.Node { c.RLock() defer c.RUnlock() @@ -692,14 +749,18 @@ func (c *Cluster) listNodes() []*node.Node { } // listEngines returns all the engines in the cluster. +// This is for reporting, not scheduling, hence pendingEngines are included. 
func (c *Cluster) listEngines() []*cluster.Engine { c.RLock() defer c.RUnlock() - out := make([]*cluster.Engine, 0, len(c.engines)) + out := make([]*cluster.Engine, 0, len(c.engines)+len(c.pendingEngines)) for _, n := range c.engines { out = append(out, n) } + for _, n := range c.pendingEngines { + out = append(out, n) + } return out } @@ -744,6 +805,8 @@ func (c *Cluster) Info() [][]string { } sort.Strings(labels) info = append(info, []string{" └ Labels", fmt.Sprintf("%s", strings.Join(labels, ", "))}) + info = append(info, []string{" └ Error", engine.ErrMsg()}) + info = append(info, []string{" └ UpdatedAt", engine.UpdatedAt().UTC().Format(time.RFC3339)}) } return info diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go index bb02d2918d..862fc00afd 100644 --- a/cluster/swarm/cluster_test.go +++ b/cluster/swarm/cluster_test.go @@ -25,7 +25,7 @@ func (nopCloser) Close() error { var ( mockInfo = &dockerclient.Info{ - ID: "id", + ID: "test-engine", Name: "name", NCPU: 10, MemTotal: 20, From 52a7616d9936ebb711ed4150dc062430a6f53af3 Mon Sep 17 00:00:00 2001 From: Dong Chen Date: Tue, 5 Jan 2016 14:59:30 -0800 Subject: [PATCH 2/6] Add integration test for state machine. Signed-off-by: Dong Chen --- cluster/swarm/cluster.go | 6 +- .../{ => nodemanagement}/engine_options.bats | 4 +- test/integration/nodemanagement/state.bats | 77 +++++++++++++++++++ 3 files changed, 84 insertions(+), 3 deletions(-) rename test/integration/{ => nodemanagement}/engine_options.bats (95%) create mode 100644 test/integration/nodemanagement/state.bats diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 138923deb1..f23175f205 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -805,7 +805,11 @@ func (c *Cluster) Info() [][]string { } sort.Strings(labels) info = append(info, []string{" └ Labels", fmt.Sprintf("%s", strings.Join(labels, ", "))}) - info = append(info, []string{" └ Error", engine.ErrMsg()}) + errMsg := engine.ErrMsg() + if len(errMsg) == 0 { + errMsg = "(none)" + } + info = append(info, []string{" └ Error", errMsg}) info = append(info, []string{" └ UpdatedAt", engine.UpdatedAt().UTC().Format(time.RFC3339)}) } diff --git a/test/integration/engine_options.bats b/test/integration/nodemanagement/engine_options.bats similarity index 95% rename from test/integration/engine_options.bats rename to test/integration/nodemanagement/engine_options.bats index aef39fde29..bb817d4371 100644 --- a/test/integration/engine_options.bats +++ b/test/integration/nodemanagement/engine_options.bats @@ -1,6 +1,6 @@ #!/usr/bin/env bats -load helpers +load ../helpers @test "engine refresh options" { # minimum refresh interval @@ -8,7 +8,7 @@ load helpers [ "$status" -ne 0 ] [[ "${output}" == *"minimum refresh interval should be a positive number"* ]] - # max refresh interval + # max refresh interval run swarm manage --engine-refresh-min-interval "30s" -engine-refresh-max-interval "20s" --advertise 127.0.0.1:$SWARM_BASE_PORT 192.168.56.202:4444 [ "$status" -ne 0 ] [[ "${output}" == *"max refresh interval cannot be less than min refresh interval"* ]] diff --git a/test/integration/nodemanagement/state.bats b/test/integration/nodemanagement/state.bats new file mode 100644 index 0000000000..90ecd5d40c --- /dev/null +++ b/test/integration/nodemanagement/state.bats @@ -0,0 +1,77 @@ +#!/usr/bin/env bats + +load ../helpers + +DISCOVERY_FILE="" +DISCOVERY="" + +function setup() { + # create a blank temp file for discovery + DISCOVERY_FILE=$(mktemp) + DISCOVERY="file://$DISCOVERY_FILE" 
+} + +function teardown() { + swarm_manage_cleanup + stop_docker + rm -f "$DISCOVERY_FILE" +} + +function setup_discovery_file() { + rm -f "$DISCOVERY_FILE" + for host in ${HOSTS[@]}; do + echo "$host" >> $DISCOVERY_FILE + done +} + +@test "node failure and recovery" { + # Start 1 engine and register it in the file. + start_docker 1 + setup_discovery_file + # Start swarm and check it can reach the node + swarm_manage --engine-refresh-min-interval "1s" --engine-refresh-max-interval "1s" --engine-failure-retry 2 "$DISCOVERY" + + run docker_swarm info + [ "$status" -eq 0 ] + [[ "${output}" == *"Status: Healthy"* ]] + + # Stop the node and let it fail + docker_host stop ${DOCKER_CONTAINERS[0]} + sleep 4 + + # Verify swarm detects node failure + run docker_swarm info + [ "$status" -eq 0 ] + [[ "${output}" == *"Status: Unhealthy"* ]] + + # Verify swarm detects recovery + docker_host start ${DOCKER_CONTAINERS[0]} + sleep 4 + run docker_swarm info + [ "$status" -eq 0 ] + [[ "${output}" == *"Status: Healthy"* ]] +} + +@test "node pending and recovery" { + # Start 1 engine and register it in the file. + start_docker 1 + setup_discovery_file + # Stop the node + docker_host stop ${DOCKER_CONTAINERS[0]} + + # Start swarm with the stopped node + swarm_manage_no_wait "$DISCOVERY" + run docker_swarm info + [ "$status" -eq 0 ] + [[ "${output}" == *"Status: Pending"* ]] + + # Restart the node and wait for revalidation + docker_host start ${DOCKER_CONTAINERS[0]} + sleep 40 + + # Verify swarm detects recovery + run docker_swarm info + [ "$status" -eq 0 ] + [[ "${output}" == *"Status: Healthy"* ]] +} + From 9a1584d508eb7c4076328ef0f1e4559512863c3e Mon Sep 17 00:00:00 2001 From: Dong Chen Date: Tue, 5 Jan 2016 15:56:55 -0800 Subject: [PATCH 3/6] Update integration test. Reduce pending node validation sleep interval. Each pending node has its own validation interval according to failure count. So reducing sleep interval is not increasing validation frequency for unreachable nodes. 
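As a rough illustration of that backoff, the sketch below computes the effective wait between validation attempts. nextValidationDelay and the main function are invented for this note; only the 30-second step and 4-hour cap come from TimeToValidate (named failureBackoff and validationLimit later in this series).

package main

import (
	"fmt"
	"time"
)

const (
	failureBackoff  = 30 * time.Second // per-failure step used by TimeToValidate
	validationLimit = 4 * time.Hour    // cap on the validation interval
)

// nextValidationDelay returns how long a pending engine waits before the next
// validation attempt: it grows linearly with the failure count and is capped.
func nextValidationDelay(failureCount int) time.Duration {
	d := time.Duration(failureCount) * failureBackoff
	if d > validationLimit {
		return validationLimit
	}
	return d
}

func main() {
	for _, failures := range []int{1, 10, 1000} {
		fmt.Printf("%4d failures -> next check after %v\n", failures, nextValidationDelay(failures))
	}
	// 1 failure -> 30s, 10 failures -> 5m0s, 1000 failures -> capped at 4h0m0s
}
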
Signed-off-by: Dong Chen --- cluster/swarm/cluster.go | 2 +- test/integration/nodemanagement/state.bats | 34 +++++++--------------- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index f23175f205..b2dbc14958 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -329,7 +329,7 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err func (c *Cluster) monitorPendingEngines() { for { // Don't need to do it frequently - time.Sleep(30 * time.Second) + time.Sleep(10 * time.Second) // Get the list of pendingEngines c.RLock() pEngines := make([]*cluster.Engine, 0, len(c.pendingEngines)) diff --git a/test/integration/nodemanagement/state.bats b/test/integration/nodemanagement/state.bats index 90ecd5d40c..ac955131a1 100644 --- a/test/integration/nodemanagement/state.bats +++ b/test/integration/nodemanagement/state.bats @@ -31,25 +31,17 @@ function setup_discovery_file() { # Start swarm and check it can reach the node swarm_manage --engine-refresh-min-interval "1s" --engine-refresh-max-interval "1s" --engine-failure-retry 2 "$DISCOVERY" - run docker_swarm info - [ "$status" -eq 0 ] - [[ "${output}" == *"Status: Healthy"* ]] + eval "docker_swarm info | grep -q -i 'Status: Healthy'" # Stop the node and let it fail docker_host stop ${DOCKER_CONTAINERS[0]} - sleep 4 + # Wait for swarm to detect node failure + retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Unhealthy'" - # Verify swarm detects node failure - run docker_swarm info - [ "$status" -eq 0 ] - [[ "${output}" == *"Status: Unhealthy"* ]] - - # Verify swarm detects recovery + # Restart node docker_host start ${DOCKER_CONTAINERS[0]} - sleep 4 - run docker_swarm info - [ "$status" -eq 0 ] - [[ "${output}" == *"Status: Healthy"* ]] + # Wait for swarm to detect node recovery + retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Healthy'" } @test "node pending and recovery" { @@ -61,17 +53,11 @@ function setup_discovery_file() { # Start swarm with the stopped node swarm_manage_no_wait "$DISCOVERY" - run docker_swarm info - [ "$status" -eq 0 ] - [[ "${output}" == *"Status: Pending"* ]] + retry 2 1 eval "docker_swarm info | grep -q -i 'Status: Pending'" - # Restart the node and wait for revalidation + # Restart the node docker_host start ${DOCKER_CONTAINERS[0]} - sleep 40 - - # Verify swarm detects recovery - run docker_swarm info - [ "$status" -eq 0 ] - [[ "${output}" == *"Status: Healthy"* ]] + # Wait for swarm to detect node recovery + retry 15 3 eval "docker_swarm info | grep -q -i 'Status: Healthy'" } From 6a1b49cf4e9ba7a2cd6a52fef16459c6f0a64e68 Mon Sep 17 00:00:00 2001 From: Dong Chen Date: Tue, 5 Jan 2016 16:20:38 -0800 Subject: [PATCH 4/6] Fix format issue in state.bats. 
Signed-off-by: Dong Chen --- test/integration/nodemanagement/state.bats | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/integration/nodemanagement/state.bats b/test/integration/nodemanagement/state.bats index ac955131a1..da81687bb1 100644 --- a/test/integration/nodemanagement/state.bats +++ b/test/integration/nodemanagement/state.bats @@ -31,17 +31,17 @@ function setup_discovery_file() { # Start swarm and check it can reach the node swarm_manage --engine-refresh-min-interval "1s" --engine-refresh-max-interval "1s" --engine-failure-retry 2 "$DISCOVERY" - eval "docker_swarm info | grep -q -i 'Status: Healthy'" + eval "docker_swarm info | grep -q -i 'Status: Healthy'" # Stop the node and let it fail docker_host stop ${DOCKER_CONTAINERS[0]} - # Wait for swarm to detect node failure - retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Unhealthy'" + # Wait for swarm to detect node failure + retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Unhealthy'" # Restart node docker_host start ${DOCKER_CONTAINERS[0]} - # Wait for swarm to detect node recovery - retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Healthy'" + # Wait for swarm to detect node recovery + retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Healthy'" } @test "node pending and recovery" { @@ -53,11 +53,11 @@ function setup_discovery_file() { # Start swarm with the stopped node swarm_manage_no_wait "$DISCOVERY" - retry 2 1 eval "docker_swarm info | grep -q -i 'Status: Pending'" + retry 2 1 eval "docker_swarm info | grep -q -i 'Status: Pending'" # Restart the node docker_host start ${DOCKER_CONTAINERS[0]} - # Wait for swarm to detect node recovery - retry 15 3 eval "docker_swarm info | grep -q -i 'Status: Healthy'" + # Wait for swarm to detect node recovery + retry 15 3 eval "docker_swarm info | grep -q -i 'Status: Healthy'" } From 58a0e1719d035d3c2120d7c767595db68b51caf1 Mon Sep 17 00:00:00 2001 From: Dong Chen Date: Wed, 6 Jan 2016 10:33:51 -0800 Subject: [PATCH 5/6] Update failureCount scenario and test cases. Signed-off-by: Dong Chen --- cluster/engine.go | 15 +++++++++------ cluster/engine_test.go | 22 +++++++++++++++++++--- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/cluster/engine.go b/cluster/engine.go index 0a6af8f47f..c860acfac7 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -208,15 +208,11 @@ func (e *Engine) IsHealthy() bool { return e.state == stateHealthy } -// setState sets engine healthy state +// setState sets engine state func (e *Engine) setState(state engineState) { e.Lock() defer e.Unlock() e.state = state - // if engine is healthy, clear failureCount - if state == stateHealthy { - e.failureCount = 0 - } } // TimeToValidate returns true if a pending node is up for validation @@ -292,10 +288,17 @@ func (e *Engine) UpdatedAt() time.Time { return e.updatedAt } +func (e *Engine) resetFailureCount() { + e.Lock() + defer e.Unlock() + e.failureCount = 0 +} + // CheckConnectionErr checks error from client response and adjusts engine healthy indicators func (e *Engine) CheckConnectionErr(err error) { if err == nil { e.setErrMsg("") + e.resetFailureCount() // If current state is unhealthy, change it to healthy if e.state == stateUnhealthy { log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount) @@ -322,7 +325,7 @@ func (e *Engine) CheckConnectionErr(err error) { e.incFailureCount() return } - // other errors may be ambiguous. 
let refresh loop decide healthy or not. + // other errors may be ambiguous. } // Gather engine specs (CPU, memory, constraints, ...). diff --git a/cluster/engine_test.go b/cluster/engine_test.go index 2757242513..0cb8fb7762 100644 --- a/cluster/engine_test.go +++ b/cluster/engine_test.go @@ -43,11 +43,8 @@ func TestSetEngineState(t *testing.T) { assert.True(t, engine.state == statePending) engine.setState(stateUnhealthy) assert.True(t, engine.state == stateUnhealthy) - engine.incFailureCount() - assert.True(t, engine.failureCount == 1) engine.setState(stateHealthy) assert.True(t, engine.state == stateHealthy) - assert.True(t, engine.failureCount == 0) } func TestErrMsg(t *testing.T) { @@ -58,6 +55,22 @@ func TestErrMsg(t *testing.T) { assert.True(t, engine.ErrMsg() == message) } +func TestCheckConnectionErr(t *testing.T) { + engine := NewEngine("test", 0, engOpts) + engine.setState(stateHealthy) + assert.True(t, engine.failureCount == 0) + err := dockerclient.ErrConnectionRefused + engine.CheckConnectionErr(err) + assert.True(t, len(engine.ErrMsg()) > 0) + assert.True(t, engine.failureCount == 1) + engine.CheckConnectionErr(err) + assert.True(t, engine.failureCount == 2) + err = nil + engine.CheckConnectionErr(err) + assert.True(t, engine.failureCount == 0) + assert.True(t, len(engine.ErrMsg()) == 0) +} + func TestEngineFailureCount(t *testing.T) { engine := NewEngine("test", 0, engOpts) engine.setState(stateHealthy) @@ -66,6 +79,9 @@ func TestEngineFailureCount(t *testing.T) { engine.incFailureCount() } assert.False(t, engine.IsHealthy()) + assert.True(t, engine.failureCount == engine.opts.FailureRetry) + engine.resetFailureCount() + assert.True(t, engine.failureCount == 0) } func TestEngineConnectionFailure(t *testing.T) { From 7e266f18ed364a376a9829ef94ff42c8bf0789ec Mon Sep 17 00:00:00 2001 From: Dong Chen Date: Thu, 7 Jan 2016 15:55:12 -0800 Subject: [PATCH 6/6] Name constants. Signed-off-by: Dong Chen --- cluster/engine.go | 7 +++++-- cluster/swarm/cluster.go | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cluster/engine.go b/cluster/engine.go index c860acfac7..9c528f1b9c 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -35,6 +35,7 @@ const ( stateUnhealthy // healthy means an engine is reachable stateHealthy + // TODO: add maintenance state. Proposal #1486 // maintenance means an engine is under maintenance. // There is no action to migrate a node into maintenance state yet. 
//stateMaintenance @@ -217,14 +218,16 @@ func (e *Engine) setState(state engineState) { // TimeToValidate returns true if a pending node is up for validation func (e *Engine) TimeToValidate() bool { + const validationLimit time.Duration = 4 * time.Hour + const failureBackoff time.Duration = 30 * time.Second e.Lock() defer e.Unlock() if e.state != statePending { return false } sinceLastUpdate := time.Since(e.updatedAt) - // Increase check interval for a pending engine according to failureCount and cap it at 4 hours - if sinceLastUpdate > 4*time.Hour || sinceLastUpdate > time.Duration(e.failureCount)*30*time.Second { + // Increase check interval for a pending engine according to failureCount and cap it at a limit + if sinceLastUpdate > validationLimit || sinceLastUpdate > time.Duration(e.failureCount)*failureBackoff { return true } return false diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index b2dbc14958..2055641e26 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -327,9 +327,10 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err // monitorPendingEngines checks if some previous unreachable/invalid engines have been fixed func (c *Cluster) monitorPendingEngines() { + const minimumValidationInterval time.Duration = 10 * time.Second for { // Don't need to do it frequently - time.Sleep(10 * time.Second) + time.Sleep(minimumValidationInterval) // Get the list of pendingEngines c.RLock() pEngines := make([]*cluster.Engine, 0, len(c.pendingEngines))