Merge pull request #16228 from duglin/ContextualizeEvents

Add context.RequestID to event stream
This commit is contained in:
David Calavera
2015-09-24 14:16:22 -07:00
68 changed files with 737 additions and 565 deletions

View File

@@ -45,7 +45,7 @@ func (s *Server) getContainersJSON(ctx context.Context, w http.ResponseWriter, r
config.Limit = limit
}
containers, err := s.daemon.Containers(config)
containers, err := s.daemon.Containers(ctx, config)
if err != nil {
return err
}
@@ -83,7 +83,7 @@ func (s *Server) getContainersStats(ctx context.Context, w http.ResponseWriter,
Version: version,
}
return s.daemon.ContainerStats(vars["name"], config)
return s.daemon.ContainerStats(ctx, vars["name"], config)
}
func (s *Server) getContainersLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@@ -118,7 +118,7 @@ func (s *Server) getContainersLogs(ctx context.Context, w http.ResponseWriter, r
closeNotifier = notifier.CloseNotify()
}
c, err := s.daemon.Get(vars["name"])
c, err := s.daemon.Get(ctx, vars["name"])
if err != nil {
return err
}
@@ -140,7 +140,7 @@ func (s *Server) getContainersLogs(ctx context.Context, w http.ResponseWriter, r
Stop: closeNotifier,
}
if err := s.daemon.ContainerLogs(c, logsConfig); err != nil {
if err := s.daemon.ContainerLogs(ctx, c, logsConfig); err != nil {
// The client may be expecting all of the data we're sending to
// be multiplexed, so send it through OutStream, which will
// have been set up to handle that if needed.
@@ -155,7 +155,7 @@ func (s *Server) getContainersExport(ctx context.Context, w http.ResponseWriter,
return fmt.Errorf("Missing parameter")
}
return s.daemon.ContainerExport(vars["name"], w)
return s.daemon.ContainerExport(ctx, vars["name"], w)
}
func (s *Server) postContainersStart(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@@ -183,7 +183,7 @@ func (s *Server) postContainersStart(ctx context.Context, w http.ResponseWriter,
hostConfig = c
}
if err := s.daemon.ContainerStart(vars["name"], hostConfig); err != nil {
if err := s.daemon.ContainerStart(ctx, vars["name"], hostConfig); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
@@ -200,7 +200,7 @@ func (s *Server) postContainersStop(ctx context.Context, w http.ResponseWriter,
seconds, _ := strconv.Atoi(r.Form.Get("t"))
if err := s.daemon.ContainerStop(vars["name"], seconds); err != nil {
if err := s.daemon.ContainerStop(ctx, vars["name"], seconds); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
@@ -227,7 +227,7 @@ func (s *Server) postContainersKill(ctx context.Context, w http.ResponseWriter,
}
}
if err := s.daemon.ContainerKill(name, uint64(sig)); err != nil {
if err := s.daemon.ContainerKill(ctx, name, uint64(sig)); err != nil {
theErr, isDerr := err.(errcode.ErrorCoder)
isStopped := isDerr && theErr.ErrorCode() == derr.ErrorCodeNotRunning
@@ -254,7 +254,7 @@ func (s *Server) postContainersRestart(ctx context.Context, w http.ResponseWrite
timeout, _ := strconv.Atoi(r.Form.Get("t"))
if err := s.daemon.ContainerRestart(vars["name"], timeout); err != nil {
if err := s.daemon.ContainerRestart(ctx, vars["name"], timeout); err != nil {
return err
}
@@ -271,7 +271,7 @@ func (s *Server) postContainersPause(ctx context.Context, w http.ResponseWriter,
return err
}
if err := s.daemon.ContainerPause(vars["name"]); err != nil {
if err := s.daemon.ContainerPause(ctx, vars["name"]); err != nil {
return err
}
@@ -288,7 +288,7 @@ func (s *Server) postContainersUnpause(ctx context.Context, w http.ResponseWrite
return err
}
if err := s.daemon.ContainerUnpause(vars["name"]); err != nil {
if err := s.daemon.ContainerUnpause(ctx, vars["name"]); err != nil {
return err
}
@@ -302,7 +302,7 @@ func (s *Server) postContainersWait(ctx context.Context, w http.ResponseWriter,
return fmt.Errorf("Missing parameter")
}
status, err := s.daemon.ContainerWait(vars["name"], -1*time.Second)
status, err := s.daemon.ContainerWait(ctx, vars["name"], -1*time.Second)
if err != nil {
return err
}
@@ -317,7 +317,7 @@ func (s *Server) getContainersChanges(ctx context.Context, w http.ResponseWriter
return fmt.Errorf("Missing parameter")
}
changes, err := s.daemon.ContainerChanges(vars["name"])
changes, err := s.daemon.ContainerChanges(ctx, vars["name"])
if err != nil {
return err
}
@@ -334,7 +334,7 @@ func (s *Server) getContainersTop(ctx context.Context, w http.ResponseWriter, r
return err
}
procList, err := s.daemon.ContainerTop(vars["name"], r.Form.Get("ps_args"))
procList, err := s.daemon.ContainerTop(ctx, vars["name"], r.Form.Get("ps_args"))
if err != nil {
return err
}
@@ -352,7 +352,7 @@ func (s *Server) postContainerRename(ctx context.Context, w http.ResponseWriter,
name := vars["name"]
newName := r.Form.Get("name")
if err := s.daemon.ContainerRename(name, newName); err != nil {
if err := s.daemon.ContainerRename(ctx, name, newName); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
@@ -378,7 +378,7 @@ func (s *Server) postContainersCreate(ctx context.Context, w http.ResponseWriter
version := ctx.Version()
adjustCPUShares := version.LessThan("1.19")
container, warnings, err := s.daemon.ContainerCreate(name, config, hostConfig, adjustCPUShares)
container, warnings, err := s.daemon.ContainerCreate(ctx, name, config, hostConfig, adjustCPUShares)
if err != nil {
return err
}
@@ -404,7 +404,7 @@ func (s *Server) deleteContainers(ctx context.Context, w http.ResponseWriter, r
RemoveLink: boolValue(r, "link"),
}
if err := s.daemon.ContainerRm(name, config); err != nil {
if err := s.daemon.ContainerRm(ctx, name, config); err != nil {
// Force a 404 for the empty string
if strings.Contains(strings.ToLower(err.Error()), "prefix can't be empty") {
return fmt.Errorf("no such id: \"\"")
@@ -434,7 +434,7 @@ func (s *Server) postContainersResize(ctx context.Context, w http.ResponseWriter
return err
}
return s.daemon.ContainerResize(vars["name"], height, width)
return s.daemon.ContainerResize(ctx, vars["name"], height, width)
}
func (s *Server) postContainersAttach(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@@ -446,7 +446,7 @@ func (s *Server) postContainersAttach(ctx context.Context, w http.ResponseWriter
}
containerName := vars["name"]
if !s.daemon.Exists(containerName) {
if !s.daemon.Exists(ctx, containerName) {
return derr.ErrorCodeNoSuchContainer.WithArgs(containerName)
}
@@ -472,7 +472,7 @@ func (s *Server) postContainersAttach(ctx context.Context, w http.ResponseWriter
Stream: boolValue(r, "stream"),
}
if err := s.daemon.ContainerAttachWithLogs(containerName, attachWithLogsConfig); err != nil {
if err := s.daemon.ContainerAttachWithLogs(ctx, containerName, attachWithLogsConfig); err != nil {
fmt.Fprintf(outStream, "Error attaching: %s\n", err)
}
@@ -488,7 +488,7 @@ func (s *Server) wsContainersAttach(ctx context.Context, w http.ResponseWriter,
}
containerName := vars["name"]
if !s.daemon.Exists(containerName) {
if !s.daemon.Exists(ctx, containerName) {
return derr.ErrorCodeNoSuchContainer.WithArgs(containerName)
}
@@ -503,7 +503,7 @@ func (s *Server) wsContainersAttach(ctx context.Context, w http.ResponseWriter,
Stream: boolValue(r, "stream"),
}
if err := s.daemon.ContainerWsAttachWithLogs(containerName, wsAttachWithLogsConfig); err != nil {
if err := s.daemon.ContainerWsAttachWithLogs(ctx, containerName, wsAttachWithLogsConfig); err != nil {
logrus.Errorf("Error attaching websocket: %s", err)
}
})

View File

@@ -32,7 +32,7 @@ func (s *Server) postContainersCopy(ctx context.Context, w http.ResponseWriter,
return fmt.Errorf("Path cannot be empty")
}
data, err := s.daemon.ContainerCopy(vars["name"], cfg.Resource)
data, err := s.daemon.ContainerCopy(ctx, vars["name"], cfg.Resource)
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "no such id") {
w.WriteHeader(http.StatusNotFound)
@@ -74,7 +74,7 @@ func (s *Server) headContainersArchive(ctx context.Context, w http.ResponseWrite
return err
}
stat, err := s.daemon.ContainerStatPath(v.name, v.path)
stat, err := s.daemon.ContainerStatPath(ctx, v.name, v.path)
if err != nil {
return err
}
@@ -88,7 +88,7 @@ func (s *Server) getContainersArchive(ctx context.Context, w http.ResponseWriter
return err
}
tarArchive, stat, err := s.daemon.ContainerArchivePath(v.name, v.path)
tarArchive, stat, err := s.daemon.ContainerArchivePath(ctx, v.name, v.path)
if err != nil {
return err
}
@@ -111,5 +111,5 @@ func (s *Server) putContainersArchive(ctx context.Context, w http.ResponseWriter
}
noOverwriteDirNonDir := boolValue(r, "noOverwriteDirNonDir")
return s.daemon.ContainerExtractToDir(v.name, v.path, noOverwriteDirNonDir, r.Body)
return s.daemon.ContainerExtractToDir(ctx, v.name, v.path, noOverwriteDirNonDir, r.Body)
}

View File

@@ -45,7 +45,7 @@ func (s *Server) getVersion(ctx context.Context, w http.ResponseWriter, r *http.
}
func (s *Server) getInfo(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
info, err := s.daemon.SystemInfo()
info, err := s.daemon.SystemInfo(ctx)
if err != nil {
return err
}
@@ -120,7 +120,7 @@ func (s *Server) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
enc := json.NewEncoder(outStream)
getContainerID := func(cn string) string {
c, err := d.Get(cn)
c, err := d.Get(ctx, cn)
if err != nil {
return ""
}

View File

@@ -19,7 +19,7 @@ func (s *Server) getExecByID(ctx context.Context, w http.ResponseWriter, r *http
return fmt.Errorf("Missing parameter 'id'")
}
eConfig, err := s.daemon.ContainerExecInspect(vars["id"])
eConfig, err := s.daemon.ContainerExecInspect(ctx, vars["id"])
if err != nil {
return err
}
@@ -47,7 +47,7 @@ func (s *Server) postContainerExecCreate(ctx context.Context, w http.ResponseWri
}
// Register an instance of Exec in container.
id, err := s.daemon.ContainerExecCreate(execConfig)
id, err := s.daemon.ContainerExecCreate(ctx, execConfig)
if err != nil {
logrus.Errorf("Error setting up exec command in container %s: %s", name, err)
return err
@@ -100,7 +100,7 @@ func (s *Server) postContainerExecStart(ctx context.Context, w http.ResponseWrit
}
// Now run the user process in container.
if err := s.daemon.ContainerExecStart(execName, stdin, stdout, stderr); err != nil {
if err := s.daemon.ContainerExecStart(ctx, execName, stdin, stdout, stderr); err != nil {
fmt.Fprintf(outStream, "Error running exec in container: %v\n", err)
}
return nil
@@ -123,5 +123,5 @@ func (s *Server) postContainerExecResize(ctx context.Context, w http.ResponseWri
return err
}
return s.daemon.ContainerExecResize(vars["name"], height, width)
return s.daemon.ContainerExecResize(ctx, vars["name"], height, width)
}

View File

@@ -55,7 +55,7 @@ func (s *Server) postCommit(ctx context.Context, w http.ResponseWriter, r *http.
Config: c,
}
imgID, err := builder.Commit(cname, s.daemon, commitCfg)
imgID, err := builder.Commit(ctx, cname, s.daemon, commitCfg)
if err != nil {
return err
}
@@ -112,7 +112,7 @@ func (s *Server) postImagesCreate(ctx context.Context, w http.ResponseWriter, r
OutStream: output,
}
err = s.daemon.Repositories().Pull(image, tag, imagePullConfig)
err = s.daemon.Repositories(ctx).Pull(ctx, image, tag, imagePullConfig)
} else { //import
if tag == "" {
repo, tag = parsers.ParseRepositoryTag(repo)
@@ -124,12 +124,12 @@ func (s *Server) postImagesCreate(ctx context.Context, w http.ResponseWriter, r
// generated from the download to be available to the output
// stream processing below
var newConfig *runconfig.Config
newConfig, err = builder.BuildFromConfig(s.daemon, &runconfig.Config{}, r.Form["changes"])
newConfig, err = builder.BuildFromConfig(ctx, s.daemon, &runconfig.Config{}, r.Form["changes"])
if err != nil {
return err
}
err = s.daemon.Repositories().Import(src, repo, tag, message, r.Body, output, newConfig)
err = s.daemon.Repositories(ctx).Import(ctx, src, repo, tag, message, r.Body, output, newConfig)
}
if err != nil {
if !output.Flushed() {
@@ -184,7 +184,7 @@ func (s *Server) postImagesPush(ctx context.Context, w http.ResponseWriter, r *h
w.Header().Set("Content-Type", "application/json")
if err := s.daemon.Repositories().Push(name, imagePushConfig); err != nil {
if err := s.daemon.Repositories(ctx).Push(ctx, name, imagePushConfig); err != nil {
if !output.Flushed() {
return err
}
@@ -212,7 +212,7 @@ func (s *Server) getImagesGet(ctx context.Context, w http.ResponseWriter, r *htt
names = r.Form["names"]
}
if err := s.daemon.Repositories().ImageExport(names, output); err != nil {
if err := s.daemon.Repositories(ctx).ImageExport(names, output); err != nil {
if !output.Flushed() {
return err
}
@@ -223,7 +223,7 @@ func (s *Server) getImagesGet(ctx context.Context, w http.ResponseWriter, r *htt
}
func (s *Server) postImagesLoad(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
return s.daemon.Repositories().Load(r.Body, w)
return s.daemon.Repositories(ctx).Load(r.Body, w)
}
func (s *Server) deleteImages(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@@ -243,7 +243,7 @@ func (s *Server) deleteImages(ctx context.Context, w http.ResponseWriter, r *htt
force := boolValue(r, "force")
prune := !boolValue(r, "noprune")
list, err := s.daemon.ImageDelete(name, force, prune)
list, err := s.daemon.ImageDelete(ctx, name, force, prune)
if err != nil {
return err
}
@@ -256,7 +256,7 @@ func (s *Server) getImagesByName(ctx context.Context, w http.ResponseWriter, r *
return fmt.Errorf("Missing parameter")
}
imageInspect, err := s.daemon.Repositories().Lookup(vars["name"])
imageInspect, err := s.daemon.Repositories(ctx).Lookup(vars["name"])
if err != nil {
return err
}
@@ -346,7 +346,7 @@ func (s *Server) postBuild(ctx context.Context, w http.ResponseWriter, r *http.R
}()
}
if err := builder.Build(s.daemon, buildConfig); err != nil {
if err := builder.Build(ctx, s.daemon, buildConfig); err != nil {
// Do not write the error in the http output if it's still empty.
// This prevents from writing a 200(OK) when there is an interal error.
if !output.Flushed() {
@@ -364,7 +364,7 @@ func (s *Server) getImagesJSON(ctx context.Context, w http.ResponseWriter, r *ht
}
// FIXME: The filter parameter could just be a match filter
images, err := s.daemon.Repositories().Images(r.Form.Get("filters"), r.Form.Get("filter"), boolValue(r, "all"))
images, err := s.daemon.Repositories(ctx).Images(r.Form.Get("filters"), r.Form.Get("filter"), boolValue(r, "all"))
if err != nil {
return err
}
@@ -378,7 +378,7 @@ func (s *Server) getImagesHistory(ctx context.Context, w http.ResponseWriter, r
}
name := vars["name"]
history, err := s.daemon.Repositories().History(name)
history, err := s.daemon.Repositories(ctx).History(name)
if err != nil {
return err
}
@@ -398,10 +398,10 @@ func (s *Server) postImagesTag(ctx context.Context, w http.ResponseWriter, r *ht
tag := r.Form.Get("tag")
force := boolValue(r, "force")
name := vars["name"]
if err := s.daemon.Repositories().Tag(repo, tag, name, force); err != nil {
if err := s.daemon.Repositories(ctx).Tag(repo, tag, name, force); err != nil {
return err
}
s.daemon.EventsService.Log("tag", utils.ImageReference(repo, tag), "")
s.daemon.EventsService.Log(ctx, "tag", utils.ImageReference(repo, tag), "")
w.WriteHeader(http.StatusCreated)
return nil
}

View File

@@ -20,11 +20,11 @@ func (s *Server) getContainersByName(ctx context.Context, w http.ResponseWriter,
switch {
case version.LessThan("1.20"):
json, err = s.daemon.ContainerInspectPre120(vars["name"])
json, err = s.daemon.ContainerInspectPre120(ctx, vars["name"])
case version.Equal("1.20"):
json, err = s.daemon.ContainerInspect120(vars["name"])
json, err = s.daemon.ContainerInspect120(ctx, vars["name"])
default:
json, err = s.daemon.ContainerInspect(vars["name"])
json, err = s.daemon.ContainerInspect(ctx, vars["name"])
}
if err != nil {

View File

@@ -18,6 +18,7 @@ import (
"github.com/docker/docker/context"
"github.com/docker/docker/daemon"
"github.com/docker/docker/pkg/sockets"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/utils"
)
@@ -41,12 +42,12 @@ type Server struct {
}
// New returns a new instance of the server based on the specified configuration.
func New(cfg *Config) *Server {
func New(ctx context.Context, cfg *Config) *Server {
srv := &Server{
cfg: cfg,
start: make(chan struct{}),
}
srv.router = createRouter(srv)
srv.router = createRouter(ctx, srv)
return srv
}
@@ -290,7 +291,7 @@ func (s *Server) initTCPSocket(addr string) (l net.Listener, err error) {
return
}
func (s *Server) makeHTTPHandler(localMethod string, localRoute string, localHandler HTTPAPIFunc) http.HandlerFunc {
func (s *Server) makeHTTPHandler(ctx context.Context, localMethod string, localRoute string, localHandler HTTPAPIFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// log the handler generation
logrus.Debugf("Calling %s %s", localMethod, localRoute)
@@ -302,7 +303,8 @@ func (s *Server) makeHTTPHandler(localMethod string, localRoute string, localHan
// apply to all requests. Data that is specific to the
// immediate function being called should still be passed
// as 'args' on the function call.
ctx := context.Background()
reqID := stringid.TruncateID(stringid.GenerateNonCryptoID())
ctx = context.WithValue(ctx, context.RequestID, reqID)
handlerFunc := s.handleWithGlobalMiddlewares(localHandler)
if err := handlerFunc(ctx, w, r, mux.Vars(r)); err != nil {
@@ -314,7 +316,7 @@ func (s *Server) makeHTTPHandler(localMethod string, localRoute string, localHan
// createRouter initializes the main router the server uses.
// we keep enableCors just for legacy usage, need to be removed in the future
func createRouter(s *Server) *mux.Router {
func createRouter(ctx context.Context, s *Server) *mux.Router {
r := mux.NewRouter()
if os.Getenv("DEBUG") != "" {
profilerSetup(r, "/debug/")
@@ -394,7 +396,7 @@ func createRouter(s *Server) *mux.Router {
localMethod := method
// build the handler function
f := s.makeHTTPHandler(localMethod, localRoute, localFct)
f := s.makeHTTPHandler(ctx, localMethod, localRoute, localFct)
// add the new route
if localRoute == "" {

View File

@@ -2,8 +2,12 @@
package server
func (s *Server) registerSubRouter() {
httpHandler := s.daemon.NetworkAPIRouter()
import (
"github.com/docker/docker/context"
)
func (s *Server) registerSubRouter(ctx context.Context) {
httpHandler := s.daemon.NetworkAPIRouter(ctx)
subrouter := s.router.PathPrefix("/v{version:[0-9.]+}/networks").Subrouter()
subrouter.Methods("GET", "POST", "PUT", "DELETE").HandlerFunc(httpHandler)

View File

@@ -2,5 +2,9 @@
package server
func (s *Server) registerSubRouter() {
import (
"github.com/docker/docker/context"
)
func (s *Server) registerSubRouter(ctx context.Context) {
}

View File

@@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"github.com/docker/docker/context"
"github.com/docker/docker/daemon"
"github.com/docker/docker/pkg/sockets"
"github.com/docker/libnetwork/portallocator"
@@ -63,10 +64,10 @@ func (s *Server) newServer(proto, addr string) ([]serverCloser, error) {
// AcceptConnections allows clients to connect to the API server.
// Referenced Daemon is notified about this server, and waits for the
// daemon acknowledgement before the incoming connections are accepted.
func (s *Server) AcceptConnections(d *daemon.Daemon) {
func (s *Server) AcceptConnections(ctx context.Context, d *daemon.Daemon) {
// Tell the init daemon we are accepting requests
s.daemon = d
s.registerSubRouter()
s.registerSubRouter(ctx)
go systemdDaemon.SdNotify("READY=1")
// close the lock so the listeners start accepting connections
select {

View File

@@ -7,6 +7,7 @@ import (
"net"
"net/http"
"github.com/docker/docker/context"
"github.com/docker/docker/daemon"
)
@@ -42,9 +43,9 @@ func (s *Server) newServer(proto, addr string) ([]serverCloser, error) {
}
// AcceptConnections allows router to start listening for the incoming requests.
func (s *Server) AcceptConnections(d *daemon.Daemon) {
func (s *Server) AcceptConnections(ctx context.Context, d *daemon.Daemon) {
s.daemon = d
s.registerSubRouter()
s.registerSubRouter(ctx)
// close the lock so the listeners start accepting connections
select {
case <-s.start:

View File

@@ -13,7 +13,7 @@ func (s *Server) getVolumesList(ctx context.Context, w http.ResponseWriter, r *h
return err
}
volumes, err := s.daemon.Volumes(r.Form.Get("filters"))
volumes, err := s.daemon.Volumes(ctx, r.Form.Get("filters"))
if err != nil {
return err
}
@@ -25,7 +25,7 @@ func (s *Server) getVolumeByName(ctx context.Context, w http.ResponseWriter, r *
return err
}
v, err := s.daemon.VolumeInspect(vars["name"])
v, err := s.daemon.VolumeInspect(ctx, vars["name"])
if err != nil {
return err
}
@@ -46,7 +46,7 @@ func (s *Server) postVolumesCreate(ctx context.Context, w http.ResponseWriter, r
return err
}
volume, err := s.daemon.VolumeCreate(req.Name, req.Driver, req.DriverOpts)
volume, err := s.daemon.VolumeCreate(ctx, req.Name, req.Driver, req.DriverOpts)
if err != nil {
return err
}
@@ -57,7 +57,7 @@ func (s *Server) deleteVolumes(ctx context.Context, w http.ResponseWriter, r *ht
if err := parseForm(r); err != nil {
return err
}
if err := s.daemon.VolumeRm(vars["name"]); err != nil {
if err := s.daemon.VolumeRm(ctx, vars["name"]); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)