diff --git a/api/README.md b/api/README.md index 6c83bb242d..d5a9d1499d 100644 --- a/api/README.md +++ b/api/README.md @@ -15,7 +15,6 @@ Some endpoints have not yet been implemented and will return a 404 error. ``` GET "/images/get" -POST "/images/load" POST "/images/create" : "docker import" flow not implement ``` diff --git a/api/handlers.go b/api/handlers.go index fa0ca63332..e6fb08b88c 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -270,6 +270,21 @@ func postImagesCreate(c *context, w http.ResponseWriter, r *http.Request) { } } +// POST /images/load +func postImagesLoad(c *context, w http.ResponseWriter, r *http.Request) { + + // call cluster to load image on every node + wf := NewWriteFlusher(w) + callback := func(what, status string) { + if status == "" { + fmt.Fprintf(wf, "%s:Loading Image...\n", what) + } else { + fmt.Fprintf(wf, "%s:Loading Image... %s\n", what, status) + } + } + c.cluster.Load(r.Body, callback) +} + // GET /events func getEvents(c *context, w http.ResponseWriter, r *http.Request) { c.eventsHandler.Add(r.RemoteAddr, w) diff --git a/api/router.go b/api/router.go index 7da78a2695..420536abe9 100644 --- a/api/router.go +++ b/api/router.go @@ -48,7 +48,7 @@ var routes = map[string]map[string]handler{ "/commit": postCommit, "/build": proxyRandomAndForceRefresh, "/images/create": postImagesCreate, - "/images/load": notImplementedHandler, + "/images/load": postImagesLoad, "/images/{name:.*}/push": proxyImage, "/images/{name:.*}/tag": proxyImageAndForceRefresh, "/containers/create": postContainersCreate, diff --git a/cluster/cluster.go b/cluster/cluster.go index 3b0beaab7d..d1dbb2aa77 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,6 +2,7 @@ package cluster import ( "github.com/samalba/dockerclient" + "io" ) // Cluster is exported @@ -33,6 +34,12 @@ type Cluster interface { // `status` is the current status, like "", "in progress" or "downloaded Pull(name string, callback func(what, status string)) + // Load images + // `callback` can be called multiple time + // `what` is what is being loaded + // `status` is the current status, like "", "in progress" or "loaded" + Load(imageReader io.Reader, callback func(what, status string)) + // Return some info about the cluster, like nb or containers / images // It is pretty open, so the implementation decides what to return. Info() [][2]string diff --git a/cluster/engine.go b/cluster/engine.go index 5fa6c25e11..3c97f8f108 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "fmt" + "io" "net" "strings" "sync" @@ -415,6 +416,18 @@ func (e *Engine) Pull(image string) error { return nil } +// Load an image on the engine +func (e *Engine) Load(reader io.Reader) error { + if err := e.client.LoadImage(reader); err != nil { + return err + } + + // force fresh images + e.RefreshImages() + + return nil +} + // RegisterEventHandler registers an event handler. func (e *Engine) RegisterEventHandler(h EventHandler) error { if e.eventHandler != nil { diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 8f5d9e6c4e..d69131ccc7 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -3,6 +3,7 @@ package swarm import ( "errors" "fmt" + "io" "sort" "strings" "sync" @@ -239,6 +240,57 @@ func (c *Cluster) Pull(name string, callback func(what, status string)) { wg.Wait() } +// Load image +func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string)) { + var wg sync.WaitGroup + + c.RLock() + pipeWriters := []*io.PipeWriter{} + pipeReaders := []*io.PipeReader{} + for _, n := range c.engines { + wg.Add(1) + + pipeReader, pipeWriter := io.Pipe() + pipeReaders = append(pipeReaders, pipeReader) + pipeWriters = append(pipeWriters, pipeWriter) + + go func(reader *io.PipeReader, nn *cluster.Engine) { + defer wg.Done() + defer reader.Close() + + // call engine load image + err := nn.Load(reader) + if callback != nil { + if err != nil { + callback(nn.Name, err.Error()) + } + } + }(pipeReader, n) + } + + // create multi-writer + listWriter := []io.Writer{} + for _, pipeW := range pipeWriters { + listWriter = append(listWriter, pipeW) + } + mutiWriter := io.MultiWriter(listWriter...) + + // copy image-reader to muti-writer + _, err := io.Copy(mutiWriter, imageReader) + if err != nil { + log.Error(err) + } + + // close pipe writers + for _, pipeW := range pipeWriters { + pipeW.Close() + } + + c.RUnlock() + + wg.Wait() +} + // Containers returns all the containers in the cluster. func (c *Cluster) Containers() []*cluster.Container { c.RLock() diff --git a/test/integration/api.bats b/test/integration/api.bats index 63bae6632a..8d8f94f161 100644 --- a/test/integration/api.bats +++ b/test/integration/api.bats @@ -259,9 +259,37 @@ function teardown() { [[ "${lines[1]}" == *"Exited"* ]] } -# FIXME @test "docker load" { - skip + # temp file for saving image + IMAGE_FILE=$(mktemp) + + # create a tar file + docker pull busybox:latest + docker save -o $IMAGE_FILE busybox:latest + + start_docker 2 + swarm_manage + + run docker_swarm images -q + [ "$status" -eq 0 ] + [ "${#lines[@]}" -eq 0 ] + + run docker_swarm load -i $IMAGE_FILE + [ "$status" -eq 0 ] + + # check node0 + run docker -H ${HOSTS[0]} images + [ "${#lines[@]}" -eq 2 ] + [[ "${lines[1]}" == *"busybox"* ]] + + # check node1 + run docker -H ${HOSTS[1]} images + [ "${#lines[@]}" -eq 2 ] + [[ "${lines[1]}" == *"busybox"* ]] + + rm -f $IMAGE_FILE +} + } # FIXME