From 1297a4cef2813839da91bf962beda2e3a35cf77e Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 3 Apr 2015 18:40:13 -0700 Subject: [PATCH] transform node interface to engine struct Signed-off-by: Victor Vieux --- api/api.go | 48 ++-- api/events.go | 10 +- api/events_test.go | 24 +- cluster/container.go | 4 +- cluster/{swarm/node.go => engine.go} | 219 ++++++++---------- cluster/engine_sorter.go | 21 ++ cluster/engine_sorter_test.go | 18 ++ .../{swarm/node_test.go => engine_test.go} | 40 ++-- cluster/event.go | 2 +- cluster/fakenode_test.go | 20 -- cluster/image.go | 2 +- cluster/node.go | 55 ----- cluster/node_test.go | 18 -- cluster/swarm/cluster.go | 98 +++++--- cluster/swarm/cluster_test.go | 14 +- manage.go | 2 +- scheduler/filter/affinity.go | 10 +- scheduler/filter/affinity_test.go | 37 +-- scheduler/filter/constraint.go | 10 +- scheduler/filter/constraint_test.go | 70 +++--- scheduler/filter/dependency.go | 8 +- scheduler/filter/dependency_test.go | 99 ++++---- scheduler/filter/fakenode_test.go | 46 ---- scheduler/filter/filter.go | 6 +- scheduler/filter/health.go | 8 +- scheduler/filter/port.go | 10 +- scheduler/filter/port_test.go | 133 +++++------ scheduler/node/node.go | 67 ++++++ scheduler/scheduler.go | 4 +- scheduler/strategy/binpack.go | 6 +- scheduler/strategy/binpack_test.go | 120 +++++----- scheduler/strategy/fakenode_test.go | 53 ----- scheduler/strategy/random.go | 4 +- scheduler/strategy/spread.go | 6 +- scheduler/strategy/spread_test.go | 70 +++--- scheduler/strategy/strategy.go | 4 +- scheduler/strategy/weighted_node.go | 14 +- 37 files changed, 642 insertions(+), 738 deletions(-) rename cluster/{swarm/node.go => engine.go} (63%) create mode 100644 cluster/engine_sorter.go create mode 100644 cluster/engine_sorter_test.go rename cluster/{swarm/node_test.go => engine_test.go} (90%) delete mode 100644 cluster/fakenode_test.go delete mode 100644 cluster/node.go delete mode 100644 cluster/node_test.go delete mode 100644 scheduler/filter/fakenode_test.go create mode 100644 scheduler/node/node.go delete mode 100644 scheduler/strategy/fakenode_test.go diff --git a/api/api.go b/api/api.go index 8825140998..215c6144d7 100644 --- a/api/api.go +++ b/api/api.go @@ -16,7 +16,6 @@ import ( log "github.com/Sirupsen/logrus" dockerfilters "github.com/docker/docker/pkg/parsers/filters" "github.com/docker/swarm/cluster" - "github.com/docker/swarm/scheduler/filter" "github.com/docker/swarm/version" "github.com/gorilla/mux" "github.com/samalba/dockerclient" @@ -94,7 +93,7 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) { if len(accepteds) != 0 { found := false for _, accepted := range accepteds { - if accepted == image.Node.Name() || accepted == image.Node.ID() { + if accepted == image.Engine.Name || accepted == image.Engine.ID { found = true break } @@ -132,20 +131,20 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { if strings.Split(tmp.Image, ":")[0] == "swarm" && !all { continue } - if !container.Node.IsHealthy() { + if !container.Engine.IsHealthy() { tmp.Status = "Pending" } // TODO remove the Node Name in the name when we have a good solution tmp.Names = make([]string, len(container.Names)) for i, name := range container.Names { - tmp.Names[i] = "/" + container.Node.Name() + name + tmp.Names[i] = "/" + container.Engine.Name + name } // insert node IP tmp.Ports = make([]dockerclient.Port, len(container.Ports)) for i, port := range container.Ports { tmp.Ports[i] = port if port.IP == "0.0.0.0" { - tmp.Ports[i].IP = 
container.Node.IP() + tmp.Ports[i].IP = container.Engine.IP } } out = append(out, &tmp) @@ -171,7 +170,7 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) { } client, scheme := newClientAndScheme(c.tlsConfig) - resp, err := client.Get(scheme + "://" + container.Node.Addr() + "/containers/" + container.Id + "/json") + resp, err := client.Get(scheme + "://" + container.Engine.Addr + "/containers/" + container.Id + "/json") if err != nil { httpError(w, err.Error(), http.StatusInternalServerError) return @@ -188,10 +187,10 @@ } // insert Node field - data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", cluster.SerializeNode(container.Node))), -1) + data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":{%q:%q,%q:%q,%q:%q,%q:%q},\"Name\":\"/", "Name", container.Engine.Name, "Id", container.Engine.ID, "Addr", container.Engine.Addr, "Ip", container.Engine.IP)), -1) // insert node IP - data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP())), -1) + data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Engine.IP)), -1) w.Header().Set("Content-Type", "application/json") w.Write(data) @@ -301,7 +300,7 @@ func postContainersExec(c *context, w http.ResponseWriter, r *http.Request) { client, scheme := newClientAndScheme(c.tlsConfig) - resp, err := client.Post(scheme+"://"+container.Node.Addr()+"/containers/"+container.Id+"/exec", "application/json", r.Body) + resp, err := client.Post(scheme+"://"+container.Engine.Addr+"/containers/"+container.Id+"/exec", "application/json", r.Body) if err != nil { httpError(w, err.Error(), http.StatusInternalServerError) return @@ -356,7 +355,7 @@ func deleteImages(c *context, w http.ResponseWriter, r *http.Request) { for _, image := range matchedImages { content, err := c.cluster.RemoveImage(image) if err != nil { - errs = append(errs, fmt.Sprintf("%s: %s", image.Node.Name(), err.Error())) + errs = append(errs, fmt.Sprintf("%s: %s", image.Engine.Name, err.Error())) continue } out = append(out, content...)
@@ -384,7 +383,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) { return } - if err := proxy(c.tlsConfig, container.Node.Addr(), w, r); err != nil { + if err := proxy(c.tlsConfig, container.Engine.Addr, w, r); err != nil { httpError(w, err.Error(), http.StatusInternalServerError) } } @@ -394,7 +393,7 @@ func proxyImage(c *context, w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] if image := c.cluster.Image(name); image != nil { - proxy(c.tlsConfig, image.Node.Addr(), w, r) + proxy(c.tlsConfig, image.Engine.Addr, w, r) return } httpError(w, fmt.Sprintf("No such image: %s", name), http.StatusNotFound) @@ -402,23 +401,24 @@ // Proxy a request to a random node func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) { - candidates := []cluster.Node{} + candidates := []*cluster.Engine{} // FIXME: doesn't work if there are no containers in the cluster // remove proxyRandom and implement the features locally for _, container := range c.cluster.Containers() { - candidates = append(candidates, container.Node) + candidates = append(candidates, container.Engine) } - healthFilter := &filter.HealthFilter{} - accepted, err := healthFilter.Filter(nil, candidates) + // FIXME: + // healthFilter := &filter.HealthFilter{} + // accepted, err := healthFilter.Filter(nil, candidates) + accepted := candidates + //if err != nil { + // httpError(w, err.Error(), http.StatusInternalServerError) + // return + //} - if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr(), w, r); err != nil { + if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr, w, r); err != nil { httpError(w, err.Error(), http.StatusInternalServerError) } } @@ -441,7 +441,7 @@ func postCommit(c *context, w http.ResponseWriter, r *http.Request) { } // proxy commit request to the right node - if err := proxy(c.tlsConfig, container.Node.Addr(), w, r); err != nil { + if err := proxy(c.tlsConfig, container.Engine.Addr, w, r); err != nil { httpError(w, err.Error(), http.StatusInternalServerError) } } @@ -454,7 +454,7 @@ func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) { return } - if err := hijack(c.tlsConfig, container.Node.Addr(), w, r); err != nil { + if err := hijack(c.tlsConfig, container.Engine.Addr, w, r); err != nil { httpError(w, err.Error(), http.StatusInternalServerError) } } diff --git a/api/events.go b/api/events.go index cf4a01144c..e67a2fbba9 100644 --- a/api/events.go +++ b/api/events.go @@ -42,12 +42,16 @@ func (eh *eventsHandler) Wait(remoteAddr string) { func (eh *eventsHandler) Handle(e *cluster.Event) error { eh.RLock() - str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:%s}", + str := fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%d,%q:{%q:%q,%q:%q,%q:%q,%q:%q}}", "status", e.Status, "id", e.Id, - "from", e.From+" node:"+e.Node.Name(), + "from", e.From+" node:"+e.Engine.Name, "time", e.Time, - "node", cluster.SerializeNode(e.Node)) + "node", + "Name", e.Engine.Name, + "Id", e.Engine.ID, + "Addr", e.Engine.Addr, + "Ip", e.Engine.IP) for key, w := range eh.ws { if _, err := fmt.Fprintf(w, str); err != nil { diff --git a/api/events_test.go b/api/events_test.go index eafbdd7639..e8ecd7281b 100644 --- a/api/events_test.go +++ b/api/events_test.go @@ -17,23 +17,6 @@ func (fw *FakeWriter) Write(p []byte) (n int, err error) { return len(p), nil } -type FakeNode struct{} -func (fn 
*FakeNode) ID() string { return "node_id" } -func (fn *FakeNode) Name() string { return "node_name" } -func (fn *FakeNode) IP() string { return "node_ip" } -func (fn *FakeNode) Addr() string { return "node_addr" } -func (fn *FakeNode) Images() []*cluster.Image { return nil } -func (fn *FakeNode) Image(_ string) *cluster.Image { return nil } -func (fn *FakeNode) Containers() []*cluster.Container { return nil } -func (fn *FakeNode) Container(_ string) *cluster.Container { return nil } -func (fn *FakeNode) TotalCpus() int64 { return 0 } -func (fn *FakeNode) UsedCpus() int64 { return 0 } -func (fn *FakeNode) TotalMemory() int64 { return 0 } -func (fn *FakeNode) UsedMemory() int64 { return 0 } -func (fn *FakeNode) Labels() map[string]string { return nil } -func (fn *FakeNode) IsHealthy() bool { return true } - func TestHandle(t *testing.T) { eh := NewEventsHandler() assert.Equal(t, eh.Size(), 0) @@ -44,7 +27,12 @@ func TestHandle(t *testing.T) { assert.Equal(t, eh.Size(), 1) event := &cluster.Event{ - Node: &FakeNode{}, + Engine: &cluster.Engine{ + ID: "node_id", + Name: "node_name", + IP: "node_ip", + Addr: "node_addr", + }, } event.Event.Status = "status" diff --git a/cluster/container.go b/cluster/container.go index a45aec828a..f52b061fcd 100644 --- a/cluster/container.go +++ b/cluster/container.go @@ -6,6 +6,6 @@ import "github.com/samalba/dockerclient" type Container struct { dockerclient.Container - Info dockerclient.ContainerInfo - Node Node + Info dockerclient.ContainerInfo + Engine *Engine } diff --git a/cluster/swarm/node.go b/cluster/engine.go similarity index 63% rename from cluster/swarm/node.go rename to cluster/engine.go index 957a3d17fb..e9e633f0d5 100644 --- a/cluster/swarm/node.go +++ b/cluster/engine.go @@ -1,4 +1,4 @@ -package swarm +package cluster import ( "crypto/tls" @@ -10,7 +10,6 @@ import ( "time" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" "github.com/samalba/dockerclient" ) @@ -22,63 +21,44 @@ const ( requestTimeout = 10 * time.Second ) -// NewNode is exported -func NewNode(addr string, overcommitRatio float64) *node { - e := &node{ - addr: addr, - labels: make(map[string]string), +// NewEngine is exported +func NewEngine(addr string, overcommitRatio float64) *Engine { + e := &Engine{ + Addr: addr, + Labels: make(map[string]string), ch: make(chan bool), - containers: make(map[string]*cluster.Container), + containers: make(map[string]*Container), healthy: true, overcommitRatio: int64(overcommitRatio * 100), } return e } -type node struct { +// Engine represents a docker engine +type Engine struct { sync.RWMutex - id string - ip string - addr string - name string + ID string + IP string + Addr string + Name string Cpus int64 Memory int64 - labels map[string]string + Labels map[string]string ch chan bool - containers map[string]*cluster.Container - images []*cluster.Image + containers map[string]*Container + images []*Image client dockerclient.Client - eventHandler cluster.EventHandler + eventHandler EventHandler healthy bool overcommitRatio int64 } -func (n *node) ID() string { - return n.id -} - -func (n *node) IP() string { - return n.ip -} - -func (n *node) Addr() string { - return n.addr -} - -func (n *node) Name() string { - return n.name -} - -func (n *node) Labels() map[string]string { - return n.labels -} - // Connect will initialize a connection to the Docker daemon running on the // host, gather machine specs (memory, cpu, ...) and monitor state changes. 
-func (n *node) connect(config *tls.Config) error { - host, _, err := net.SplitHostPort(n.addr) +func (n *Engine) Connect(config *tls.Config) error { + host, _, err := net.SplitHostPort(n.Addr) if err != nil { return err } @@ -87,9 +67,9 @@ func (n *node) connect(config *tls.Config) error { if err != nil { return err } - n.ip = addr.IP.String() + n.IP = addr.IP.String() - c, err := dockerclient.NewDockerClientTimeout("tcp://"+n.addr, config, time.Duration(requestTimeout)) + c, err := dockerclient.NewDockerClientTimeout("tcp://"+n.Addr, config, time.Duration(requestTimeout)) if err != nil { return err } @@ -97,7 +77,7 @@ func (n *node) connect(config *tls.Config) error { return n.connectClient(c) } -func (n *node) connectClient(client dockerclient.Client) error { +func (n *Engine) connectClient(client dockerclient.Client) error { n.client = client // Fetch the engine labels. @@ -120,43 +100,44 @@ func (n *node) connectClient(client dockerclient.Client) error { // Start the update loop. go n.refreshLoop() - // Start monitoring events from the node. + // Start monitoring events from the engine. n.client.StartMonitorEvents(n.handler, nil) - n.emitEvent("node_connect") + n.emitEvent("engine_connect") return nil } // isConnected returns true if the engine is connected to a remote docker API -func (n *node) isConnected() bool { +func (n *Engine) isConnected() bool { return n.client != nil } -func (n *node) IsHealthy() bool { +// IsHealthy returns true if the engine is healthy +func (n *Engine) IsHealthy() bool { return n.healthy } -// Gather node specs (CPU, memory, constraints, ...). -func (n *node) updateSpecs() error { +// Gather engine specs (CPU, memory, constraints, ...). +func (n *Engine) updateSpecs() error { info, err := n.client.Info() if err != nil { return err } if info.NCPU == 0 || info.MemTotal == 0 { - return fmt.Errorf("cannot get resources for this node, make sure %s is a Docker Engine, not a Swarm manager", n.addr) + return fmt.Errorf("cannot get resources for this engine, make sure %s is a Docker Engine, not a Swarm manager", n.Addr) } // Older versions of Docker don't expose the ID field and are not supported // by Swarm. Catch the error ASAP and refuse to connect. if len(info.ID) == 0 { - return fmt.Errorf("node %s is running an unsupported version of Docker Engine. Please upgrade", n.addr) + return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade", n.Addr) } - n.id = info.ID - n.name = info.Name + n.ID = info.ID + n.Name = info.Name n.Cpus = info.NCPU n.Memory = info.MemTotal - n.labels = map[string]string{ + n.Labels = map[string]string{ "storagedriver": info.Driver, "executiondriver": info.ExecutionDriver, "kernelversion": info.KernelVersion, @@ -164,18 +145,18 @@ func (n *node) updateSpecs() error { } for _, label := range info.Labels { kv := strings.SplitN(label, "=", 2) - n.labels[kv[0]] = kv[1] + n.Labels[kv[0]] = kv[1] } return nil } -// Delete an image from the node. -func (n *node) removeImage(image *cluster.Image) ([]*dockerclient.ImageDelete, error) { +// RemoveImage deletes an image from the engine. +func (n *Engine) RemoveImage(image *Image) ([]*dockerclient.ImageDelete, error) { return n.client.RemoveImage(image.Id) } -// Refresh the list of images on the node. -func (n *node) refreshImages() error { +// Refresh the list of images on the engine. 
+func (n *Engine) refreshImages() error { images, err := n.client.ListImages() if err != nil { return err @@ -183,25 +164,25 @@ func (n *node) refreshImages() error { n.Lock() n.images = nil for _, image := range images { - n.images = append(n.images, &cluster.Image{Image: *image, Node: n}) + n.images = append(n.images, &Image{Image: *image, Engine: n}) } n.Unlock() return nil } -// Refresh the list and status of containers running on the node. If `full` is +// Refresh the list and status of containers running on the engine. If `full` is // true, each container will be inspected. -func (n *node) refreshContainers(full bool) error { +func (n *Engine) refreshContainers(full bool) error { containers, err := n.client.ListContainers(true, false, "") if err != nil { return err } - merged := make(map[string]*cluster.Container) + merged := make(map[string]*Container) for _, c := range containers { merged, err = n.updateContainer(c, merged, full) if err != nil { - log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Unable to update state of container %q", c.Id) + log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Errorf("Unable to update state of container %q", c.Id) } } @@ -209,13 +190,13 @@ func (n *node) refreshContainers(full bool) error { defer n.Unlock() n.containers = merged - log.WithFields(log.Fields{"id": n.id, "name": n.name}).Debugf("Updated node state") + log.WithFields(log.Fields{"id": n.ID, "name": n.Name}).Debugf("Updated engine state") return nil } -// Refresh the status of a container running on the node. If `full` is true, +// Refresh the status of a container running on the engine. If `full` is true, // the container will be inspected. -func (n *node) refreshContainer(ID string, full bool) error { +func (n *Engine) refreshContainer(ID string, full bool) error { containers, err := n.client.ListContainers(true, false, fmt.Sprintf("{%q:[%q]}", "id", ID)) if err != nil { return err @@ -227,7 +208,7 @@ func (n *node) refreshContainer(ID string, full bool) error { } if len(containers) == 0 { - // The container doesn't exist on the node, remove it. + // The container doesn't exist on the engine, remove it. n.Lock() delete(n.containers, ID) n.Unlock() @@ -239,8 +220,8 @@ func (n *node) refreshContainer(ID string, full bool) error { return err } -func (n *node) updateContainer(c dockerclient.Container, containers map[string]*cluster.Container, full bool) (map[string]*cluster.Container, error) { - var container *cluster.Container +func (n *Engine) updateContainer(c dockerclient.Container, containers map[string]*Container, full bool) (map[string]*Container, error) { + var container *Container n.RLock() if current, exists := n.containers[c.Id]; exists { @@ -248,8 +229,8 @@ func (n *node) updateContainer(c dockerclient.Container, containers map[string]* container = current } else { // This is a brand new container. We need to do a full refresh. 
- container = &cluster.Container{ - Node: n, + container = &Container{ + Engine: n, } full = true } @@ -279,11 +260,11 @@ func (n *node) updateContainer(c dockerclient.Container, containers map[string]* return containers, nil } -func (n *node) refreshContainersAsync() { +func (n *Engine) refreshContainersAsync() { n.ch <- true } -func (n *node) refreshLoop() { +func (n *Engine) refreshLoop() { for { var err error select { @@ -299,18 +280,18 @@ func (n *node) refreshLoop() { if err != nil { if n.healthy { - n.emitEvent("node_disconnect") + n.emitEvent("engine_disconnect") } n.healthy = false - log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Flagging node as dead. Updated state failed: %v", err) + log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Errorf("Flagging engine as dead. Updated state failed: %v", err) } else { if !n.healthy { - log.WithFields(log.Fields{"name": n.name, "id": n.id}).Info("Node came back to life. Hooray!") + log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Info("Engine came back to life. Hooray!") n.client.StopAllMonitorEvents() n.client.StartMonitorEvents(n.handler, nil) - n.emitEvent("node_reconnect") + n.emitEvent("engine_reconnect") if err := n.updateSpecs(); err != nil { - log.WithFields(log.Fields{"name": n.name, "id": n.id}).Errorf("Update node specs failed: %v", err) + log.WithFields(log.Fields{"name": n.Name, "id": n.ID}).Errorf("Update engine specs failed: %v", err) } } n.healthy = true @@ -318,24 +299,24 @@ func (n *node) refreshLoop() { } } -func (n *node) emitEvent(event string) { +func (n *Engine) emitEvent(event string) { // If there is no event handler registered, abort right now. if n.eventHandler == nil { return } - ev := &cluster.Event{ + ev := &Event{ Event: dockerclient.Event{ Status: event, From: "swarm", Time: time.Now().Unix(), }, - Node: n, + Engine: n, } n.eventHandler.Handle(ev) } -// Return the sum of memory reserved by containers. -func (n *node) UsedMemory() int64 { +// UsedMemory returns the sum of memory reserved by containers. +func (n *Engine) UsedMemory() int64 { var r int64 n.RLock() for _, c := range n.containers { @@ -345,8 +326,8 @@ func (n *node) UsedMemory() int64 { return r } -// Return the sum of CPUs reserved by containers. -func (n *node) UsedCpus() int64 { +// UsedCpus returns the sum of CPUs reserved by containers. +func (n *Engine) UsedCpus() int64 { var r int64 n.RLock() for _, c := range n.containers { @@ -356,15 +337,18 @@ func (n *node) UsedCpus() int64 { return r } -func (n *node) TotalMemory() int64 { +// TotalMemory returns the total memory + overcommit +func (n *Engine) TotalMemory() int64 { return n.Memory + (n.Memory * n.overcommitRatio / 100) } -func (n *node) TotalCpus() int64 { +// TotalCpus returns the total cpus + overcommit +func (n *Engine) TotalCpus() int64 { return n.Cpus + (n.Cpus * n.overcommitRatio / 100) } -func (n *node) create(config *dockerclient.ContainerConfig, name string, pullImage bool) (*cluster.Container, error) { +// Create a new container +func (n *Engine) Create(config *dockerclient.ContainerConfig, name string, pullImage bool) (*Container, error) { var ( err error id string @@ -382,7 +366,7 @@ func (n *node) create(config *dockerclient.ContainerConfig, name string, pullIma return nil, err } // Otherwise, try to pull the image... - if err = n.pull(config.Image); err != nil { + if err = n.Pull(config.Image); err != nil { return nil, err } // ...And try again. 
@@ -401,8 +385,8 @@ func (n *node) create(config *dockerclient.ContainerConfig, name string, pullIma return n.containers[id], nil } -// Destroy and remove a container from the node. -func (n *node) destroy(container *cluster.Container, force bool) error { +// Destroy and remove a container from the engine. +func (n *Engine) Destroy(container *Container, force bool) error { if err := n.client.RemoveContainer(container.Id, force, true); err != nil { return err } @@ -416,7 +400,8 @@ return nil } -func (n *node) pull(image string) error { +// Pull an image on the engine +func (n *Engine) Pull(image string) error { if !strings.Contains(image, ":") { image = image + ":latest" } @@ -426,8 +411,8 @@ return nil } -// Register an event handler. -func (n *node) events(h cluster.EventHandler) error { +// Events registers an event handler. +func (n *Engine) Events(h EventHandler) error { if n.eventHandler != nil { return errors.New("event handler already set") } @@ -435,9 +420,9 @@ return nil } -// Containers returns all the containers in the node. -func (n *node) Containers() []*cluster.Container { - containers := []*cluster.Container{} +// Containers returns all the containers in the engine. +func (n *Engine) Containers() []*Container { + containers := []*Container{} n.RLock() for _, container := range n.containers { containers = append(containers, container) @@ -446,8 +431,8 @@ return containers } -// Container returns the container with IDOrName in the node. -func (n *node) Container(IDOrName string) *cluster.Container { +// Container returns the container with IDOrName in the engine. +func (n *Engine) Container(IDOrName string) *Container { // Abort immediately if the name is empty. if len(IDOrName) == 0 { return nil @@ -464,7 +449,7 @@ // Match name, /name or engine/name. 
for _, name := range container.Names { - if name == IDOrName || name == "/"+IDOrName || container.Node.ID()+name == IDOrName || container.Node.Name()+name == IDOrName { + if name == IDOrName || name == "/"+IDOrName || container.Engine.ID+name == IDOrName || container.Engine.Name+name == IDOrName { return container } } @@ -473,9 +458,9 @@ return nil } -// Images returns all the images in the node -func (n *node) Images() []*cluster.Image { - images := []*cluster.Image{} +// Images returns all the images in the engine +func (n *Engine) Images() []*Image { + images := []*Image{} n.RLock() for _, image := range n.images { @@ -485,8 +470,8 @@ return images } -// Image returns the image with IDOrName in the node -func (n *node) Image(IDOrName string) *cluster.Image { +// Image returns the image with IDOrName in the engine +func (n *Engine) Image(IDOrName string) *Image { n.RLock() defer n.RUnlock() @@ -498,11 +483,11 @@ return nil } -func (n *node) String() string { - return fmt.Sprintf("node %s addr %s", n.id, n.addr) +func (n *Engine) String() string { + return fmt.Sprintf("engine %s addr %s", n.ID, n.Addr) } -func (n *node) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) { +func (n *Engine) handler(ev *dockerclient.Event, _ chan error, args ...interface{}) { // Something changed - refresh our internal state. switch ev.Status { case "pull", "untag", "delete": @@ -523,16 +508,16 @@ return } - event := &cluster.Event{ - Node: n, - Event: *ev, + event := &Event{ + Engine: n, + Event: *ev, } n.eventHandler.Handle(event) } -// Inject a container into the internal state. -func (n *node) addContainer(container *cluster.Container) error { +// AddContainer injects a container into the internal state. +func (n *Engine) AddContainer(container *Container) error { n.Lock() defer n.Unlock() @@ -544,7 +529,7 @@ } // Inject an image into the internal state. -func (n *node) addImage(image *cluster.Image) { +func (n *Engine) addImage(image *Image) { n.Lock() defer n.Unlock() @@ -552,7 +537,7 @@ } // Remove a container from the internal state. -func (n *node) removeContainer(container *cluster.Container) error { +func (n *Engine) removeContainer(container *Container) error { n.Lock() defer n.Unlock() @@ -564,8 +549,8 @@ } // Wipes the internal container state. -func (n *node) cleanupContainers() { +func (n *Engine) cleanupContainers() { n.Lock() - n.containers = make(map[string]*cluster.Container) + n.containers = make(map[string]*Container) n.Unlock() } diff --git a/cluster/engine_sorter.go b/cluster/engine_sorter.go new file mode 100644 index 0000000000..49b7671784 --- /dev/null +++ b/cluster/engine_sorter.go @@ -0,0 +1,21 @@ +package cluster + +// EngineSorter implements the Sort interface to sort engines. +// It is not guaranteed to be a stable sort. +type EngineSorter []*Engine + +// Len returns the number of engines to be sorted. +func (s EngineSorter) Len() int { + return len(s) +} + +// Swap exchanges the engine elements with indices i and j. 
+func (s EngineSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Less reports whether the engine with index i should sort before the engine with index j. +// Engines are sorted alphabetically by name. +func (s EngineSorter) Less(i, j int) bool { + return s[i].Name < s[j].Name +} diff --git a/cluster/engine_sorter_test.go b/cluster/engine_sorter_test.go new file mode 100644 index 0000000000..631b5c4f97 --- /dev/null +++ b/cluster/engine_sorter_test.go @@ -0,0 +1,18 @@ +package cluster + +import ( + "sort" + "testing", + + "github.com/stretchr/testify/assert" +) + +func TestNodeSorter(t *testing.T) { + nodes := []*Engine{{Name: "name1"}, {Name: "name3"}, {Name: "name2"}} + + sort.Sort(EngineSorter(nodes)) + + assert.Equal(t, nodes[0].Name, "name1") + assert.Equal(t, nodes[1].Name, "name2") + assert.Equal(t, nodes[2].Name, "name3") +} diff --git a/cluster/swarm/node_test.go b/cluster/engine_test.go similarity index 90% rename from cluster/swarm/node_test.go rename to cluster/engine_test.go index 4f1e922465..98604c750c 100644 --- a/cluster/swarm/node_test.go +++ b/cluster/engine_test.go @@ -1,4 +1,4 @@ -package swarm +package cluster import ( "errors" @@ -26,7 +26,7 @@ var ( ) func TestNodeConnectionFailure(t *testing.T) { - node := NewNode("test", 0) + node := NewEngine("test", 0) assert.False(t, node.isConnected()) // Always fail. @@ -41,7 +41,7 @@ } func TestOutdatedNode(t *testing.T) { - node := NewNode("test", 0) + node := NewEngine("test", 0) client := mockclient.NewMockClient() client.On("Info").Return(&dockerclient.Info{}, nil) @@ -52,7 +52,7 @@ } func TestNodeCpusMemory(t *testing.T) { - node := NewNode("test", 0) + node := NewEngine("test", 0) assert.False(t, node.isConnected()) client := mockclient.NewMockClient() @@ -72,7 +72,7 @@ } func TestNodeSpecs(t *testing.T) { - node := NewNode("test", 0) + node := NewEngine("test", 0) assert.False(t, node.isConnected()) client := mockclient.NewMockClient() @@ -87,17 +87,17 @@ assert.Equal(t, node.Cpus, mockInfo.NCPU) assert.Equal(t, node.Memory, mockInfo.MemTotal) - assert.Equal(t, node.Labels()["storagedriver"], mockInfo.Driver) - assert.Equal(t, node.Labels()["executiondriver"], mockInfo.ExecutionDriver) - assert.Equal(t, node.Labels()["kernelversion"], mockInfo.KernelVersion) - assert.Equal(t, node.Labels()["operatingsystem"], mockInfo.OperatingSystem) - assert.Equal(t, node.Labels()["foo"], "bar") + assert.Equal(t, node.Labels["storagedriver"], mockInfo.Driver) + assert.Equal(t, node.Labels["executiondriver"], mockInfo.ExecutionDriver) + assert.Equal(t, node.Labels["kernelversion"], mockInfo.KernelVersion) + assert.Equal(t, node.Labels["operatingsystem"], mockInfo.OperatingSystem) + assert.Equal(t, node.Labels["foo"], "bar") client.Mock.AssertExpectations(t) } func TestNodeState(t *testing.T) { - node := NewNode("test", 0) + node := NewEngine("test", 0) assert.False(t, node.isConnected()) client := mockclient.NewMockClient() @@ -136,7 +136,7 @@ } func TestNodeContainerLookup(t *testing.T) { - node := NewNode("test-node", 0) + node := NewEngine("test-node", 0) assert.False(t, node.isConnected()) client := mockclient.NewMockClient() @@ -175,7 +175,7 @@ Cmd: []string{"date"}, Tty: false, } - node = NewNode("test", 0) + node = NewEngine("test", 0) client = mockclient.NewMockClient() ) @@ -196,7 
func TestCreateContainer(t *testing.T) { client.On("ListContainers", true, false, fmt.Sprintf(`{"id":[%q]}`, id)).Return([]dockerclient.Container{{Id: id}}, nil).Once() client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once() client.On("InspectContainer", id).Return(&dockerclient.ContainerInfo{Config: config}, nil).Once() - container, err := node.create(config, name, false) + container, err := node.Create(config, name, false) assert.Nil(t, err) assert.Equal(t, container.Id, id) assert.Len(t, node.Containers(), 1) @@ -205,7 +205,7 @@ func TestCreateContainer(t *testing.T) { name = "test2" mockConfig.CpuShares = config.CpuShares * 1024 / mockInfo.NCPU client.On("CreateContainer", &mockConfig, name).Return("", dockerclient.ErrNotFound).Once() - container, err = node.create(config, name, false) + container, err = node.Create(config, name, false) assert.Equal(t, err, dockerclient.ErrNotFound) assert.Nil(t, container) @@ -219,28 +219,28 @@ func TestCreateContainer(t *testing.T) { client.On("ListContainers", true, false, fmt.Sprintf(`{"id":[%q]}`, id)).Return([]dockerclient.Container{{Id: id}}, nil).Once() client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once() client.On("InspectContainer", id).Return(&dockerclient.ContainerInfo{Config: config}, nil).Once() - container, err = node.create(config, name, true) + container, err = node.Create(config, name, true) assert.Nil(t, err) assert.Equal(t, container.Id, id) assert.Len(t, node.Containers(), 2) } func TestTotalMemory(t *testing.T) { - node := NewNode("test", 0.05) + node := NewEngine("test", 0.05) node.Memory = 1024 assert.Equal(t, node.TotalMemory(), 1024+1024*5/100) - node = NewNode("test", 0) + node = NewEngine("test", 0) node.Memory = 1024 assert.Equal(t, node.TotalMemory(), 1024) } func TestTotalCpus(t *testing.T) { - node := NewNode("test", 0.05) + node := NewEngine("test", 0.05) node.Cpus = 2 assert.Equal(t, node.TotalCpus(), 2+2*5/100) - node = NewNode("test", 0) + node = NewEngine("test", 0) node.Cpus = 2 assert.Equal(t, node.TotalCpus(), 2) } diff --git a/cluster/event.go b/cluster/event.go index 272a76bfa3..e55ac47dda 100644 --- a/cluster/event.go +++ b/cluster/event.go @@ -5,7 +5,7 @@ import "github.com/samalba/dockerclient" // Event is exported type Event struct { dockerclient.Event - Node Node + Engine *Engine } // EventHandler is exported diff --git a/cluster/fakenode_test.go b/cluster/fakenode_test.go deleted file mode 100644 index 1098753f23..0000000000 --- a/cluster/fakenode_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package cluster - -type FakeNode struct { - name string -} - -func (fn *FakeNode) ID() string { return "" } -func (fn *FakeNode) Name() string { return fn.name } -func (fn *FakeNode) IP() string { return "" } -func (fn *FakeNode) Addr() string { return "" } -func (fn *FakeNode) Images() []*Image { return nil } -func (fn *FakeNode) Image(_ string) *Image { return nil } -func (fn *FakeNode) Containers() []*Container { return nil } -func (fn *FakeNode) Container(_ string) *Container { return nil } -func (fn *FakeNode) TotalCpus() int64 { return 0 } -func (fn *FakeNode) UsedCpus() int64 { return 0 } -func (fn *FakeNode) TotalMemory() int64 { return 0 } -func (fn *FakeNode) UsedMemory() int64 { return 0 } -func (fn *FakeNode) Labels() map[string]string { return nil } -func (fn *FakeNode) IsHealthy() bool { return true } diff --git a/cluster/image.go b/cluster/image.go index de940c8999..0552fa0e0c 100644 --- a/cluster/image.go +++ b/cluster/image.go @@ -10,7 +10,7 @@ import ( type Image struct { 
dockerclient.Image - Node Node + Engine *Engine } // Match is exported diff --git a/cluster/node.go b/cluster/node.go deleted file mode 100644 index a5c44b14f6..0000000000 --- a/cluster/node.go +++ /dev/null @@ -1,55 +0,0 @@ -package cluster - -import "fmt" - -// Node is exported -type Node interface { - ID() string - Name() string - - IP() string //to inject the actual IP of the machine in docker ps (hostname:port or ip:port) - Addr() string //to know where to connect with the proxy - - Images() []*Image //used by the API - Image(IDOrName string) *Image //used by the filters - Containers() []*Container //used by the filters - Container(IDOrName string) *Container //used by the filters - - TotalCpus() int64 //used by the strategy - UsedCpus() int64 //used by the strategy - TotalMemory() int64 //used by the strategy - UsedMemory() int64 //used by the strategy - - Labels() map[string]string //used by the filters - - IsHealthy() bool -} - -// SerializeNode is exported -func SerializeNode(node Node) string { - return fmt.Sprintf("{%q:%q,%q:%q,%q:%q,%q:%q}", - "Name", node.Name(), - "Id", node.ID(), - "Addr", node.Addr(), - "Ip", node.IP()) -} - -// NodeSorter implements the Sort interface to sort Cluster.Node. -// It is not guaranteed to be a stable sort. -type NodeSorter []Node - -// Len returns the number of nodes to be sorted. -func (s NodeSorter) Len() int { - return len(s) -} - -// Swap exchanges the node elements with indices i and j. -func (s NodeSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -// Less reports whether the node with index i should sort before the node with index j. -// Nodes are sorted chronologically by name. -func (s NodeSorter) Less(i, j int) bool { - return s[i].Name() < s[j].Name() -} diff --git a/cluster/node_test.go b/cluster/node_test.go deleted file mode 100644 index d92c62d17b..0000000000 --- a/cluster/node_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package cluster - -import ( - "sort" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNodeSorter(t *testing.T) { - nodes := []Node{&FakeNode{"name1"}, &FakeNode{"name3"}, &FakeNode{"name2"}} - - sort.Sort(NodeSorter(nodes)) - - assert.Equal(t, nodes[0].Name(), "name1") - assert.Equal(t, nodes[1].Name(), "name2") - assert.Equal(t, nodes[2].Name(), "name3") -} diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index b9f32d121a..7c7723951a 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -10,6 +10,7 @@ import ( "github.com/docker/swarm/cluster" "github.com/docker/swarm/discovery" "github.com/docker/swarm/scheduler" + "github.com/docker/swarm/scheduler/node" "github.com/docker/swarm/state" "github.com/samalba/dockerclient" ) @@ -19,7 +20,7 @@ type Cluster struct { sync.RWMutex eventHandler cluster.EventHandler - nodes map[string]*node + nodes map[string]*cluster.Engine scheduler *scheduler.Scheduler options *cluster.Options store *state.Store @@ -31,7 +32,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, eventhandler cluster := &Cluster{ eventHandler: eventhandler, - nodes: make(map[string]*node), + nodes: make(map[string]*cluster.Engine), scheduler: scheduler, options: options, store: store, @@ -75,8 +76,8 @@ func (c *Cluster) CreateContainer(config *dockerclient.ContainerConfig, name str return nil, err } - if nn, ok := n.(*node); ok { - container, err := nn.create(config, name, true) + if nn, ok := c.nodes[n.ID]; ok { + container, err := nn.Create(config, name, true) if err != nil { return nil, err } @@ -98,10 +99,8 @@ func (c 
*Cluster) RemoveContainer(container *cluster.Container, force bool) erro c.scheduler.Lock() defer c.scheduler.Unlock() - if n, ok := container.Node.(*node); ok { - if err := n.destroy(container, force); err != nil { - return err - } + if err := container.Engine.Destroy(container, force); err != nil { + return err } if err := c.store.Remove(container.Id); err != nil { @@ -118,25 +117,25 @@ func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) erro func (c *Cluster) newEntries(entries []*discovery.Entry) { for _, entry := range entries { go func(m *discovery.Entry) { - if c.getNode(m.String()) == nil { - n := NewNode(m.String(), c.options.OvercommitRatio) - if err := n.connect(c.options.TLSConfig); err != nil { + if !c.hasNode(m.String()) { + n := cluster.NewEngine(m.String(), c.options.OvercommitRatio) + if err := n.Connect(c.options.TLSConfig); err != nil { log.Error(err) return } c.Lock() - if old, exists := c.nodes[n.id]; exists { + if old, exists := c.nodes[n.ID]; exists { c.Unlock() - if old.ip != n.ip { - log.Errorf("ID duplicated. %s shared by %s and %s", n.id, old.IP(), n.IP()) + if old.IP != n.IP { + log.Errorf("ID duplicated. %s shared by %s and %s", n.ID, old.IP, n.IP) } else { - log.Errorf("node %q is already registered", n.id) + log.Errorf("node %q is already registered", n.ID) } return } - c.nodes[n.id] = n - if err := n.events(c); err != nil { + c.nodes[n.ID] = n + if err := n.Events(c); err != nil { log.Error(err) c.Unlock() return @@ -148,13 +147,13 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) { } } -func (c *Cluster) getNode(addr string) *node { +func (c *Cluster) hasNode(addr string) bool { for _, node := range c.nodes { - if node.addr == addr { - return node + if node.Addr == addr { + return true } } - return nil + return false } // Images returns all the images in the cluster. @@ -192,10 +191,7 @@ func (c *Cluster) Image(IDOrName string) *cluster.Image { func (c *Cluster) RemoveImage(image *cluster.Image) ([]*dockerclient.ImageDelete, error) { c.Lock() defer c.Unlock() - if n, ok := image.Node.(*node); ok { - return n.removeImage(image) - } - return nil, nil + return image.Engine.RemoveImage(image) } // Pull is exported @@ -203,16 +199,16 @@ func (c *Cluster) Pull(name string, callback func(what, status string)) { size := len(c.nodes) done := make(chan bool, size) for _, n := range c.nodes { - go func(nn *node) { + go func(nn *cluster.Engine) { if callback != nil { - callback(nn.Name(), "") + callback(nn.Name, "") } - err := nn.pull(name) + err := nn.Pull(name) if callback != nil { if err != nil { - callback(nn.Name(), err.Error()) + callback(nn.Name, err.Error()) } else { - callback(nn.Name(), "downloaded") + callback(nn.Name, "downloaded") } } done <- true @@ -254,16 +250,42 @@ func (c *Cluster) Container(IDOrName string) *cluster.Container { return nil } -// nodes returns all the nodes in the cluster. -func (c *Cluster) listNodes() []cluster.Node { +// listNodes returns all the nodes in the cluster. 
+func (c *Cluster) listNodes() []*node.Node { c.RLock() defer c.RUnlock() - out := []cluster.Node{} + out := []*node.Node{} + for _, n := range c.nodes { + out = append(out, &node.Node{ + ID: n.ID, + IP: n.IP, + Addr: n.Addr, + Name: n.Name, + Cpus: n.Cpus, + Labels: n.Labels, + Containers: n.Containers(), + Images: n.Images(), + UsedMemory: n.UsedMemory(), + UsedCpus: n.UsedCpus(), + TotalMemory: n.TotalMemory(), + TotalCpus: n.TotalCpus(), + IsHealthy: n.IsHealthy(), + }) + } + + return out +} + +// listEngines returns all the engines in the cluster. +func (c *Cluster) listEngines() []*cluster.Engine { + c.RLock() + defer c.RUnlock() + + out := []*cluster.Engine{} for _, n := range c.nodes { out = append(out, n) } - return out } @@ -275,11 +297,11 @@ func (c *Cluster) Info() [][2]string { {"\bNodes", fmt.Sprintf("%d", len(c.nodes))}, } - nodes := c.listNodes() - sort.Sort(cluster.NodeSorter(nodes)) + nodes := c.listEngines() + sort.Sort(cluster.EngineSorter(nodes)) - for _, node := range nodes { - info = append(info, [2]string{node.Name(), node.Addr()}) + for _, node := range nodes { + info = append(info, [2]string{node.Name, node.Addr}) info = append(info, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))}) info = append(info, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.UsedCpus(), node.TotalCpus())}) info = append(info, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.UsedMemory())), units.BytesSize(float64(node.TotalMemory())))}) diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go index 9315ba20ca..d038c0ff37 100644 --- a/cluster/swarm/cluster_test.go +++ b/cluster/swarm/cluster_test.go @@ -8,13 +8,13 @@ import ( "github.com/stretchr/testify/assert" ) -func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *node { - node := NewNode(ID, 0) - node.name = ID - node.id = ID +func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *cluster.Engine { + node := cluster.NewEngine(ID, 0) + node.Name = ID + node.ID = ID for _, container := range containers { - node.addContainer(&cluster.Container{Container: container, Node: node}) + node.AddContainer(&cluster.Container{Container: container, Engine: node}) } return node @@ -22,7 +22,7 @@ func TestContainerLookup(t *testing.T) { c := &Cluster{ - nodes: make(map[string]*node), + nodes: make(map[string]*cluster.Engine), } container := dockerclient.Container{ Id: "container-id", @@ -30,7 +30,7 @@ } n := createNode(t, "test-node", container) - c.nodes[n.ID()] = n + c.nodes[n.ID] = n // Invalid lookup assert.Nil(t, c.Container("invalid-id")) diff --git a/manage.go b/manage.go index 5c94c60d44..932a30f116 100644 --- a/manage.go +++ b/manage.go @@ -27,7 +27,7 @@ func (h *logHandler) Handle(e *cluster.Event) error { if len(id) > 12 { id = id[:12] } - log.WithFields(log.Fields{"node": e.Node.Name, "id": id, "from": e.From, "status": e.Status}).Debug("Event received") + log.WithFields(log.Fields{"node": e.Engine.Name, "id": id, "from": e.From, "status": e.Status}).Debug("Event received") return nil } diff --git a/scheduler/filter/affinity.go b/scheduler/filter/affinity.go index ba0bd7d0e0..aa22960c44 100644 --- a/scheduler/filter/affinity.go +++ b/scheduler/filter/affinity.go @@ -5,7 +5,7 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" + 
"github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -19,7 +19,7 @@ func (f *AffinityFilter) Name() string { } // Filter is exported -func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) { +func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) { affinities, err := parseExprs("affinity", config.Env) if err != nil { return nil, err @@ -28,12 +28,12 @@ func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []cl for _, affinity := range affinities { log.Debugf("matching affinity: %s%s%s", affinity.key, OPERATORS[affinity.operator], affinity.value) - candidates := []cluster.Node{} + candidates := []*node.Node{} for _, node := range nodes { switch affinity.key { case "container": containers := []string{} - for _, container := range node.Containers() { + for _, container := range node.Containers { containers = append(containers, container.Id, strings.TrimPrefix(container.Names[0], "/")) } if affinity.Match(containers...) { @@ -41,7 +41,7 @@ func (f *AffinityFilter) Filter(config *dockerclient.ContainerConfig, nodes []cl } case "image": images := []string{} - for _, image := range node.Images() { + for _, image := range node.Images { images = append(images, image.Id) images = append(images, image.RepoTags...) for _, tag := range image.RepoTags { diff --git a/scheduler/filter/affinity_test.go b/scheduler/filter/affinity_test.go index 9117c07f52..51cc1d80ff 100644 --- a/scheduler/filter/affinity_test.go +++ b/scheduler/filter/affinity_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" "github.com/stretchr/testify/assert" ) @@ -11,12 +12,12 @@ import ( func TestAffinityFilter(t *testing.T) { var ( f = AffinityFilter{} - nodes = []cluster.Node{ - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", - containers: []*cluster.Container{ + nodes = []*node.Node{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", + Containers: []*cluster.Container{ {Container: dockerclient.Container{ Id: "container-n0-0-id", Names: []string{"/container-n0-0-name"}, @@ -26,16 +27,16 @@ func TestAffinityFilter(t *testing.T) { Names: []string{"/container-n0-1-name"}, }}, }, - images: []*cluster.Image{{Image: dockerclient.Image{ + Images: []*cluster.Image{{Image: dockerclient.Image{ Id: "image-0-id", RepoTags: []string{"image-0:tag1", "image-0:tag2"}, }}}, }, - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: "node-1", - containers: []*cluster.Container{ + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", + Containers: []*cluster.Container{ {Container: dockerclient.Container{ Id: "container-n1-0-id", Names: []string{"/container-n1-0-name"}, @@ -45,18 +46,18 @@ func TestAffinityFilter(t *testing.T) { Names: []string{"/container-n1-1-name"}, }}, }, - images: []*cluster.Image{{Image: dockerclient.Image{ + Images: []*cluster.Image{{Image: dockerclient.Image{ Id: "image-1-id", RepoTags: []string{"image-1:tag1", "image-0:tag3", "image-1:tag2"}, }}}, }, - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", }, } - result []cluster.Node + result []*node.Node err error ) diff --git a/scheduler/filter/constraint.go b/scheduler/filter/constraint.go index 5992123dfb..b5cf0a778f 100644 --- a/scheduler/filter/constraint.go +++ 
b/scheduler/filter/constraint.go @@ -4,7 +4,7 @@ import ( "fmt" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -18,7 +18,7 @@ func (f *ConstraintFilter) Name() string { } // Filter is exported -func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) { +func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) { constraints, err := parseExprs("constraint", config.Env) if err != nil { return nil, err @@ -27,16 +27,16 @@ func (f *ConstraintFilter) Filter(config *dockerclient.ContainerConfig, nodes [] for _, constraint := range constraints { log.Debugf("matching constraint: %s %s %s", constraint.key, OPERATORS[constraint.operator], constraint.value) - candidates := []cluster.Node{} + candidates := []*node.Node{} for _, node := range nodes { switch constraint.key { case "node": // "node" label is a special case pinning a container to a specific node. - if constraint.Match(node.ID(), node.Name()) { + if constraint.Match(node.ID, node.Name) { candidates = append(candidates, node) } default: - if constraint.Match(node.Labels()[constraint.key]) { + if constraint.Match(node.Labels[constraint.key]) { candidates = append(candidates, node) } } diff --git a/scheduler/filter/constraint_test.go b/scheduler/filter/constraint_test.go index 8a5fb8a924..cc14d24924 100644 --- a/scheduler/filter/constraint_test.go +++ b/scheduler/filter/constraint_test.go @@ -3,50 +3,50 @@ package filter import ( "testing" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" "github.com/stretchr/testify/assert" ) -func testFixtures() []cluster.Node { - return []cluster.Node{ - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", - labels: map[string]string{ +func testFixtures() []*node.Node { + return []*node.Node{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", + Labels: map[string]string{ "name": "node0", "group": "1", "region": "us-west", }, }, - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: "node-1", - labels: map[string]string{ + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", + Labels: map[string]string{ "name": "node1", "group": "1", "region": "us-east", }, }, - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", - labels: map[string]string{ + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", + Labels: map[string]string{ "name": "node2", "group": "2", "region": "eu", }, }, - &FakeNode{ - id: "node-3-id", - name: "node-3-name", - addr: "node-3", + { + ID: "node-3-id", + Name: "node-3-name", + Addr: "node-3", }, } } @@ -55,7 +55,7 @@ func TestConstrainteFilter(t *testing.T) { var ( f = ConstraintFilter{} nodes = testFixtures() - result []cluster.Node + result []*node.Node err error ) @@ -116,7 +116,7 @@ func TestConstraintNotExpr(t *testing.T) { var ( f = ConstraintFilter{} nodes = testFixtures() - result []cluster.Node + result []*node.Node err error ) @@ -145,7 +145,7 @@ func TestConstraintRegExp(t *testing.T) { var ( f = ConstraintFilter{} nodes = testFixtures() - result []cluster.Node + result []*node.Node err error ) @@ -174,17 +174,15 @@ func TestFilterRegExpCaseInsensitive(t *testing.T) { var ( f = ConstraintFilter{} nodes = testFixtures() - result []cluster.Node + result []*node.Node err error ) // Prepare node with a strange name - if n, ok := 
nodes[3].(*FakeNode); ok { - n.labels = map[string]string{ - "name": "aBcDeF", - "group": "2", - "region": "eu", - } + nodes[3].Labels = map[string]string{ + "name": "aBcDeF", + "group": "2", + "region": "eu", } // Case-sensitive, so not match @@ -197,7 +195,7 @@ func TestFilterRegExpCaseInsensitive(t *testing.T) { assert.NoError(t, err) assert.Len(t, result, 1) assert.Equal(t, result[0], nodes[3]) - assert.Equal(t, result[0].Labels()["name"], "aBcDeF") + assert.Equal(t, result[0].Labels["name"], "aBcDeF") // Test ! filter combined with case insensitive result, err = f.Filter(&dockerclient.ContainerConfig{Env: []string{`constraint:name!=/(?i)abc*/`}}, nodes) @@ -209,7 +207,7 @@ func TestFilterEquals(t *testing.T) { var ( f = ConstraintFilter{} nodes = testFixtures() - result []cluster.Node + result []*node.Node err error ) @@ -234,7 +232,7 @@ func TestUnsupportedOperators(t *testing.T) { var ( f = ConstraintFilter{} nodes = testFixtures() - result []cluster.Node + result []*node.Node err error ) @@ -251,7 +249,7 @@ func TestFilterSoftConstraint(t *testing.T) { var ( f = ConstraintFilter{} nodes = testFixtures() - result []cluster.Node + result []*node.Node err error ) diff --git a/scheduler/filter/dependency.go b/scheduler/filter/dependency.go index 3bd0113d2d..f94183a367 100644 --- a/scheduler/filter/dependency.go +++ b/scheduler/filter/dependency.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -18,7 +18,7 @@ func (f *DependencyFilter) Name() string { } // Filter is exported -func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) { +func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) { if len(nodes) == 0 { return nodes, nil } @@ -35,7 +35,7 @@ func (f *DependencyFilter) Filter(config *dockerclient.ContainerConfig, nodes [] net = append(net, strings.TrimPrefix(config.HostConfig.NetworkMode, "container:")) } - candidates := []cluster.Node{} + candidates := []*node.Node{} for _, node := range nodes { if f.check(config.HostConfig.VolumesFrom, node) && f.check(links, node) && @@ -67,7 +67,7 @@ func (f *DependencyFilter) String(config *dockerclient.ContainerConfig) string { } // Ensure that the node contains all dependent containers. 
-func (f *DependencyFilter) check(dependencies []string, node cluster.Node) bool { +func (f *DependencyFilter) check(dependencies []string, node *node.Node) bool { for _, dependency := range dependencies { if node.Container(dependency) == nil { return false diff --git a/scheduler/filter/dependency_test.go b/scheduler/filter/dependency_test.go index 46870ef763..b0bbeddb0f 100644 --- a/scheduler/filter/dependency_test.go +++ b/scheduler/filter/dependency_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" "github.com/stretchr/testify/assert" ) @@ -11,29 +12,29 @@ import ( func TestDependencyFilterSimple(t *testing.T) { var ( f = DependencyFilter{} - nodes = []cluster.Node{ - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", - containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c0"}}}, + nodes = []*node.Node{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", + Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c0"}}}, }, - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: "node-1", - containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c1"}}}, + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", + Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c1"}}}, }, - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", - containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", + Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, }, } - result []cluster.Node + result []*node.Node err error config *dockerclient.ContainerConfig ) @@ -83,34 +84,34 @@ func TestDependencyFilterSimple(t *testing.T) { func TestDependencyFilterMulti(t *testing.T) { var ( f = DependencyFilter{} - nodes = []cluster.Node{ + nodes = []*node.Node{ // nodes[0] has c0 and c1 - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", - containers: []*cluster.Container{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", + Containers: []*cluster.Container{ {Container: dockerclient.Container{Id: "c0"}}, {Container: dockerclient.Container{Id: "c1"}}, }, }, // nodes[1] has c2 - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: "node-1", - containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", + Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, }, // nodes[2] has nothing - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", }, } - result []cluster.Node + result []*node.Node err error config *dockerclient.ContainerConfig ) @@ -153,34 +154,34 @@ func TestDependencyFilterMulti(t *testing.T) { func TestDependencyFilterChaining(t *testing.T) { var ( f = DependencyFilter{} - nodes = []cluster.Node{ + nodes = []*node.Node{ // nodes[0] has c0 and c1 - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", - containers: []*cluster.Container{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", + Containers: []*cluster.Container{ {Container: dockerclient.Container{Id: "c0"}}, {Container: dockerclient.Container{Id: "c1"}}, }, }, // nodes[1] has c2 - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: 
"node-1", - containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", + Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}}, }, // nodes[2] has nothing - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", }, } - result []cluster.Node + result []*node.Node err error config *dockerclient.ContainerConfig ) diff --git a/scheduler/filter/fakenode_test.go b/scheduler/filter/fakenode_test.go deleted file mode 100644 index 54dea30915..0000000000 --- a/scheduler/filter/fakenode_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package filter - -import "github.com/docker/swarm/cluster" - -type FakeNode struct { - id string - name string - addr string - containers []*cluster.Container - images []*cluster.Image - labels map[string]string -} - -func (fn *FakeNode) ID() string { return fn.id } -func (fn *FakeNode) Name() string { return fn.name } -func (fn *FakeNode) IP() string { return "" } -func (fn *FakeNode) Addr() string { return fn.addr } -func (fn *FakeNode) Images() []*cluster.Image { return fn.images } -func (fn *FakeNode) Image(id string) *cluster.Image { - for _, image := range fn.images { - if image.Id == id { - return image - } - } - return nil -} -func (fn *FakeNode) Containers() []*cluster.Container { return fn.containers } -func (fn *FakeNode) Container(id string) *cluster.Container { - for _, container := range fn.containers { - if container.Id == id { - return container - } - } - return nil -} -func (fn *FakeNode) TotalCpus() int64 { return 0 } -func (fn *FakeNode) UsedCpus() int64 { return 0 } -func (fn *FakeNode) TotalMemory() int64 { return 0 } -func (fn *FakeNode) UsedMemory() int64 { return 0 } -func (fn *FakeNode) Labels() map[string]string { return fn.labels } -func (fn *FakeNode) IsHealthy() bool { return true } - -func (fn *FakeNode) AddContainer(container *cluster.Container) error { - fn.containers = append(fn.containers, container) - return nil -} diff --git a/scheduler/filter/filter.go b/scheduler/filter/filter.go index 36bd3074ac..df2afbc1e2 100644 --- a/scheduler/filter/filter.go +++ b/scheduler/filter/filter.go @@ -4,7 +4,7 @@ import ( "errors" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -13,7 +13,7 @@ type Filter interface { Name() string // Return a subset of nodes that were accepted by the filtering policy. - Filter(*dockerclient.ContainerConfig, []cluster.Node) ([]cluster.Node, error) + Filter(*dockerclient.ContainerConfig, []*node.Node) ([]*node.Node, error) } var ( @@ -54,7 +54,7 @@ func New(names []string) ([]Filter, error) { } // ApplyFilters applies a set of filters in batch. 
-func ApplyFilters(filters []Filter, config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) { +func ApplyFilters(filters []Filter, config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) { var err error for _, filter := range filters { diff --git a/scheduler/filter/health.go b/scheduler/filter/health.go index 63cf35bd69..b0148107d0 100644 --- a/scheduler/filter/health.go +++ b/scheduler/filter/health.go @@ -3,7 +3,7 @@ package filter import ( "errors" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -22,10 +22,10 @@ func (f *HealthFilter) Name() string { } // Filter is exported -func (f *HealthFilter) Filter(_ *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) { - result := []cluster.Node{} +func (f *HealthFilter) Filter(_ *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) { + result := []*node.Node{} for _, node := range nodes { - if node.IsHealthy() { + if node.IsHealthy { result = append(result, node) } } diff --git a/scheduler/filter/port.go b/scheduler/filter/port.go index 4b141fe3f6..686b8e48a3 100644 --- a/scheduler/filter/port.go +++ b/scheduler/filter/port.go @@ -3,7 +3,7 @@ package filter import ( "fmt" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -19,10 +19,10 @@ func (p *PortFilter) Name() string { } // Filter is exported -func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluster.Node) ([]cluster.Node, error) { +func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) { for _, port := range config.HostConfig.PortBindings { for _, binding := range port { - candidates := []cluster.Node{} + candidates := []*node.Node{} for _, node := range nodes { if !p.portAlreadyInUse(node, binding) { candidates = append(candidates, node) @@ -37,8 +37,8 @@ func (p *PortFilter) Filter(config *dockerclient.ContainerConfig, nodes []cluste return nodes, nil } -func (p *PortFilter) portAlreadyInUse(node cluster.Node, requested dockerclient.PortBinding) bool { - for _, c := range node.Containers() { +func (p *PortFilter) portAlreadyInUse(node *node.Node, requested dockerclient.PortBinding) bool { + for _, c := range node.Containers { // HostConfig.PortBindings contains the requested ports. // NetworkSettings.Ports contains the actual ports. 
// diff --git a/scheduler/filter/port_test.go b/scheduler/filter/port_test.go index 06e75d2da5..11eb9d151d 100644 --- a/scheduler/filter/port_test.go +++ b/scheduler/filter/port_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" "github.com/stretchr/testify/assert" ) @@ -23,24 +24,24 @@ func makeBinding(ip, port string) map[string][]dockerclient.PortBinding { func TestPortFilterNoConflicts(t *testing.T) { var ( p = PortFilter{} - nodes = []cluster.Node{ - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", + nodes = []*node.Node{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", }, - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: "node-1", + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", }, - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", }, } - result []cluster.Node + result []*node.Node err error ) @@ -66,9 +67,7 @@ func TestPortFilterNoConflicts(t *testing.T) { // Add a container taking a different (4242) port. container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "4242")}}} - if n, ok := nodes[0].(*FakeNode); ok { - assert.NoError(t, n.AddContainer(container)) - } + assert.NoError(t, nodes[0].AddContainer(container)) // Since no node is using port 80, there should be no filter result, err = p.Filter(config, nodes) @@ -79,32 +78,30 @@ func TestPortFilterNoConflicts(t *testing.T) { func TestPortFilterSimple(t *testing.T) { var ( p = PortFilter{} - nodes = []cluster.Node{ - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", + nodes = []*node.Node{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", }, - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: "node-1", + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", }, - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", }, } - result []cluster.Node + result []*node.Node err error ) // Add a container taking away port 80 to nodes[0]. container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "80")}}} - if n, ok := nodes[0].(*FakeNode); ok { - assert.NoError(t, n.AddContainer(container)) - } + assert.NoError(t, nodes[0].AddContainer(container)) // Request port 80. config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{ @@ -120,32 +117,30 @@ func TestPortFilterSimple(t *testing.T) { func TestPortFilterDifferentInterfaces(t *testing.T) { var ( p = PortFilter{} - nodes = []cluster.Node{ - &FakeNode{ - id: "node-0-id", - name: "node-0-name", - addr: "node-0", + nodes = []*node.Node{ + { + ID: "node-0-id", + Name: "node-0-name", + Addr: "node-0", }, - &FakeNode{ - id: "node-1-id", - name: "node-1-name", - addr: "node-1", + { + ID: "node-1-id", + Name: "node-1-name", + Addr: "node-1", }, - &FakeNode{ - id: "node-2-id", - name: "node-2-name", - addr: "node-2", + { + ID: "node-2-id", + Name: "node-2-name", + Addr: "node-2", }, } - result []cluster.Node + result []*node.Node err error ) // Add a container taking away port 80 on every interface to nodes[0]. 
 	container := &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("", "80")}}}
-	if n, ok := nodes[0].(*FakeNode); ok {
-		assert.NoError(t, n.AddContainer(container))
-	}
+	assert.NoError(t, nodes[0].AddContainer(container))

 	// Request port 80 for the local interface.
 	config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
@@ -161,9 +156,7 @@ func TestPortFilterDifferentInterfaces(t *testing.T) {
 	// Add a container taking away port 4242 on the local interface of
 	// nodes[1].
 	container = &cluster.Container{Container: dockerclient.Container{Id: "c1"}, Info: dockerclient.ContainerInfo{HostConfig: &dockerclient.HostConfig{PortBindings: makeBinding("127.0.0.1", "4242")}}}
-	if n, ok := nodes[1].(*FakeNode); ok {
-		assert.NoError(t, n.AddContainer(container))
-	}
+	assert.NoError(t, nodes[1].AddContainer(container))
 	// Request port 4242 on the same interface.
 	config = &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
 		PortBindings: makeBinding("127.0.0.1", "4242"),
@@ -206,24 +199,24 @@ func TestPortFilterDifferentInterfaces(t *testing.T) {
 func TestPortFilterRandomAssignment(t *testing.T) {
 	var (
 		p     = PortFilter{}
-		nodes = []cluster.Node{
-			&FakeNode{
-				id:   "node-0-id",
-				name: "node-0-name",
-				addr: "node-0",
+		nodes = []*node.Node{
+			{
+				ID:   "node-0-id",
+				Name: "node-0-name",
+				Addr: "node-0",
 			},
-			&FakeNode{
-				id:   "node-1-id",
-				name: "node-1-name",
-				addr: "node-1",
+			{
+				ID:   "node-1-id",
+				Name: "node-1-name",
+				Addr: "node-1",
 			},
-			&FakeNode{
-				id:   "node-2-id",
-				name: "node-2-name",
-				addr: "node-2",
+			{
+				ID:   "node-2-id",
+				Name: "node-2-name",
+				Addr: "node-2",
 			},
 		}
-		result []cluster.Node
+		result []*node.Node
 		err    error
 	)
@@ -261,9 +254,7 @@ func TestPortFilterRandomAssignment(t *testing.T) {
 		},
 	}

-	if n, ok := nodes[0].(*FakeNode); ok {
-		assert.NoError(t, n.AddContainer(container))
-	}
+	assert.NoError(t, nodes[0].AddContainer(container))

 	// Request port 80.
 	config := &dockerclient.ContainerConfig{HostConfig: dockerclient.HostConfig{
diff --git a/scheduler/node/node.go b/scheduler/node/node.go
new file mode 100644
index 0000000000..8d58a0f98a
--- /dev/null
+++ b/scheduler/node/node.go
@@ -0,0 +1,67 @@
+package node
+
+import (
+	"errors"
+	"strings"
+
+	"github.com/docker/swarm/cluster"
+)
+
+// Node is an abstract type used by the scheduler
+type Node struct {
+	ID     string
+	IP     string
+	Addr   string
+	Name   string
+	Cpus   int64
+	Memory int64
+	Labels map[string]string
+	Containers []*cluster.Container
+	Images     []*cluster.Image
+
+	UsedMemory  int64
+	UsedCpus    int64
+	TotalMemory int64
+	TotalCpus   int64
+
+	IsHealthy bool
+}
+
+// Container returns the container with IDOrName on the node.
+func (n *Node) Container(IDOrName string) *cluster.Container {
+	// Abort immediately if the name is empty.
+	if len(IDOrName) == 0 {
+		return nil
+	}
+
+	for _, container := range n.Containers {
+		// Match ID prefix.
+		if strings.HasPrefix(container.Id, IDOrName) {
+			return container
+		}
+
+		// Match name, /name or engine/name.
+		for _, name := range container.Names {
+			if name == IDOrName || name == "/"+IDOrName || container.Engine.ID+name == IDOrName || container.Engine.Name+name == IDOrName {
+				return container
+			}
+		}
+	}
+
+	return nil
+}
+
+// AddContainer injects a container into the internal state.
+func (n *Node) AddContainer(container *cluster.Container) error { + if container.Info.Config != nil { + memory := container.Info.Config.Memory + cpus := container.Info.Config.CpuShares + if n.TotalMemory-memory < 0 || n.TotalCpus-cpus < 0 { + return errors.New("not enough resources") + } + n.UsedMemory = n.UsedMemory + memory + n.UsedCpus = n.UsedCpus + cpus + } + n.Containers = append(n.Containers, container) + return nil +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 95ad74f74e..c85763ecda 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -4,8 +4,8 @@ import ( "strings" "sync" - "github.com/docker/swarm/cluster" "github.com/docker/swarm/scheduler/filter" + "github.com/docker/swarm/scheduler/node" "github.com/docker/swarm/scheduler/strategy" "github.com/samalba/dockerclient" ) @@ -27,7 +27,7 @@ func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Schedule } // SelectNodeForContainer will find a nice home for our container. -func (s *Scheduler) SelectNodeForContainer(nodes []cluster.Node, config *dockerclient.ContainerConfig) (cluster.Node, error) { +func (s *Scheduler) SelectNodeForContainer(nodes []*node.Node, config *dockerclient.ContainerConfig) (*node.Node, error) { accepted, err := filter.ApplyFilters(s.filters, config, nodes) if err != nil { return nil, err diff --git a/scheduler/strategy/binpack.go b/scheduler/strategy/binpack.go index 45c2e8257d..c168b564ac 100644 --- a/scheduler/strategy/binpack.go +++ b/scheduler/strategy/binpack.go @@ -3,7 +3,7 @@ package strategy import ( "sort" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -22,7 +22,7 @@ func (p *BinpackPlacementStrategy) Name() string { } // PlaceContainer is exported -func (p *BinpackPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) { +func (p *BinpackPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error) { weightedNodes, err := weighNodes(config, nodes) if err != nil { return nil, err @@ -36,7 +36,7 @@ func (p *BinpackPlacementStrategy) PlaceContainer(config *dockerclient.Container if node.Weight != topNode.Weight { break } - if len(node.Node.Containers()) > len(topNode.Node.Containers()) { + if len(node.Node.Containers) > len(topNode.Node.Containers) { topNode = node } } diff --git a/scheduler/strategy/binpack_test.go b/scheduler/strategy/binpack_test.go index 1e45651e46..593f355ece 100644 --- a/scheduler/strategy/binpack_test.go +++ b/scheduler/strategy/binpack_test.go @@ -5,18 +5,20 @@ import ( "testing" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" "github.com/stretchr/testify/assert" ) -func createNode(ID string, memory int64, cpus int64) cluster.Node { +func createNode(ID string, memory int64, cpus int64) *node.Node { oc := 0.05 memory = int64(float64(memory) + float64(memory)*oc) - return &FakeNode{ - id: ID, - addr: ID, - memory: memory * 1024 * 1024 * 1024, - cpus: cpus, + return &node.Node{ + ID: ID, + IP: ID, + Addr: ID, + TotalMemory: memory * 1024 * 1024 * 1024, + TotalCpus: cpus, } } @@ -31,39 +33,39 @@ func createContainer(ID string, config *dockerclient.ContainerConfig) *cluster.C func TestPlaceEqualWeight(t *testing.T) { s := &BinpackPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 
4, 0)) } // add 1 container 2G on node1 config := createConfig(2, 0) - assert.NoError(t, AddContainer(nodes[0], createContainer("c1", config))) - assert.Equal(t, nodes[0].UsedMemory(), 2*1024*1024*1024) + assert.NoError(t, nodes[0].AddContainer(createContainer("c1", config))) + assert.Equal(t, nodes[0].UsedMemory, 2*1024*1024*1024) // add 2 containers 1G on node2 config = createConfig(1, 0) - assert.NoError(t, AddContainer(nodes[1], createContainer("c2", config))) - assert.NoError(t, AddContainer(nodes[1], createContainer("c3", config))) - assert.Equal(t, nodes[1].UsedMemory(), int64(2*1024*1024*1024)) + assert.NoError(t, nodes[1].AddContainer(createContainer("c2", config))) + assert.NoError(t, nodes[1].AddContainer(createContainer("c3", config))) + assert.Equal(t, nodes[1].UsedMemory, int64(2*1024*1024*1024)) // add another container 1G config = createConfig(1, 0) node, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node, createContainer("c4", config))) - assert.Equal(t, node.UsedMemory(), 3*1024*1024*1024) + assert.NoError(t, node.AddContainer(createContainer("c4", config))) + assert.Equal(t, node.UsedMemory, 3*1024*1024*1024) // check that the last container ended on the node with the highest number of containers - assert.Equal(t, node.ID(), nodes[1].ID()) - assert.NotEqual(t, len(nodes[0].Containers()), len(nodes[1].Containers())) + assert.Equal(t, node.ID, nodes[1].ID) + assert.NotEqual(t, len(nodes[0].Containers), len(nodes[1].Containers)) } func TestPlaceContainerMemory(t *testing.T) { s := &BinpackPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 0)) } @@ -72,25 +74,25 @@ func TestPlaceContainerMemory(t *testing.T) { config := createConfig(1, 0) node1, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1, createContainer("c1", config))) - assert.Equal(t, node1.UsedMemory(), 1024*1024*1024) + assert.NoError(t, node1.AddContainer(createContainer("c1", config))) + assert.Equal(t, node1.UsedMemory, 1024*1024*1024) // add another container 1G config = createConfig(1, 0) node2, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node2, createContainer("c2", config))) - assert.Equal(t, node2.UsedMemory(), int64(2*1024*1024*1024)) + assert.NoError(t, node2.AddContainer(createContainer("c2", config))) + assert.Equal(t, node2.UsedMemory, int64(2*1024*1024*1024)) // check that both containers ended on the same node - assert.Equal(t, node1.ID(), node2.ID()) - assert.Equal(t, len(node1.Containers()), len(node2.Containers())) + assert.Equal(t, node1.ID, node2.ID) + assert.Equal(t, len(node1.Containers), len(node2.Containers)) } func TestPlaceContainerCPU(t *testing.T) { s := &BinpackPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 0, 2)) } @@ -99,25 +101,25 @@ func TestPlaceContainerCPU(t *testing.T) { config := createConfig(0, 1) node1, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1, createContainer("c1", config))) - assert.Equal(t, node1.UsedCpus(), 1) + assert.NoError(t, node1.AddContainer(createContainer("c1", config))) + assert.Equal(t, node1.UsedCpus, 1) // add another container 1CPU config = createConfig(0, 1) node2, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - 
assert.NoError(t, AddContainer(node2, createContainer("c2", config))) - assert.Equal(t, node2.UsedCpus(), 2) + assert.NoError(t, node2.AddContainer(createContainer("c2", config))) + assert.Equal(t, node2.UsedCpus, 2) // check that both containers ended on the same node - assert.Equal(t, node1.ID(), node2.ID()) - assert.Equal(t, len(node1.Containers()), len(node2.Containers())) + assert.Equal(t, node1.ID, node2.ID) + assert.Equal(t, len(node1.Containers), len(node2.Containers)) } func TestPlaceContainerHuge(t *testing.T) { s := &BinpackPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 100; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 1, 1)) } @@ -126,7 +128,7 @@ func TestPlaceContainerHuge(t *testing.T) { for i := 0; i < 100; i++ { node, err := s.PlaceContainer(createConfig(0, 1), nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1)))) + assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1)))) } // try to add another container 1CPU @@ -137,7 +139,7 @@ func TestPlaceContainerHuge(t *testing.T) { for i := 100; i < 200; i++ { node, err := s.PlaceContainer(createConfig(1, 0), nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0)))) + assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0)))) } // try to add another container 1G @@ -149,7 +151,7 @@ func TestPlaceContainerOvercommit(t *testing.T) { s, err := New("binpacking") assert.NoError(t, err) - nodes := []cluster.Node{createNode("node-1", 100, 1)} + nodes := []*node.Node{createNode("node-1", 100, 1)} config := createConfig(0, 0) @@ -181,7 +183,7 @@ func TestPlaceContainerOvercommit(t *testing.T) { func TestPlaceContainerDemo(t *testing.T) { s := &BinpackPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 3; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 4)) } @@ -197,44 +199,44 @@ func TestPlaceContainerDemo(t *testing.T) { config = createConfig(1, 0) node1, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1, createContainer("c1", config))) + assert.NoError(t, node1.AddContainer(createContainer("c1", config))) // add another container 1G config = createConfig(1, 0) node1bis, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1bis, createContainer("c2", config))) + assert.NoError(t, node1bis.AddContainer(createContainer("c2", config))) // check that both containers ended on the same node - assert.Equal(t, node1.ID(), node1bis.ID()) - assert.Equal(t, len(node1.Containers()), len(node1bis.Containers())) + assert.Equal(t, node1.ID, node1bis.ID) + assert.Equal(t, len(node1.Containers), len(node1bis.Containers)) // add another container 2G config = createConfig(2, 0) node2, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node2, createContainer("c3", config))) + assert.NoError(t, node2.AddContainer(createContainer("c3", config))) // check that it ends up on another node - assert.NotEqual(t, node1.ID(), node2.ID()) + assert.NotEqual(t, node1.ID, node2.ID) // add another container 1G config = createConfig(1, 0) node3, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node3, createContainer("c4", config))) + 
assert.NoError(t, node3.AddContainer(createContainer("c4", config))) // check that it ends up on another node - assert.NotEqual(t, node1.ID(), node3.ID()) - assert.NotEqual(t, node2.ID(), node3.ID()) + assert.NotEqual(t, node1.ID, node3.ID) + assert.NotEqual(t, node2.ID, node3.ID) // add another container 1G config = createConfig(1, 0) node3bis, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node3bis, createContainer("c5", config))) + assert.NoError(t, node3bis.AddContainer(createContainer("c5", config))) // check that it ends up on the same node - assert.Equal(t, node3.ID(), node3bis.ID()) + assert.Equal(t, node3.ID, node3bis.ID) // try to add another container config = createConfig(1, 0) @@ -244,27 +246,25 @@ func TestPlaceContainerDemo(t *testing.T) { assert.Error(t, err) // remove container in the middle - if n, ok := node2.(*FakeNode); ok { - n.containers = nil - n.usedmemory = 0 - n.usedcpus = 0 - } + node2.Containers = nil + node2.UsedMemory = 0 + node2.UsedCpus = 0 // add another container config = createConfig(1, 0) node2bis, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node2bis, createContainer("c6", config))) + assert.NoError(t, node2bis.AddContainer(createContainer("c6", config))) // check it ends up on `node3` - assert.Equal(t, node2.ID(), node2bis.ID()) - assert.Equal(t, len(node2.Containers()), len(node2bis.Containers())) + assert.Equal(t, node2.ID, node2bis.ID) + assert.Equal(t, len(node2.Containers), len(node2bis.Containers)) } func TestComplexPlacement(t *testing.T) { s := &BinpackPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 4)) } @@ -273,23 +273,23 @@ func TestComplexPlacement(t *testing.T) { config := createConfig(2, 0) node1, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1, createContainer("c1", config))) + assert.NoError(t, node1.AddContainer(createContainer("c1", config))) // add one container 3G config = createConfig(3, 0) node2, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node2, createContainer("c2", config))) + assert.NoError(t, node2.AddContainer(createContainer("c2", config))) // check that they end up on separate nodes - assert.NotEqual(t, node1.ID(), node2.ID()) + assert.NotEqual(t, node1.ID, node2.ID) // add one container 1G config = createConfig(1, 0) node3, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node3, createContainer("c3", config))) + assert.NoError(t, node3.AddContainer(createContainer("c3", config))) // check that it ends up on the same node as the 3G - assert.Equal(t, node2.ID(), node3.ID()) + assert.Equal(t, node2.ID, node3.ID) } diff --git a/scheduler/strategy/fakenode_test.go b/scheduler/strategy/fakenode_test.go deleted file mode 100644 index bd5c997c08..0000000000 --- a/scheduler/strategy/fakenode_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package strategy - -import ( - "errors" - - "github.com/docker/swarm/cluster" -) - -type FakeNode struct { - id string - name string - addr string - memory int64 - usedmemory int64 - cpus int64 - usedcpus int64 - containers []*cluster.Container -} - -func (fn *FakeNode) ID() string { return fn.id } -func (fn *FakeNode) Name() string { return fn.name } -func (fn *FakeNode) IP() string { return "" } -func (fn *FakeNode) Addr() string { return fn.addr 
} -func (fn *FakeNode) Images() []*cluster.Image { return nil } -func (fn *FakeNode) Image(_ string) *cluster.Image { return nil } -func (fn *FakeNode) Containers() []*cluster.Container { return fn.containers } -func (fn *FakeNode) Container(_ string) *cluster.Container { return nil } -func (fn *FakeNode) TotalCpus() int64 { return fn.cpus } -func (fn *FakeNode) UsedCpus() int64 { return fn.usedcpus } -func (fn *FakeNode) TotalMemory() int64 { return fn.memory } -func (fn *FakeNode) UsedMemory() int64 { return fn.usedmemory } -func (fn *FakeNode) Labels() map[string]string { return nil } -func (fn *FakeNode) IsHealthy() bool { return true } - -func (fn *FakeNode) AddContainer(container *cluster.Container) error { - memory := container.Info.Config.Memory - cpus := container.Info.Config.CpuShares - if fn.memory-memory < 0 || fn.cpus-cpus < 0 { - return errors.New("not enough resources") - } - fn.usedmemory = fn.usedmemory + memory - fn.usedcpus = fn.usedcpus + cpus - - fn.containers = append(fn.containers, container) - return nil -} - -func AddContainer(node cluster.Node, container *cluster.Container) error { - if n, ok := node.(*FakeNode); ok { - return n.AddContainer(container) - } - return errors.New("Not a FakeNode") -} diff --git a/scheduler/strategy/random.go b/scheduler/strategy/random.go index 3dfe4505e2..bc5a240b75 100644 --- a/scheduler/strategy/random.go +++ b/scheduler/strategy/random.go @@ -5,7 +5,7 @@ import ( "math/rand" "time" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -24,7 +24,7 @@ func (p *RandomPlacementStrategy) Name() string { } // PlaceContainer is exported -func (p *RandomPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) { +func (p *RandomPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error) { if size := len(nodes); size > 0 { return nodes[rand.Intn(size)], nil } diff --git a/scheduler/strategy/spread.go b/scheduler/strategy/spread.go index e7d75018c8..5702c97f12 100644 --- a/scheduler/strategy/spread.go +++ b/scheduler/strategy/spread.go @@ -3,7 +3,7 @@ package strategy import ( "sort" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -22,7 +22,7 @@ func (p *SpreadPlacementStrategy) Name() string { } // PlaceContainer is exported -func (p *SpreadPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) { +func (p *SpreadPlacementStrategy) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error) { weightedNodes, err := weighNodes(config, nodes) if err != nil { return nil, err @@ -36,7 +36,7 @@ func (p *SpreadPlacementStrategy) PlaceContainer(config *dockerclient.ContainerC if node.Weight != bottomNode.Weight { break } - if len(node.Node.Containers()) < len(bottomNode.Node.Containers()) { + if len(node.Node.Containers) < len(bottomNode.Node.Containers) { bottomNode = node } } diff --git a/scheduler/strategy/spread_test.go b/scheduler/strategy/spread_test.go index d33d1af206..e7be210e02 100644 --- a/scheduler/strategy/spread_test.go +++ b/scheduler/strategy/spread_test.go @@ -4,46 +4,46 @@ import ( "fmt" "testing" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/stretchr/testify/assert" ) func TestSpreadPlaceEqualWeight(t *testing.T) { s := &SpreadPlacementStrategy{} - nodes 
:= []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 0)) } // add 1 container 2G on node1 config := createConfig(2, 0) - assert.NoError(t, AddContainer(nodes[0], createContainer("c1", config))) - assert.Equal(t, nodes[0].UsedMemory(), 2*1024*1024*1024) + assert.NoError(t, nodes[0].AddContainer(createContainer("c1", config))) + assert.Equal(t, nodes[0].UsedMemory, 2*1024*1024*1024) // add 2 containers 1G on node2 config = createConfig(1, 0) - assert.NoError(t, AddContainer(nodes[1], createContainer("c2", config))) - assert.NoError(t, AddContainer(nodes[1], createContainer("c3", config))) - assert.Equal(t, nodes[1].UsedMemory(), int64(2*1024*1024*1024)) + assert.NoError(t, nodes[1].AddContainer(createContainer("c2", config))) + assert.NoError(t, nodes[1].AddContainer(createContainer("c3", config))) + assert.Equal(t, nodes[1].UsedMemory, int64(2*1024*1024*1024)) // add another container 1G config = createConfig(1, 0) node, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node, createContainer("c4", config))) - assert.Equal(t, node.UsedMemory(), 3*1024*1024*1024) + assert.NoError(t, node.AddContainer(createContainer("c4", config))) + assert.Equal(t, node.UsedMemory, 3*1024*1024*1024) // check that the last container ended on the node with the lowest number of containers - assert.Equal(t, node.ID(), nodes[0].ID()) - assert.Equal(t, len(nodes[0].Containers()), len(nodes[1].Containers())) + assert.Equal(t, node.ID, nodes[0].ID) + assert.Equal(t, len(nodes[0].Containers), len(nodes[1].Containers)) } func TestSpreadPlaceContainerMemory(t *testing.T) { s := &SpreadPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 2, 0)) } @@ -52,25 +52,25 @@ func TestSpreadPlaceContainerMemory(t *testing.T) { config := createConfig(1, 0) node1, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1, createContainer("c1", config))) - assert.Equal(t, node1.UsedMemory(), 1024*1024*1024) + assert.NoError(t, node1.AddContainer(createContainer("c1", config))) + assert.Equal(t, node1.UsedMemory, 1024*1024*1024) // add another container 1G config = createConfig(1, 0) node2, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node2, createContainer("c2", config))) - assert.Equal(t, node2.UsedMemory(), int64(1024*1024*1024)) + assert.NoError(t, node2.AddContainer(createContainer("c2", config))) + assert.Equal(t, node2.UsedMemory, int64(1024*1024*1024)) // check that both containers ended on different node - assert.NotEqual(t, node1.ID(), node2.ID()) - assert.Equal(t, len(node1.Containers()), len(node2.Containers()), "") + assert.NotEqual(t, node1.ID, node2.ID) + assert.Equal(t, len(node1.Containers), len(node2.Containers), "") } func TestSpreadPlaceContainerCPU(t *testing.T) { s := &SpreadPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 0, 2)) } @@ -79,25 +79,25 @@ func TestSpreadPlaceContainerCPU(t *testing.T) { config := createConfig(0, 1) node1, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1, createContainer("c1", config))) - assert.Equal(t, node1.UsedCpus(), 1) + assert.NoError(t, node1.AddContainer(createContainer("c1", config))) + 
assert.Equal(t, node1.UsedCpus, 1) // add another container 1CPU config = createConfig(0, 1) node2, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node2, createContainer("c2", config))) - assert.Equal(t, node2.UsedCpus(), 1) + assert.NoError(t, node2.AddContainer(createContainer("c2", config))) + assert.Equal(t, node2.UsedCpus, 1) // check that both containers ended on different node - assert.NotEqual(t, node1.ID(), node2.ID()) - assert.Equal(t, len(node1.Containers()), len(node2.Containers()), "") + assert.NotEqual(t, node1.ID, node2.ID) + assert.Equal(t, len(node1.Containers), len(node2.Containers), "") } func TestSpreadPlaceContainerHuge(t *testing.T) { s := &SpreadPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 100; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 1, 1)) } @@ -106,7 +106,7 @@ func TestSpreadPlaceContainerHuge(t *testing.T) { for i := 0; i < 100; i++ { node, err := s.PlaceContainer(createConfig(0, 1), nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1)))) + assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1)))) } // try to add another container 1CPU @@ -117,7 +117,7 @@ func TestSpreadPlaceContainerHuge(t *testing.T) { for i := 100; i < 200; i++ { node, err := s.PlaceContainer(createConfig(1, 0), nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node, createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0)))) + assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0)))) } // try to add another container 1G @@ -128,7 +128,7 @@ func TestSpreadPlaceContainerHuge(t *testing.T) { func TestSpreadPlaceContainerOvercommit(t *testing.T) { s := &SpreadPlacementStrategy{} - nodes := []cluster.Node{createNode("node-1", 100, 1)} + nodes := []*node.Node{createNode("node-1", 100, 1)} config := createConfig(0, 0) @@ -159,7 +159,7 @@ func TestSpreadPlaceContainerOvercommit(t *testing.T) { func TestSpreadComplexPlacement(t *testing.T) { s := &SpreadPlacementStrategy{} - nodes := []cluster.Node{} + nodes := []*node.Node{} for i := 0; i < 2; i++ { nodes = append(nodes, createNode(fmt.Sprintf("node-%d", i), 4, 4)) } @@ -168,23 +168,23 @@ func TestSpreadComplexPlacement(t *testing.T) { config := createConfig(2, 0) node1, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node1, createContainer("c1", config))) + assert.NoError(t, node1.AddContainer(createContainer("c1", config))) // add one container 3G config = createConfig(3, 0) node2, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node2, createContainer("c2", config))) + assert.NoError(t, node2.AddContainer(createContainer("c2", config))) // check that they end up on separate nodes - assert.NotEqual(t, node1.ID(), node2.ID()) + assert.NotEqual(t, node1.ID, node2.ID) // add one container 1G config = createConfig(1, 0) node3, err := s.PlaceContainer(config, nodes) assert.NoError(t, err) - assert.NoError(t, AddContainer(node3, createContainer("c3", config))) + assert.NoError(t, node3.AddContainer(createContainer("c3", config))) // check that it ends up on the same node as the 2G - assert.Equal(t, node1.ID(), node3.ID()) + assert.Equal(t, node1.ID, node3.ID) } diff --git a/scheduler/strategy/strategy.go b/scheduler/strategy/strategy.go index 93afcd075c..acf826aca5 100644 --- 
a/scheduler/strategy/strategy.go +++ b/scheduler/strategy/strategy.go @@ -4,7 +4,7 @@ import ( "errors" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) @@ -15,7 +15,7 @@ type PlacementStrategy interface { Initialize() error // Given a container configuration and a set of nodes, select the target // node where the container should be scheduled. - PlaceContainer(config *dockerclient.ContainerConfig, nodes []cluster.Node) (cluster.Node, error) + PlaceContainer(config *dockerclient.ContainerConfig, nodes []*node.Node) (*node.Node, error) } var ( diff --git a/scheduler/strategy/weighted_node.go b/scheduler/strategy/weighted_node.go index 276a99ec19..d08d9a9ed1 100644 --- a/scheduler/strategy/weighted_node.go +++ b/scheduler/strategy/weighted_node.go @@ -1,14 +1,14 @@ package strategy import ( - "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/node" "github.com/samalba/dockerclient" ) // WeightedNode represents a node in the cluster with a given weight, typically used for sorting // purposes. type weightedNode struct { - Node cluster.Node + Node *node.Node // Weight is the inherent value of this node. Weight int64 } @@ -32,12 +32,12 @@ func (n weightedNodeList) Less(i, j int) bool { return ip.Weight < jp.Weight } -func weighNodes(config *dockerclient.ContainerConfig, nodes []cluster.Node) (weightedNodeList, error) { +func weighNodes(config *dockerclient.ContainerConfig, nodes []*node.Node) (weightedNodeList, error) { weightedNodes := weightedNodeList{} for _, node := range nodes { - nodeMemory := node.TotalMemory() - nodeCpus := node.TotalCpus() + nodeMemory := node.TotalMemory + nodeCpus := node.TotalCpus // Skip nodes that are smaller than the requested resources. if nodeMemory < int64(config.Memory) || nodeCpus < config.CpuShares { @@ -50,10 +50,10 @@ func weighNodes(config *dockerclient.ContainerConfig, nodes []cluster.Node) (wei ) if config.CpuShares > 0 { - cpuScore = (node.UsedCpus() + config.CpuShares) * 100 / nodeCpus + cpuScore = (node.UsedCpus + config.CpuShares) * 100 / nodeCpus } if config.Memory > 0 { - memoryScore = (node.UsedMemory() + config.Memory) * 100 / nodeMemory + memoryScore = (node.UsedMemory + config.Memory) * 100 / nodeMemory } if cpuScore <= 100 && memoryScore <= 100 {
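
For readers new to the refactor, the net effect of the new scheduler/node package is easiest to see in a small usage sketch. This is illustrative only: the node sizes and container values are made up, and it relies solely on the fields this patch introduces plus the dockerclient fields the patch itself reads (Id, Names, Info.Config.Memory, Info.Config.CpuShares).

package main

import (
	"fmt"

	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/scheduler/node"
	"github.com/samalba/dockerclient"
)

func main() {
	// The scheduler now works against plain struct fields instead of the old
	// cluster.Node interface methods (ID() -> ID, UsedMemory() -> UsedMemory, ...).
	n := &node.Node{
		ID:          "node-0-id",
		Name:        "node-0-name",
		Addr:        "node-0",
		TotalMemory: 4 * 1024 * 1024 * 1024,
		TotalCpus:   4,
		IsHealthy:   true,
	}

	// AddContainer performs the resource accounting the strategies rely on:
	// it rejects containers that do not fit and bumps UsedMemory/UsedCpus.
	c := &cluster.Container{
		Container: dockerclient.Container{Id: "c0", Names: []string{"/web"}},
		Info: dockerclient.ContainerInfo{
			Config: &dockerclient.ContainerConfig{Memory: 1024 * 1024 * 1024, CpuShares: 1},
		},
	}
	if err := n.AddContainer(c); err != nil {
		panic(err)
	}

	// Container resolves by ID prefix, name, or /name.
	fmt.Println(n.Container("c0") != nil, n.Container("web") != nil) // true true
	fmt.Println(n.UsedMemory, n.UsedCpus)                            // 1073741824 1
}

One consequence of the struct-based design, visible in the test diffs above, is that fixtures no longer need a FakeNode type or type assertions: tests build real *node.Node values directly.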
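The scoring in weighNodes is easier to read with concrete numbers. Below is a standalone sketch of the same arithmetic; note two assumptions not visible in the truncated hunk above: the two scores are taken to default to 100 when a resource is not requested, and the final Weight is taken to be the sum of the two scores.

package main

import "fmt"

// score reproduces the weighNodes arithmetic for one node: each requested
// resource becomes the percentage of the node's total that would be in use
// after placing the container.
func score(usedCpus, totalCpus, usedMem, totalMem, reqCpus, reqMem int64) (int64, bool) {
	// Nodes smaller than the request are skipped outright.
	if totalMem < reqMem || totalCpus < reqCpus {
		return 0, false
	}
	var cpuScore, memScore int64 = 100, 100 // defaults assumed from context
	if reqCpus > 0 {
		cpuScore = (usedCpus + reqCpus) * 100 / totalCpus
	}
	if reqMem > 0 {
		memScore = (usedMem + reqMem) * 100 / totalMem
	}
	if cpuScore > 100 || memScore > 100 {
		return 0, false // would overcommit; node rejected
	}
	return cpuScore + memScore, true // Weight (assumed to be the sum)
}

func main() {
	// A node with 1 of 4 CPUs and 1 of 4 GiB in use, asked for 1 CPU + 1 GiB:
	// cpuScore = (1+1)*100/4 = 50, memScore = (1+1)*100/4 = 50, weight = 100.
	const gib = 1 << 30
	w, ok := score(1, 4, 1*gib, 4*gib, 1, 1*gib)
	fmt.Println(w, ok) // 100 true
}

Binpack and spread then differ only in which end of the weighted list they take: binpack prefers the heaviest node (breaking ties toward more containers, per the hunk above), while spread prefers the lightest (breaking ties toward fewer).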