Merge pull request #1569 from dongluochen/nodeManagement

Improve node management.
Alexandre Beslic
2016-01-07 16:14:36 -08:00
7 changed files with 333 additions and 62 deletions

View File

@@ -26,6 +26,28 @@ const (
minSupportedVersion = version.Version("1.6.0")
)
type engineState int
const (
// pending means an engine added to the cluster has not yet been validated
statePending engineState = iota
// unhealthy means an engine is unreachable
stateUnhealthy
// healthy means an engine is reachable
stateHealthy
// TODO: add maintenance state. Proposal #1486
// maintenance means an engine is under maintenance.
// There is no action to migrate a node into maintenance state yet.
//stateMaintenance
)
var stateText = map[engineState]string{
statePending: "Pending",
stateUnhealthy: "Unhealthy",
stateHealthy: "Healthy",
//stateMaintenance: "Maintenance",
}
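For orientation, the transitions these states allow (gathered from the functions later in this diff) can be summarized in a small standalone sketch; this is my summary of the change, not code from the PR:
package main

import "fmt"

// Informal summary of the engine state transitions introduced by this PR.
func main() {
	transitions := []string{
		"Pending   -> Healthy   : validatePendingEngine succeeds, ValidationComplete()",
		"Healthy   -> Unhealthy : incFailureCount() reaches opts.FailureRetry",
		"Unhealthy -> Healthy   : CheckConnectionErr(nil) after a successful call",
		"Healthy   -> Pending   : updateSpecs() finds the engine reporting a new ID",
	}
	for _, t := range transitions {
		fmt.Println(t)
	}
}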
// delayer offers a simple API for a random delay within a given time range.
type delayer struct {
rangeMin time.Duration
@@ -82,7 +104,9 @@ type Engine struct {
volumes map[string]*Volume
client dockerclient.Client
eventHandler EventHandler
healthy bool
state engineState
lastError string
updatedAt time.Time
failureCount int
overcommitRatio int64
opts *EngineOpts
@@ -99,7 +123,8 @@ func NewEngine(addr string, overcommitRatio float64, opts *EngineOpts) *Engine {
containers: make(map[string]*Container),
networks: make(map[string]*Network),
volumes: make(map[string]*Volume),
healthy: true,
state: statePending,
updatedAt: time.Now(),
overcommitRatio: int64(overcommitRatio * 100),
opts: opts,
}
@@ -153,9 +178,6 @@ func (e *Engine) ConnectWithClient(client dockerclient.Client) error {
e.RefreshVolumes()
e.RefreshNetworks()
// Start the update loop.
go e.refreshLoop()
e.emitEvent("engine_connect")
return nil
@@ -182,45 +204,116 @@ func (e *Engine) isConnected() bool {
// IsHealthy returns true if the engine is healthy
func (e *Engine) IsHealthy() bool {
return e.healthy
e.RLock()
defer e.RUnlock()
return e.state == stateHealthy
}
// setHealthy sets engine healthy state
func (e *Engine) setHealthy(state bool) {
// setState sets engine state
func (e *Engine) setState(state engineState) {
e.Lock()
e.healthy = state
// if engine is healthy, clear failureCount
if state {
e.failureCount = 0
defer e.Unlock()
e.state = state
}
// TimeToValidate returns true if a pending node is up for validation
func (e *Engine) TimeToValidate() bool {
const validationLimit time.Duration = 4 * time.Hour
const failureBackoff time.Duration = 30 * time.Second
e.Lock()
defer e.Unlock()
if e.state != statePending {
return false
}
sinceLastUpdate := time.Since(e.updatedAt)
// Increase check interval for a pending engine according to failureCount and cap it at a limit
if sinceLastUpdate > validationLimit || sinceLastUpdate > time.Duration(e.failureCount)*failureBackoff {
return true
}
return false
}
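To make the backoff concrete, here is a small standalone sketch (not the swarm package; nextValidationWait is a hypothetical helper) of the effective wait this check produces: a pending engine is revalidated once either failureCount*30s or the 4-hour cap has elapsed since its last update, whichever comes first.
package main

import (
	"fmt"
	"time"
)

// nextValidationWait mirrors the TimeToValidate condition above:
// revalidate once min(failureCount*failureBackoff, validationLimit) has elapsed.
func nextValidationWait(failureCount int) time.Duration {
	const validationLimit = 4 * time.Hour
	const failureBackoff = 30 * time.Second
	wait := time.Duration(failureCount) * failureBackoff
	if wait > validationLimit {
		return validationLimit
	}
	return wait
}

func main() {
	for _, n := range []int{1, 10, 100, 1000} {
		fmt.Printf("failureCount=%d -> retry after %v\n", n, nextValidationWait(n))
	}
	// failureCount=1 -> 30s, 10 -> 5m0s, 100 -> 50m0s, 1000 -> 4h0m0s (capped)
}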
// ValidationComplete transitions engine state from statePending to stateHealthy
func (e *Engine) ValidationComplete() {
e.Lock()
defer e.Unlock()
if e.state != statePending {
return
}
e.state = stateHealthy
e.failureCount = 0
go e.refreshLoop()
}
// setErrMsg sets error message for the engine
func (e *Engine) setErrMsg(errMsg string) {
e.Lock()
defer e.Unlock()
e.lastError = errMsg
e.updatedAt = time.Now()
}
// ErrMsg returns error message for the engine
func (e *Engine) ErrMsg() string {
e.RLock()
defer e.RUnlock()
return e.lastError
}
// HandleIDConflict handles an ID conflict with an existing engine
func (e *Engine) HandleIDConflict(otherAddr string) {
e.setErrMsg(fmt.Sprintf("ID duplicated. %s shared by this node %s and another node %s", e.ID, e.Addr, otherAddr))
}
// Status returns the health status of the Engine: Healthy or Unhealthy
func (e *Engine) Status() string {
if e.healthy {
return "Healthy"
}
return "Unhealthy"
e.RLock()
defer e.RUnlock()
return stateText[e.state]
}
// incFailureCount increases the engine's failure count, and sets the engine as unhealthy if the threshold is crossed
func (e *Engine) incFailureCount() {
e.Lock()
defer e.Unlock()
e.failureCount++
if e.healthy && e.failureCount >= e.opts.FailureRetry {
e.healthy = false
if e.state == stateHealthy && e.failureCount >= e.opts.FailureRetry {
e.state = stateUnhealthy
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as unhealthy. Connect failed %d times", e.failureCount)
e.emitEvent("engine_disconnect")
}
e.Unlock()
}
// CheckConnectionErr checks error from client response and adjust engine healthy indicators
// UpdatedAt returns the previous updatedAt time
func (e *Engine) UpdatedAt() time.Time {
e.RLock()
defer e.RUnlock()
return e.updatedAt
}
func (e *Engine) resetFailureCount() {
e.Lock()
defer e.Unlock()
e.failureCount = 0
}
// CheckConnectionErr checks error from client response and adjusts engine healthy indicators
func (e *Engine) CheckConnectionErr(err error) {
if err == nil {
e.setHealthy(true)
e.setErrMsg("")
e.resetFailureCount()
// If current state is unhealthy, change it to healthy
if e.state == stateUnhealthy {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
e.emitEvent("engine_reconnect")
e.setState(stateHealthy)
}
return
}
// update engine error message
e.setErrMsg(err.Error())
// dockerclient defines an ErrConnectionRefused error, but if the HTTP client comes from swarm it does not use
// dockerclient, so string matching is needed for those cases. Remove the first character to avoid
// case-sensitivity issues
@@ -235,7 +328,7 @@ func (e *Engine) CheckConnectionErr(err error) {
e.incFailureCount()
return
}
// other errors may be ambiguous. let refresh loop decide healthy or not.
// other errors may be ambiguous.
}
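The "remove the first character" trick mentioned above can be illustrated with a standalone sketch (the helper is hypothetical, not the PR's exact code): matching on "onnection refused" catches both "Connection refused" and "connection refused" without lowercasing the whole message.
package main

import (
	"fmt"
	"strings"
)

// looksLikeConnectionRefused drops the leading letter of the phrase so the match
// is insensitive to C/c, covering errors that don't come from dockerclient.
func looksLikeConnectionRefused(errMsg string) bool {
	return strings.Contains(errMsg, "onnection refused")
}

func main() {
	fmt.Println(looksLikeConnectionRefused("Cannot connect: Connection refused"))         // true
	fmt.Println(looksLikeConnectionRefused("dial tcp 10.0.0.2:2375: connection refused")) // true
	fmt.Println(looksLikeConnectionRefused("i/o timeout"))                                 // false
}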
// Gather engine specs (CPU, memory, constraints, ...).
@@ -264,7 +357,19 @@ func (e *Engine) updateSpecs() error {
return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade to at least %s", e.Addr, minSupportedVersion)
}
e.ID = info.ID
e.Lock()
defer e.Unlock()
// Swarm/docker identifies an engine by ID. Updating the ID without updating the cluster
// index would leave the cluster in an inconsistent state. If that happens, the
// engine is put back into the pending state for re-validation.
if e.ID == "" {
e.ID = info.ID
} else if e.ID != info.ID {
e.state = statePending
message := fmt.Sprintf("Engine (ID: %s, Addr: %s) shows up with another ID:%s. Please remove it from cluster, it can be added back.", e.ID, e.Addr, info.ID)
e.lastError = message
return fmt.Errorf(message)
}
e.Name = info.Name
e.Cpus = info.NCPU
e.Memory = info.MemTotal
@@ -368,9 +473,7 @@ func (e *Engine) RefreshVolumes() error {
// FIXME: unexport this method after mesos scheduler stops using it directly
func (e *Engine) RefreshContainers(full bool) error {
containers, err := e.client.ListContainers(true, false, "")
// e.CheckConnectionErr(err) is not appropriate here because refresh loop uses
// RefreshContainers function to fail/recover an engine. Adding CheckConnectionErr
// here would result in double count
e.CheckConnectionErr(err)
if err != nil {
return err
}
@@ -389,7 +492,6 @@ func (e *Engine) RefreshContainers(full bool) error {
defer e.Unlock()
e.containers = merged
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Updated engine state")
return nil
}
@@ -469,6 +571,7 @@ func (e *Engine) updateContainer(c dockerclient.Container, containers map[string
return containers, nil
}
// refreshLoop periodically triggers engine refresh.
func (e *Engine) refreshLoop() {
for {
@@ -481,33 +584,24 @@ func (e *Engine) refreshLoop() {
return
}
if !e.IsHealthy() {
if err = e.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
continue
}
e.client.StopAllMonitorEvents()
e.client.StartMonitorEvents(e.handler, nil)
}
err = e.RefreshContainers(false)
if err == nil {
// Do not check the error as older daemons don't support this call
e.RefreshVolumes()
e.RefreshNetworks()
err = e.RefreshImages()
}
if err != nil {
e.failureCount++
if e.failureCount >= e.opts.FailureRetry && e.healthy {
e.emitEvent("engine_disconnect")
e.setHealthy(false)
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as dead. Updated state failed %d times: %v", e.failureCount, err)
}
e.RefreshImages()
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine update succeeded")
} else {
if !e.healthy {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
if err := e.updateSpecs(); err != nil {
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
continue
}
e.client.StopAllMonitorEvents()
e.client.StartMonitorEvents(e.handler, nil)
e.emitEvent("engine_reconnect")
}
e.setHealthy(true)
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine refresh failed")
}
}
}

View File

@@ -38,13 +38,50 @@ var (
}
)
func TestSetEngineState(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
assert.True(t, engine.state == statePending)
engine.setState(stateUnhealthy)
assert.True(t, engine.state == stateUnhealthy)
engine.setState(stateHealthy)
assert.True(t, engine.state == stateHealthy)
}
func TestErrMsg(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
assert.True(t, len(engine.ErrMsg()) == 0)
message := "cannot connect"
engine.setErrMsg(message)
assert.True(t, engine.ErrMsg() == message)
}
func TestCheckConnectionErr(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
assert.True(t, engine.failureCount == 0)
err := dockerclient.ErrConnectionRefused
engine.CheckConnectionErr(err)
assert.True(t, len(engine.ErrMsg()) > 0)
assert.True(t, engine.failureCount == 1)
engine.CheckConnectionErr(err)
assert.True(t, engine.failureCount == 2)
err = nil
engine.CheckConnectionErr(err)
assert.True(t, engine.failureCount == 0)
assert.True(t, len(engine.ErrMsg()) == 0)
}
func TestEngineFailureCount(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
for i := 0; i < engine.opts.FailureRetry; i++ {
assert.True(t, engine.IsHealthy())
engine.incFailureCount()
}
assert.False(t, engine.IsHealthy())
assert.True(t, engine.failureCount == engine.opts.FailureRetry)
engine.resetFailureCount()
assert.True(t, engine.failureCount == 0)
}
func TestEngineConnectionFailure(t *testing.T) {
@@ -82,6 +119,7 @@ func TestOutdatedEngine(t *testing.T) {
func TestEngineCpusMemory(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
assert.False(t, engine.isConnected())
client := mockclient.NewMockClient()
@@ -105,6 +143,7 @@ func TestEngineCpusMemory(t *testing.T) {
func TestEngineSpecs(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
assert.False(t, engine.isConnected())
client := mockclient.NewMockClient()
@@ -133,6 +172,7 @@ func TestEngineSpecs(t *testing.T) {
func TestEngineState(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
assert.False(t, engine.isConnected())
client := mockclient.NewMockClient()
@@ -185,6 +225,7 @@ func TestCreateContainer(t *testing.T) {
client = mockclient.NewMockClient()
)
engine.setState(stateUnhealthy)
client.On("Info").Return(mockInfo, nil)
client.On("Version").Return(mockVersion, nil)
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
@@ -242,6 +283,7 @@ func TestCreateContainer(t *testing.T) {
func TestImages(t *testing.T) {
engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
engine.images = []*Image{
{dockerclient.Image{Id: "a"}, engine},
{dockerclient.Image{Id: "b"}, engine},
@@ -279,6 +321,7 @@ func TestUsedCpus(t *testing.T) {
)
engine := NewEngine("test", 0, engOpts)
engine.setState(stateHealthy)
client := mockclient.NewMockClient()
for _, hn := range hostNcpu {
@@ -312,6 +355,7 @@ func TestContainerRemovedDuringRefresh(t *testing.T) {
)
engine := NewEngine("test", 0, engOpts)
engine.setState(stateUnhealthy)
assert.False(t, engine.isConnected())
// A container is removed before it can be inspected.

View File

@@ -47,6 +47,8 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
if err := engine.Connect(c.TLSConfig); err != nil {
log.Error(err)
} else {
// Set engine state to healthy and start refresh loop
engine.ValidationComplete()
s = newAgent(agentID, engine)
c.agents[agentID] = s
if err := s.engine.RegisterEventHandler(c); err != nil {

View File

@@ -9,6 +9,7 @@ import (
"sort"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
@@ -51,6 +52,7 @@ type Cluster struct {
eventHandler cluster.EventHandler
engines map[string]*cluster.Engine
pendingEngines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
discovery discovery.Discovery
pendingContainers map[string]*pendingContainer
@@ -66,6 +68,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
cluster := &Cluster{
engines: make(map[string]*cluster.Engine),
pendingEngines: make(map[string]*cluster.Engine),
scheduler: scheduler,
TLSConfig: TLSConfig,
discovery: discovery,
@@ -80,6 +83,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
discoveryCh, errCh := cluster.discovery.Watch(nil)
go cluster.monitorDiscovery(discoveryCh, errCh)
go cluster.monitorPendingEngines()
return cluster, nil
}
@@ -199,6 +203,9 @@ func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine {
c.RLock()
defer c.RUnlock()
if engine, ok := c.pendingEngines[addr]; ok {
return engine
}
for _, engine := range c.engines {
if engine.Addr == addr {
return engine
@@ -221,11 +228,26 @@ func (c *Cluster) addEngine(addr string) bool {
if err := engine.RegisterEventHandler(c); err != nil {
log.Error(err)
}
// Add it to the pending engine map, indexed by address. This prevents
// duplicate engines from entering the cluster
c.Lock()
c.pendingEngines[addr] = engine
c.Unlock()
// validatePendingEngine will start a goroutine to validate the engine.
// If the engine is reachable and valid, it'll be monitored and updated in a loop.
// If the engine is not reachable, pending engines are re-examined periodically
go c.validatePendingEngine(engine)
return true
}
// validatePendingEngine connects to the engine and, if validation succeeds, moves it from pendingEngines to engines
func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
// Attempt a connection to the engine. Since this is slow, don't get a hold
// of the lock yet.
if err := engine.Connect(c.TLSConfig); err != nil {
log.Error(err)
log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
return false
}
@@ -237,16 +259,28 @@ func (c *Cluster) addEngine(addr string) bool {
if old, exists := c.engines[engine.ID]; exists {
if old.Addr != engine.Addr {
log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
// Keep this engine in the pendingEngines table and surface its error.
// If it's an ID duplication caused by a VM clone, the user sees this message and can fix it.
// If the engine was rebooted and got a new IP from DHCP, the previous address will be removed
// from discovery after a while.
// In both cases, retrying may fix the problem.
engine.HandleIDConflict(old.Addr)
} else {
log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
engine.Disconnect()
// Remove it from pendingEngines table
delete(c.pendingEngines, engine.Addr)
}
engine.Disconnect()
return false
}
// Finally register the engine.
// Engine validated, move from pendingEngines table to engines table
delete(c.pendingEngines, engine.Addr)
// set engine state to healthy, and start refresh loop
engine.ValidationComplete()
c.engines[engine.ID] = engine
log.Infof("Registered Engine %s at %s", engine.Name, addr)
log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr)
return true
}
@@ -259,7 +293,12 @@ func (c *Cluster) removeEngine(addr string) bool {
defer c.Unlock()
engine.Disconnect()
delete(c.engines, engine.ID)
// The engine could be in either pendingEngines or engines
if _, ok := c.pendingEngines[addr]; ok {
delete(c.pendingEngines, addr)
} else {
delete(c.engines, engine.ID)
}
log.Infof("Removed Engine %s", engine.Name)
return true
}
@@ -281,10 +320,8 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err
c.removeEngine(entry.String())
}
// Since `addEngine` can be very slow (it has to connect to the
// engine), we are going to do the adds in parallel.
for _, entry := range added {
go c.addEngine(entry.String())
c.addEngine(entry.String())
}
case err := <-errCh:
log.Errorf("Discovery error: %v", err)
@@ -292,6 +329,27 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err
}
}
// monitorPendingEngines checks whether previously unreachable or invalid engines have recovered
func (c *Cluster) monitorPendingEngines() {
const minimumValidationInterval time.Duration = 10 * time.Second
for {
// Don't need to do it frequently
time.Sleep(minimumValidationInterval)
// Get the list of pendingEngines
c.RLock()
pEngines := make([]*cluster.Engine, 0, len(c.pendingEngines))
for _, e := range c.pendingEngines {
pEngines = append(pEngines, e)
}
c.RUnlock()
for _, e := range pEngines {
if e.TimeToValidate() {
go c.validatePendingEngine(e)
}
}
}
}
// Images returns all the images in the cluster.
func (c *Cluster) Images() cluster.Images {
c.RLock()
@@ -676,7 +734,7 @@ func (c *Cluster) Volume(name string) *cluster.Volume {
return nil
}
// listNodes returns all the engines in the cluster.
// listNodes returns all validated engines in the cluster, excluding pendingEngines.
func (c *Cluster) listNodes() []*node.Node {
c.RLock()
defer c.RUnlock()
@@ -696,14 +754,18 @@ func (c *Cluster) listNodes() []*node.Node {
}
// listEngines returns all the engines in the cluster.
// This is for reporting, not scheduling, hence pendingEngines are included.
func (c *Cluster) listEngines() []*cluster.Engine {
c.RLock()
defer c.RUnlock()
out := make([]*cluster.Engine, 0, len(c.engines))
out := make([]*cluster.Engine, 0, len(c.engines)+len(c.pendingEngines))
for _, n := range c.engines {
out = append(out, n)
}
for _, n := range c.pendingEngines {
out = append(out, n)
}
return out
}
@@ -748,6 +810,12 @@ func (c *Cluster) Info() [][]string {
}
sort.Strings(labels)
info = append(info, []string{" └ Labels", fmt.Sprintf("%s", strings.Join(labels, ", "))})
errMsg := engine.ErrMsg()
if len(errMsg) == 0 {
errMsg = "(none)"
}
info = append(info, []string{" └ Error", errMsg})
info = append(info, []string{" └ UpdatedAt", engine.UpdatedAt().UTC().Format(time.RFC3339)})
}
return info
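With these two rows, each engine entry reported by the manager gains lines of roughly this shape (illustrative values only; the timestamp is just an example):
 └ Error: (none)
 └ UpdatedAt: 2016-01-08T00:14:36Z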

View File

@@ -25,7 +25,7 @@ func (nopCloser) Close() error {
var (
mockInfo = &dockerclient.Info{
ID: "id",
ID: "test-engine",
Name: "name",
NCPU: 10,
MemTotal: 20,

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env bats
load helpers
load ../helpers
@test "engine refresh options" {
# minimum refresh interval
@@ -8,7 +8,7 @@ load helpers
[ "$status" -ne 0 ]
[[ "${output}" == *"minimum refresh interval should be a positive number"* ]]
# max refresh interval
# max refresh interval
run swarm manage --engine-refresh-min-interval "30s" -engine-refresh-max-interval "20s" --advertise 127.0.0.1:$SWARM_BASE_PORT 192.168.56.202:4444
[ "$status" -ne 0 ]
[[ "${output}" == *"max refresh interval cannot be less than min refresh interval"* ]]

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env bats
load ../helpers
DISCOVERY_FILE=""
DISCOVERY=""
function setup() {
# create a blank temp file for discovery
DISCOVERY_FILE=$(mktemp)
DISCOVERY="file://$DISCOVERY_FILE"
}
function teardown() {
swarm_manage_cleanup
stop_docker
rm -f "$DISCOVERY_FILE"
}
function setup_discovery_file() {
rm -f "$DISCOVERY_FILE"
for host in ${HOSTS[@]}; do
echo "$host" >> $DISCOVERY_FILE
done
}
@test "node failure and recovery" {
# Start 1 engine and register it in the file.
start_docker 1
setup_discovery_file
# Start swarm and check it can reach the node
swarm_manage --engine-refresh-min-interval "1s" --engine-refresh-max-interval "1s" --engine-failure-retry 2 "$DISCOVERY"
eval "docker_swarm info | grep -q -i 'Status: Healthy'"
# Stop the node and let it fail
docker_host stop ${DOCKER_CONTAINERS[0]}
# Wait for swarm to detect node failure
retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Unhealthy'"
# Restart node
docker_host start ${DOCKER_CONTAINERS[0]}
# Wait for swarm to detect node recovery
retry 5 1 eval "docker_swarm info | grep -q -i 'Status: Healthy'"
}
@test "node pending and recovery" {
# Start 1 engine and register it in the file.
start_docker 1
setup_discovery_file
# Stop the node
docker_host stop ${DOCKER_CONTAINERS[0]}
# Start swarm with the stopped node
swarm_manage_no_wait "$DISCOVERY"
retry 2 1 eval "docker_swarm info | grep -q -i 'Status: Pending'"
# Restart the node
docker_host start ${DOCKER_CONTAINERS[0]}
# Wait for swarm to detect node recovery
retry 15 3 eval "docker_swarm info | grep -q -i 'Status: Healthy'"
}