From 0bdfcfaa33b4945fdfd6ff9ea82bf3c5aa63848d Mon Sep 17 00:00:00 2001 From: Robert Obryk Date: Sat, 30 Mar 2013 14:37:06 +0100 Subject: [PATCH 1/3] Make writeBroadcaster safe for concurrent use. --- utils.go | 12 +++++++++++- utils_test.go | 22 ++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/utils.go b/utils.go index fbbad34081..f6afb9f23c 100644 --- a/utils.go +++ b/utils.go @@ -213,15 +213,21 @@ func (r *bufReader) Close() error { } type writeBroadcaster struct { + mu sync.Mutex writers *list.List } func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) { + w.mu.Lock() w.writers.PushBack(writer) + w.mu.Unlock() } // FIXME: Is that function used? +// FIXME: This relies on the concrete writer type used having equality operator func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) { + w.mu.Lock() + defer w.mu.Unlock() for e := w.writers.Front(); e != nil; e = e.Next() { v := e.Value.(io.Writer) if v == writer { @@ -232,6 +238,8 @@ func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) { } func (w *writeBroadcaster) Write(p []byte) (n int, err error) { + w.mu.Lock() + defer w.mu.Unlock() failed := []*list.Element{} for e := w.writers.Front(); e != nil; e = e.Next() { writer := e.Value.(io.Writer) @@ -249,6 +257,8 @@ func (w *writeBroadcaster) Write(p []byte) (n int, err error) { } func (w *writeBroadcaster) Close() error { + w.mu.Lock() + defer w.mu.Unlock() for e := w.writers.Front(); e != nil; e = e.Next() { writer := e.Value.(io.WriteCloser) writer.Close() @@ -258,5 +268,5 @@ func (w *writeBroadcaster) Close() error { } func newWriteBroadcaster() *writeBroadcaster { - return &writeBroadcaster{list.New()} + return &writeBroadcaster{writers: list.New()} } diff --git a/utils_test.go b/utils_test.go index dbdcda434c..c32b7bc7f9 100644 --- a/utils_test.go +++ b/utils_test.go @@ -124,3 +124,25 @@ func TestWriteBroadcaster(t *testing.T) { writer.Close() } + +type devNullCloser int + +func (d devNullCloser) Close() error { + return nil +} + +func (d devNullCloser) Write(buf []byte) (int, error) { + return len(buf), nil +} + +// This test checks for races. It is only useful when run with the race detector. +func TestRaceWriteBroadcaster(t *testing.T) { + writer := newWriteBroadcaster() + c := make(chan bool) + go func() { + writer.AddWriter(devNullCloser(0)) + c <- true + }() + writer.Write([]byte("hello")) + <-c +} From 75ba07cb3a7fcf5d6cc9f1120fd3468a9658be67 Mon Sep 17 00:00:00 2001 From: Robert Obryk Date: Sat, 30 Mar 2013 14:44:00 +0100 Subject: [PATCH 2/3] Swapped a map for a list in writeBroadcaster. --- utils.go | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/utils.go b/utils.go index f6afb9f23c..f3a0134612 100644 --- a/utils.go +++ b/utils.go @@ -2,7 +2,6 @@ package docker import ( "bytes" - "container/list" "errors" "fmt" "github.com/dotcloud/docker/rcli" @@ -214,12 +213,12 @@ func (r *bufReader) Close() error { type writeBroadcaster struct { mu sync.Mutex - writers *list.List + writers map[io.WriteCloser]struct{} } func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) { w.mu.Lock() - w.writers.PushBack(writer) + w.writers[writer] = struct{}{} w.mu.Unlock() } @@ -227,46 +226,32 @@ func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) { // FIXME: This relies on the concrete writer type used having equality operator func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) { w.mu.Lock() - defer w.mu.Unlock() - for e := w.writers.Front(); e != nil; e = e.Next() { - v := e.Value.(io.Writer) - if v == writer { - w.writers.Remove(e) - return - } - } + delete(w.writers, writer) + w.mu.Unlock() } func (w *writeBroadcaster) Write(p []byte) (n int, err error) { w.mu.Lock() defer w.mu.Unlock() - failed := []*list.Element{} - for e := w.writers.Front(); e != nil; e = e.Next() { - writer := e.Value.(io.Writer) + for writer := range w.writers { if n, err := writer.Write(p); err != nil || n != len(p) { // On error, evict the writer - failed = append(failed, e) + delete(w.writers, writer) } } - // We cannot remove while iterating, so it has to be done in - // a separate step - for _, e := range failed { - w.writers.Remove(e) - } return len(p), nil } func (w *writeBroadcaster) Close() error { w.mu.Lock() defer w.mu.Unlock() - for e := w.writers.Front(); e != nil; e = e.Next() { - writer := e.Value.(io.WriteCloser) + for writer := range w.writers { writer.Close() } - w.writers.Init() + w.writers = make(map[io.WriteCloser]struct{}) return nil } func newWriteBroadcaster() *writeBroadcaster { - return &writeBroadcaster{writers: list.New()} + return &writeBroadcaster{writers: make(map[io.WriteCloser]struct{})} } From a83d87abd442dda5f2d67a2701aae9c7b891e68a Mon Sep 17 00:00:00 2001 From: Robert Obryk Date: Tue, 2 Apr 2013 10:45:17 +0200 Subject: [PATCH 3/3] Renamed writeBroadcaster.Close() to CloseWriters(). --- container.go | 8 ++++---- utils.go | 2 +- utils_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/container.go b/container.go index d128ce98dd..7414e4c45b 100644 --- a/container.go +++ b/container.go @@ -161,14 +161,14 @@ func (container *Container) startPty() error { // Copy the PTYs to our broadcasters go func() { - defer container.stdout.Close() + defer container.stdout.CloseWriters() Debugf("[startPty] Begin of stdout pipe") io.Copy(container.stdout, stdoutMaster) Debugf("[startPty] End of stdout pipe") }() go func() { - defer container.stderr.Close() + defer container.stderr.CloseWriters() Debugf("[startPty] Begin of stderr pipe") io.Copy(container.stderr, stderrMaster) Debugf("[startPty] End of stderr pipe") @@ -374,10 +374,10 @@ func (container *Container) monitor() { if err := container.releaseNetwork(); err != nil { log.Printf("%v: Failed to release network: %v", container.Id, err) } - if err := container.stdout.Close(); err != nil { + if err := container.stdout.CloseWriters(); err != nil { Debugf("%s: Error close stdout: %s", container.Id, err) } - if err := container.stderr.Close(); err != nil { + if err := container.stderr.CloseWriters(); err != nil { Debugf("%s: Error close stderr: %s", container.Id, err) } if err := container.Unmount(); err != nil { diff --git a/utils.go b/utils.go index f3a0134612..0f11f77b47 100644 --- a/utils.go +++ b/utils.go @@ -242,7 +242,7 @@ func (w *writeBroadcaster) Write(p []byte) (n int, err error) { return len(p), nil } -func (w *writeBroadcaster) Close() error { +func (w *writeBroadcaster) CloseWriters() error { w.mu.Lock() defer w.mu.Unlock() for writer := range w.writers { diff --git a/utils_test.go b/utils_test.go index c32b7bc7f9..c70aedec79 100644 --- a/utils_test.go +++ b/utils_test.go @@ -122,7 +122,7 @@ func TestWriteBroadcaster(t *testing.T) { t.Errorf("Buffer contains %v", bufferC.String()) } - writer.Close() + writer.CloseWriters() } type devNullCloser int