diff --git a/container.go b/container.go index e4c8f0635c..2a0231b40d 100644 --- a/container.go +++ b/container.go @@ -171,14 +171,14 @@ func (container *Container) startPty() error { // Copy the PTYs to our broadcasters go func() { - defer container.stdout.Close() + defer container.stdout.CloseWriters() Debugf("[startPty] Begin of stdout pipe") io.Copy(container.stdout, stdoutMaster) Debugf("[startPty] End of stdout pipe") }() go func() { - defer container.stderr.Close() + defer container.stderr.CloseWriters() Debugf("[startPty] Begin of stderr pipe") io.Copy(container.stderr, stderrMaster) Debugf("[startPty] End of stderr pipe") @@ -391,10 +391,10 @@ func (container *Container) monitor() { Debugf("%s: Error close stdin: %s", container.Id, err) } } - if err := container.stdout.Close(); err != nil { + if err := container.stdout.CloseWriters(); err != nil { Debugf("%s: Error close stdout: %s", container.Id, err) } - if err := container.stderr.Close(); err != nil { + if err := container.stderr.CloseWriters(); err != nil { Debugf("%s: Error close stderr: %s", container.Id, err) } diff --git a/utils.go b/utils.go index dcd224adc5..e295239f7f 100644 --- a/utils.go +++ b/utils.go @@ -2,7 +2,6 @@ package docker import ( "bytes" - "container/list" "errors" "fmt" "github.com/dotcloud/docker/rcli" @@ -215,52 +214,48 @@ func (r *bufReader) Close() error { } type writeBroadcaster struct { - writers *list.List + mu sync.Mutex + writers map[io.WriteCloser]struct{} } func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) { - w.writers.PushBack(writer) + w.mu.Lock() + w.writers[writer] = struct{}{} + w.mu.Unlock() } // FIXME: Is that function used? +// FIXME: This relies on the concrete writer type used having equality operator func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) { - for e := w.writers.Front(); e != nil; e = e.Next() { - v := e.Value.(io.Writer) - if v == writer { - w.writers.Remove(e) - return - } - } + w.mu.Lock() + delete(w.writers, writer) + w.mu.Unlock() } func (w *writeBroadcaster) Write(p []byte) (n int, err error) { - failed := []*list.Element{} - for e := w.writers.Front(); e != nil; e = e.Next() { - writer := e.Value.(io.Writer) + w.mu.Lock() + defer w.mu.Unlock() + for writer := range w.writers { if n, err := writer.Write(p); err != nil || n != len(p) { // On error, evict the writer - failed = append(failed, e) + delete(w.writers, writer) } } - // We cannot remove while iterating, so it has to be done in - // a separate step - for _, e := range failed { - w.writers.Remove(e) - } return len(p), nil } -func (w *writeBroadcaster) Close() error { - for e := w.writers.Front(); e != nil; e = e.Next() { - writer := e.Value.(io.WriteCloser) +func (w *writeBroadcaster) CloseWriters() error { + w.mu.Lock() + defer w.mu.Unlock() + for writer := range w.writers { writer.Close() } - w.writers.Init() + w.writers = make(map[io.WriteCloser]struct{}) return nil } func newWriteBroadcaster() *writeBroadcaster { - return &writeBroadcaster{list.New()} + return &writeBroadcaster{writers: make(map[io.WriteCloser]struct{})} } func getTotalUsedFds() int { diff --git a/utils_test.go b/utils_test.go index 192b042ba2..c15084f61e 100644 --- a/utils_test.go +++ b/utils_test.go @@ -122,7 +122,29 @@ func TestWriteBroadcaster(t *testing.T) { t.Errorf("Buffer contains %v", bufferC.String()) } - writer.Close() + writer.CloseWriters() +} + +type devNullCloser int + +func (d devNullCloser) Close() error { + return nil +} + +func (d devNullCloser) Write(buf []byte) (int, error) { + return len(buf), nil +} + +// This test checks for races. It is only useful when run with the race detector. +func TestRaceWriteBroadcaster(t *testing.T) { + writer := newWriteBroadcaster() + c := make(chan bool) + go func() { + writer.AddWriter(devNullCloser(0)) + c <- true + }() + writer.Write([]byte("hello")) + <-c } // Test the behavior of TruncIndex, an index for querying IDs from a non-conflicting prefix.