diff --git a/backends/backends.go b/backends/backends.go index 32e19f036f..50f3150859 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -1,12 +1,11 @@ package backends import ( - "io" "fmt" "github.com/docker/libswarm/beam" beamutils "github.com/docker/libswarm/beam/utils" + "io" "strings" - "sync" "time" ) @@ -18,18 +17,19 @@ import ( // Example: `New().Job("debug").Run()` func New() beam.Sender { backends := beamutils.NewHub() - backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { + backends.RegisterName("cd", func(msg *beam.Message, out beam.Sender) (bool, error) { return false, fmt.Errorf("no such backend: %s\n", strings.Join(msg.Args, " ")) }) - backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { + backends.RegisterName("cd", func(msg *beam.Message, out beam.Sender) (bool, error) { if len(msg.Args) > 0 && msg.Args[0] == "debug" { - debugr, debugw, err := out.Send(&beam.Message{Name: "register"}, beam.R|beam.W) + debug, err := out.Send(&beam.Message{Name: "register", Ret: beam.RetPipe}) if err != nil { return false, err } + // Spawn the debug object go func() { for { - msg, msgr, msgw, err := debugr.Receive(beam.R | beam.W) + msg, err := debug.Receive(beam.Ret) if err == io.EOF { return } @@ -37,34 +37,35 @@ func New() beam.Sender { return } fmt.Printf("[DEBUG] %s %s\n", msg.Name, strings.Join(msg.Args, " ")) - // FIXME: goroutine? - Splice(debugw, msg, msgr, msgw) + if _, err := out.Send(msg); err != nil { + return + } } }() return false, nil } return true, nil }) - backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { + backends.RegisterName("cd", func(msg *beam.Message, out beam.Sender) (bool, error) { if len(msg.Args) > 0 && msg.Args[0] == "fakeclient" { - _, fakew, err := out.Send(&beam.Message{Name: "register"}, beam.R|beam.W) + _, err := out.Send(&beam.Message{Name: "register", Ret: beam.NopSender{}}) if err != nil { return false, err } + // Spawm the fakeclient task + // FIXME: only do this after started? go func() { - defer fmt.Printf("[FAKECLIENT] done\n") + out.Send(&beam.Message{Name: "log", Args: []string{"fake client starting"}}) + defer out.Send(&beam.Message{Name: "log", Args: []string{"fake client terminating"}}) for { time.Sleep(1 * time.Second) - fmt.Printf("[FAKECLIENT] heartbeat\n") - _, _, err := fakew.Send(&beam.Message{Name: "log", Args: []string{"fake client reporting for duty"}}, 0) + _, err := out.Send(&beam.Message{Name: "log", Args: []string{"fake client reporting for duty"}}) if err != nil { return } - containersR, _, err := fakew.Send(&beam.Message{Name: "containers"}, beam.R) - if err != nil { + if _, err := out.Send(&beam.Message{Name: "children", Ret: beam.NopSender{}}); err != nil { return } - go beamutils.Copy(beam.NopSender{}, containersR) } }() return false, nil @@ -73,24 +74,3 @@ func New() beam.Sender { }) return backends } - -func Splice(dst beam.Sender, msg *beam.Message, r beam.Receiver, w beam.Sender) error { - dstR, dstW, err := dst.Send(msg, beam.R|beam.W) - if err != nil { - return err - } - defer dstW.Close() - var tasks sync.WaitGroup - _copy := func(dst beam.Sender, src beam.Receiver) { - tasks.Add(1) - go func() { - defer tasks.Done() - beamutils.Copy(dst, src) - dst.Close() - }() - } - _copy(dstW, r) - _copy(w, dstR) - tasks.Wait() - return nil -} diff --git a/beam/beam.go b/beam/beam.go index 7fb13119cb..d1ae87a0ed 100644 --- a/beam/beam.go +++ b/beam/beam.go @@ -6,23 +6,24 @@ import ( ) type Sender interface { - Send(msg *Message, mode int) (Receiver, Sender, error) + Send(msg *Message) (Receiver, error) Close() error } type Receiver interface { - Receive(mode int) (*Message, Receiver, Sender, error) + Receive(mode int) (*Message, error) } type Message struct { Name string Args []string Att *os.File + Ret Sender } const ( - R = 1 << (32 - 1 - iota) - W + Ret int = 1 << iota + // FIXME: use an `Att` flag to auto-close attachments by default ) type ReceiverFrom interface { @@ -37,3 +38,20 @@ var ( ErrIncompatibleSender = errors.New("incompatible sender") ErrIncompatibleReceiver = errors.New("incompatible receiver") ) + +// RetPipe is a special value for `Message.Ret`. +// When a Message is sent with `Ret=SendPipe`, the transport must +// substitute it with the writing end of a new pipe, and return the +// other end as a return value. +type retPipe struct { + NopSender +} + +var RetPipe = retPipe{} + +func (r retPipe) Equals(val Sender) bool { + if rval, ok := val.(retPipe); ok { + return rval == r + } + return false +} diff --git a/beam/beam_test.go b/beam/beam_test.go index ee0096b189..6d504f2ce7 100644 --- a/beam/beam_test.go +++ b/beam/beam_test.go @@ -5,13 +5,16 @@ import ( ) func TestModes(t *testing.T) { - if R == W { - t.Fatalf("0") - } - if R == 0 { - t.Fatalf("0") - } - if W == 0 { + if Ret == 0 { t.Fatalf("0") } } + +func TestRetPipe(t *testing.T) { + var ( + shouldBeEqual = RetPipe + ) + if RetPipe != shouldBeEqual { + t.Fatalf("%#v should equal %#v", RetPipe, shouldBeEqual) + } +} diff --git a/beam/copy.go b/beam/copy.go index 0d9543b043..dd979dc55f 100644 --- a/beam/copy.go +++ b/beam/copy.go @@ -1,57 +1,33 @@ -package utils +package beam import ( - "github.com/docker/libswarm/beam" "sync" ) -func Copy(dst beam.Sender, src beam.Receiver) (int, error) { +func Copy(dst Sender, src Receiver) (int, error) { var tasks sync.WaitGroup defer tasks.Wait() - if senderTo, ok := src.(beam.SenderTo); ok { - if n, err := senderTo.SendTo(dst); err != beam.ErrIncompatibleSender { + if senderTo, ok := src.(SenderTo); ok { + if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender { return n, err } } - if receiverFrom, ok := dst.(beam.ReceiverFrom); ok { - if n, err := receiverFrom.ReceiveFrom(src); err != beam.ErrIncompatibleReceiver { + if receiverFrom, ok := dst.(ReceiverFrom); ok { + if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver { return n, err } } var ( n int ) - copyAndClose := func(dst beam.Sender, src beam.Receiver) { - if dst == nil { - return - } - defer dst.Close() - if src == nil { - return - } - Copy(dst, src) - } for { - msg, rcvR, rcvW, err := src.Receive(beam.R | beam.W) + msg, err := src.Receive(Ret) if err != nil { return n, err } - sndR, sndW, err := dst.Send(msg, beam.R|beam.W) - if err != nil { - if rcvW != nil { - rcvW.Close() - } + if _, err := dst.Send(msg); err != nil { return n, err } - tasks.Add(2) - go func() { - copyAndClose(rcvW, sndR) - tasks.Done() - }() - go func() { - copyAndClose(sndW, rcvR) - tasks.Done() - }() n++ } return n, nil diff --git a/beam/inmem/inmem.go b/beam/inmem/inmem.go index 7b96473206..6d6a2a9d53 100644 --- a/beam/inmem/inmem.go +++ b/beam/inmem/inmem.go @@ -23,16 +23,10 @@ type pipe struct { wl sync.Mutex rerr error // if reader closed, error to give writes werr error // if writer closed, error to give reads - pmsg *pipeMessage + msg *beam.Message } -type pipeMessage struct { - msg *beam.Message - out *PipeSender - in *PipeReceiver -} - -func (p *pipe) psend(pmsg *pipeMessage) error { +func (p *pipe) psend(msg *beam.Message) error { var err error // One writer at a time. p.wl.Lock() @@ -40,10 +34,10 @@ func (p *pipe) psend(pmsg *pipeMessage) error { p.l.Lock() defer p.l.Unlock() - p.pmsg = pmsg + p.msg = msg p.rwait.Signal() for { - if p.pmsg == nil { + if p.msg == nil { break } if p.rerr != nil { @@ -55,38 +49,20 @@ func (p *pipe) psend(pmsg *pipeMessage) error { } p.wwait.Wait() } - p.pmsg = nil // in case of rerr or werr + p.msg = nil // in case of rerr or werr return err } -func (p *pipe) send(msg *beam.Message, mode int) (in *PipeReceiver, out *PipeSender, err error) { - // Prepare the message - pmsg := &pipeMessage{msg: msg} - if mode&beam.R != 0 { - in, pmsg.out = Pipe() - defer func() { - if err != nil { - in.Close() - in = nil - pmsg.out.Close() - } - }() +func (p *pipe) send(msg *beam.Message) (ret beam.Receiver, err error) { + // Prepare nested Receiver if requested + if beam.RetPipe.Equals(msg.Ret) { + ret, msg.Ret = Pipe() } - if mode&beam.W != 0 { - pmsg.in, out = Pipe() - defer func() { - if err != nil { - out.Close() - out = nil - pmsg.in.Close() - } - }() - } - err = p.psend(pmsg) + err = p.psend(msg) return } -func (p *pipe) preceive() (*pipeMessage, error) { +func (p *pipe) preceive() (*beam.Message, error) { p.rl.Lock() defer p.rl.Unlock() @@ -96,7 +72,7 @@ func (p *pipe) preceive() (*pipeMessage, error) { if p.rerr != nil { return nil, io.ErrClosedPipe } - if p.pmsg != nil { + if p.msg != nil { break } if p.werr != nil { @@ -104,26 +80,24 @@ func (p *pipe) preceive() (*pipeMessage, error) { } p.rwait.Wait() } - pmsg := p.pmsg - p.pmsg = nil + msg := p.msg + p.msg = nil p.wwait.Signal() - return pmsg, nil + return msg, nil } -func (p *pipe) receive(mode int) (*beam.Message, *PipeReceiver, *PipeSender, error) { - pmsg, err := p.preceive() +func (p *pipe) receive(mode int) (*beam.Message, error) { + msg, err := p.preceive() if err != nil { - return nil, nil, nil, err + return nil, err } - if pmsg.out != nil && mode&beam.W == 0 { - pmsg.out.Close() - pmsg.out = nil + if msg.Ret == nil { + msg.Ret = beam.NopSender{} } - if pmsg.in != nil && mode&beam.R == 0 { - pmsg.in.Close() - pmsg.in = nil + if mode&beam.Ret == 0 { + msg.Ret.Close() } - return pmsg.msg, pmsg.in, pmsg.out, nil + return msg, nil } func (p *pipe) rclose(err error) { @@ -154,27 +128,8 @@ type PipeReceiver struct { p *pipe } -func (r *PipeReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { - msg, pin, pout, err := r.p.receive(mode) - if err != nil { - return nil, nil, nil, err - } - var ( - // Always return NopReceiver/NopSender instead of nil values, - // because: - // - if they were requested in the mode, they can safely be used - // - if they were not requested, they can safely be ignored (ie no leak if they - // aren't closed) - in beam.Receiver = beam.NopReceiver{} - out beam.Sender = beam.NopSender{} - ) - if pin != nil { - in = pin - } - if pout != nil { - out = pout - } - return msg, in, out, err +func (r *PipeReceiver) Receive(mode int) (*beam.Message, error) { + return r.p.receive(mode) } func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) { @@ -215,19 +170,8 @@ type PipeSender struct { p *pipe } -func (w *PipeSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { - pin, pout, err := w.p.send(msg, mode) - var ( - in beam.Receiver - out beam.Sender - ) - if pin != nil { - in = pin - } - if pout != nil { - out = pout - } - return in, out, err +func (w *PipeSender) Send(msg *beam.Message) (beam.Receiver, error) { + return w.p.send(msg) } func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) { diff --git a/beam/inmem/inmem_test.go b/beam/inmem/inmem_test.go index 62691eec8d..9a67acab4b 100644 --- a/beam/inmem/inmem_test.go +++ b/beam/inmem/inmem_test.go @@ -9,18 +9,33 @@ import ( "testing" ) -func TestReceiveW(t *testing.T) { +func TestRetPipe(t *testing.T) { r, w := Pipe() + defer r.Close() + defer w.Close() + wait := make(chan struct{}) go func() { - w.Send(&beam.Message{Name: "hello"}, 0) + ret, err := w.Send(&beam.Message{Name: "hello", Ret: beam.RetPipe}) + if err != nil { + t.Fatal(err) + } + msg, err := ret.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "this better not crash" { + t.Fatalf("%#v", msg) + } + close(wait) }() - _, _, ww, err := r.Receive(beam.W) + msg, err := r.Receive(beam.Ret) if err != nil { t.Fatal(err) } - if _, _, err := ww.Send(&beam.Message{Name: "this better not crash"}, 0); err != nil { + if _, err := msg.Ret.Send(&beam.Message{Name: "this better not crash"}); err != nil { t.Fatal(err) } + <-wait } func TestSimpleSend(t *testing.T) { @@ -29,7 +44,7 @@ func TestSimpleSend(t *testing.T) { defer w.Close() testutils.Timeout(t, func() { go func() { - msg, in, out, err := r.Receive(0) + msg, err := r.Receive(0) if err != nil { t.Fatal(err) } @@ -39,40 +54,13 @@ func TestSimpleSend(t *testing.T) { if msg.Args[0] != "hello world" { t.Fatalf("%#v", *msg) } - assertMode(t, in, out, 0) }() - in, out, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0) - if err != nil { + if _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}); err != nil { t.Fatal(err) } - assertMode(t, in, out, 0) }) } -// assertMode verifies that the values of r and w match -// mode. -// If mode has the R bit set, r must be non-nil. -// If mode has the W bit set, w must be non-nil. -// -// If any of these conditions are not met, t.Fatal is called and the active -// test fails. -func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) { - // If mode has the R bit set, r must be non-nil - if mode&beam.R != 0 { - if r == nil { - t.Fatalf("should be non-nil: %#v", r) - } - // Otherwise it must be nil. - } - // If mode has the W bit set, w must be non-nil - if mode&beam.W != 0 { - if w == nil { - t.Fatalf("should be non-nil: %#v", w) - } - // Otherwise it must be nil. - } -} - func TestSendReply(t *testing.T) { r, w := Pipe() defer r.Close() @@ -80,77 +68,41 @@ func TestSendReply(t *testing.T) { testutils.Timeout(t, func() { // Send go func() { - // Send a message with mode=R - in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R) + ret, err := w.Send(&beam.Message{Args: []string{"this is the request"}, Ret: beam.RetPipe}) if err != nil { t.Fatal(err) } - assertMode(t, in, out, beam.R) + if ret == nil { + t.Fatalf("ret = nil\n") + } // Read for a reply - resp, _, _, err := in.Receive(0) + msg, err := ret.Receive(0) if err != nil { t.Fatal(err) } - if resp.Args[0] != "this is the reply" { - t.Fatalf("%#v", resp) + if msg.Args[0] != "this is the reply" { + t.Fatalf("%#v", msg) } }() - // Receive a message with mode=W - msg, in, out, err := r.Receive(beam.W) + // Receive a message with mode=Ret + msg, err := r.Receive(beam.Ret) if err != nil { t.Fatal(err) } if msg.Args[0] != "this is the request" { t.Fatalf("%#v", msg) } - assertMode(t, in, out, beam.W) + if msg.Ret == nil { + t.Fatalf("%#v", msg) + } // Send a reply - _, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0) + _, err = msg.Ret.Send(&beam.Message{Args: []string{"this is the reply"}}) if err != nil { t.Fatal(err) } }) } -func TestSendNested(t *testing.T) { - r, w := Pipe() - defer r.Close() - defer w.Close() - testutils.Timeout(t, func() { - // Send - go func() { - // Send a message with mode=W - in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W) - if err != nil { - t.Fatal(err) - } - assertMode(t, in, out, beam.W) - // Send a nested message - _, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0) - if err != nil { - t.Fatal(err) - } - }() - // Receive a message with mode=R - msg, in, out, err := r.Receive(beam.R) - if err != nil { - t.Fatal(err) - } - if msg.Args[0] != "this is the request" { - t.Fatalf("%#v", msg) - } - assertMode(t, in, out, beam.R) - // Read for a nested message - nested, _, _, err := in.Receive(0) - if err != nil { - t.Fatal(err) - } - if nested.Args[0] != "this is the nested message" { - t.Fatalf("%#v", nested) - } - }) -} - func TestSendFile(t *testing.T) { r, w := Pipe() defer r.Close() @@ -165,12 +117,12 @@ func TestSendFile(t *testing.T) { tmp.Seek(0, 0) testutils.Timeout(t, func() { go func() { - _, _, err := w.Send(&beam.Message{"file", []string{"path=" + tmp.Name()}, tmp}, 0) + _, err := w.Send(&beam.Message{Name: "file", Args: []string{"path=" + tmp.Name()}, Att: tmp}) if err != nil { t.Fatal(err) } }() - msg, _, _, err := r.Receive(0) + msg, err := r.Receive(0) if err != nil { t.Fatal(err) } diff --git a/beam/nop.go b/beam/nop.go index 061e5f41a5..603d9b2ec5 100644 --- a/beam/nop.go +++ b/beam/nop.go @@ -6,8 +6,8 @@ import ( type NopSender struct{} -func (s NopSender) Send(msg *Message, mode int) (Receiver, Sender, error) { - return NopReceiver{}, NopSender{}, nil +func (s NopSender) Send(msg *Message) (Receiver, error) { + return NopReceiver{}, nil } func (s NopSender) Close() error { @@ -16,6 +16,6 @@ func (s NopSender) Close() error { type NopReceiver struct{} -func (r NopReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { - return nil, nil, nil, io.EOF +func (r NopReceiver) Receive(mode int) (*Message, error) { + return nil, io.EOF } diff --git a/beam/unix/conn.go b/beam/unix/conn.go index a06f3f5eb0..24c63647ff 100644 --- a/beam/unix/conn.go +++ b/beam/unix/conn.go @@ -50,75 +50,90 @@ func (c *Conn) Close() error { return c.UnixConn.CloseWrite() } -func (c *Conn) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { +func (c *Conn) Send(msg *beam.Message) (beam.Receiver, error) { if msg.Att != nil { - return nil, nil, fmt.Errorf("file attachment not yet implemented in unix transport") + return nil, fmt.Errorf("file attachment not yet implemented in unix transport") } parts := []string{msg.Name} parts = append(parts, msg.Args...) b := []byte(data.EncodeList(parts)) // Setup nested streams var ( - fd *os.File - r beam.Receiver - w beam.Sender + fd *os.File + ret beam.Receiver + err error ) - if mode&(beam.R|beam.W) != 0 { + // Caller requested a return pipe + if beam.RetPipe.Equals(msg.Ret) { local, remote, err := sendablePair() if err != nil { - return nil, nil, err + return nil, err } fd = remote - if mode&beam.R != 0 { - r = &Conn{local} - } - if mode&beam.W != 0 { - w = &Conn{local} + ret = &Conn{local} + // Caller specified its own return channel + } else if msg.Ret != nil { + // The specified return channel is a unix conn: engaging cheat mode! + if retConn, ok := msg.Ret.(*Conn); ok { + fd, err = retConn.UnixConn.File() + if err != nil { + return nil, fmt.Errorf("error passing return channel: %v", err) + } + // Close duplicate fd + retConn.UnixConn.Close() + // The specified return channel is an unknown type: proxy messages. } else { - local.CloseWrite() + local, remote, err := sendablePair() + if err != nil { + return nil, fmt.Errorf("error passing return channel: %v", err) + } + fd = remote + // FIXME: do we need a reference no all these background tasks? + go func() { + // Copy messages from the remote return channel to the local return channel. + // When the remote return channel is closed, also close the local return channel. + localConn := &Conn{local} + beam.Copy(msg.Ret, localConn) + msg.Ret.Close() + localConn.Close() + }() } } - c.UnixConn.Send(b, fd) - return r, w, nil + if err := c.UnixConn.Send(b, fd); err != nil { + return nil, err + } + return ret, nil } -func (c *Conn) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { +func (c *Conn) Receive(mode int) (*beam.Message, error) { b, fd, err := c.UnixConn.Receive() if err != nil { - return nil, nil, nil, err + return nil, err } parts, n, err := data.DecodeList(string(b)) if err != nil { - return nil, nil, nil, err + return nil, err } if n != len(b) { - return nil, nil, nil, fmt.Errorf("garbage data %#v", b[:n]) + return nil, fmt.Errorf("garbage data %#v", b[:n]) } if len(parts) == 0 { - return nil, nil, nil, fmt.Errorf("malformed message") + return nil, fmt.Errorf("malformed message") } msg := &beam.Message{Name: parts[0], Args: parts[1:]} - // Setup nested streams - var ( - r beam.Receiver - w beam.Sender - ) // Apply mode mask if fd != nil { subconn, err := FileConn(fd) if err != nil { - return nil, nil, nil, err + return nil, err } fd.Close() - if mode&beam.R != 0 { - r = &Conn{subconn} - } - if mode&beam.W != 0 { - w = &Conn{subconn} + if mode&beam.Ret != 0 { + msg.Ret = &Conn{subconn} } else { subconn.CloseWrite() } } - return msg, r, w, nil + return msg, nil } diff --git a/beam/utils/buf.go b/beam/utils/buf.go index 674d5f2b20..226c1fea38 100644 --- a/beam/utils/buf.go +++ b/beam/utils/buf.go @@ -6,9 +6,9 @@ import ( type Buffer []*beam.Message -func (buf *Buffer) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { +func (buf *Buffer) Send(msg *beam.Message) (beam.Receiver, error) { (*buf) = append(*buf, msg) - return beam.NopReceiver{}, beam.NopSender{}, nil + return beam.NopReceiver{}, nil } func (buf *Buffer) Close() error { diff --git a/beam/utils/hub.go b/beam/utils/hub.go index 0d574a0042..42cc75cf8a 100644 --- a/beam/utils/hub.go +++ b/beam/utils/hub.go @@ -5,6 +5,7 @@ import ( "github.com/docker/libswarm/beam" "github.com/docker/libswarm/beam/inmem" "io" + "strings" "sync" ) @@ -12,6 +13,7 @@ import ( type Hub struct { handlers *StackSender tasks sync.WaitGroup + l sync.RWMutex } func NewHub() *Hub { @@ -20,73 +22,65 @@ func NewHub() *Hub { } } -func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { +func (hub *Hub) Send(msg *beam.Message) (ret beam.Receiver, err error) { if msg.Name == "register" { - if mode&beam.R == 0 { - return nil, nil, fmt.Errorf("register: no return channel") + if msg.Ret == nil { + return nil, fmt.Errorf("register: no return channel") } fmt.Printf("[hub] received %v\n", msg) - hYoutr, hYoutw := inmem.Pipe() - hYinr, hYinw := inmem.Pipe() + hIn := msg.Ret + if hIn == beam.RetPipe { + ret, hIn = inmem.Pipe() + } + // This queue guarantees that the first message received by the handler + // is the "register" response. + hIn = NewQueue(hIn, 1) + // Reply to the handler with a "register" call of our own, + // passing a reference to the previous handler stack. + // This allows the new handler to query previous handlers + // without creating loops. + hOut, err := hIn.Send(&beam.Message{Name: "register", Ret: beam.RetPipe}) + if err != nil { + return nil, err + } // Register the new handler on top of the others, // and get a reference to the previous stack of handlers. - prevHandlers := hub.handlers.Add(hYinw) - // Pass requests from the new handler to the previous chain of handlers - // hYout -> hXin - hub.tasks.Add(1) - go func() { - defer hub.tasks.Done() - Copy(prevHandlers, hYoutr) - hYoutr.Close() - }() - return hYinr, hYoutw, nil + prevHandlers := hub.handlers.Add(hIn) + go beam.Copy(prevHandlers, hOut) + return ret, nil } fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len()) - return hub.handlers.Send(msg, mode) -} - -func (hub *Hub) Register(dst beam.Sender) error { - in, _, err := hub.Send(&beam.Message{Name: "register"}, beam.R) - if err != nil { - return err - } - go Copy(dst, in) - return nil + return hub.handlers.Send(msg) } func (hub *Hub) RegisterTask(h func(beam.Receiver, beam.Sender) error) error { - in, out, err := hub.Send(&beam.Message{Name: "register"}, beam.R|beam.W) + ret, err := hub.Send(&beam.Message{Name: "register", Ret: beam.RetPipe}) if err != nil { return err } + ack, err := ret.Receive(beam.Ret) + if err != nil { + return err + } + if ack.Name == "error" { + return fmt.Errorf(strings.Join(ack.Args, ", ")) + } + if ack.Name != "register" { + return fmt.Errorf("invalid response: expected verb 'register', got '%v'", ack.Name) + } go func() { - h(in, out) - out.Close() + h(ret, ack.Ret) + ack.Ret.Close() }() return nil } -type Handler func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (pass bool, err error) +type Handler func(msg *beam.Message, out beam.Sender) (pass bool, err error) func (hub *Hub) RegisterName(name string, h Handler) error { return hub.RegisterTask(func(in beam.Receiver, out beam.Sender) error { - var tasks sync.WaitGroup - copyTask := func(dst beam.Sender, src beam.Receiver) { - tasks.Add(1) - go func() { - defer tasks.Done() - if dst == nil { - return - } - defer dst.Close() - if src == nil { - return - } - Copy(dst, src) - }() - } for { - msg, msgin, msgout, err := in.Receive(beam.R | beam.W) + msg, err := in.Receive(beam.Ret) if err == io.EOF { break } @@ -95,24 +89,19 @@ func (hub *Hub) RegisterName(name string, h Handler) error { } var pass = true if msg.Name == name || name == "" { - pass, err = h(msg, msgin, msgout, out) + pass, err = h(msg, out) if err != nil { - if _, _, err := msgout.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}, 0); err != nil { + if _, err := msg.Ret.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}); err != nil { return err } } } if pass { - nextin, nextout, err := out.Send(msg, beam.R|beam.W) - if err != nil { + if _, err := out.Send(msg); err != nil { return err } - copyTask(nextout, msgin) - copyTask(msgout, nextin) } else { - if msgout != nil { - msgout.Close() - } + msg.Ret.Close() } } return nil diff --git a/beam/utils/hub_test.go b/beam/utils/hub_test.go index 2ec1f98011..925e2894bc 100644 --- a/beam/utils/hub_test.go +++ b/beam/utils/hub_test.go @@ -9,42 +9,49 @@ import ( func TestHubSendEmpty(t *testing.T) { hub := NewHub() // Send to empty hub should silently drop - r, w, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, beam.R|beam.W) + ret, err := hub.Send(&beam.Message{Name: "hello", Args: nil, Ret: beam.RetPipe}) // Send must not return an error if err != nil { t.Fatal(err) } - // We set beam.R, so a valid receiver must be returned - if r == nil { - t.Fatalf("%#v", r) - } - // We set beam.W, so a valid receiver must be returned - if w == nil { - t.Fatalf("%#v", w) + // We set beam.R, so a valid return pipe must be returned + if ret == nil { + t.Fatalf("%#v", ret) } } type CountSender int -func (s *CountSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { +func (s *CountSender) Send(msg *beam.Message) (beam.Receiver, error) { (*s)++ - return nil, nil, nil + return nil, nil } func TestHubSendOneHandler(t *testing.T) { hub := NewHub() defer hub.Close() testutils.Timeout(t, func() { - in, _, err := hub.Send(&beam.Message{Name: "register", Args: nil}, beam.R) + handlerIn, err := hub.Send(&beam.Message{Name: "register", Args: nil, Ret: beam.RetPipe}) if err != nil { t.Fatal(err) } + ack, err := handlerIn.Receive(beam.Ret) + if err != nil { + t.Fatal(err) + } + if ack.Name != "register" { + t.Fatalf("%#v", err) + } + handlerOut := ack.Ret + if handlerOut == nil { + t.Fatalf("nil handler out") + } go func() { - if _, _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, 0); err != nil { + if _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}); err != nil { t.Fatal(err) } }() - msg, _, _, err := in.Receive(0) + msg, err := handlerIn.Receive(0) if err != nil { t.Fatal(err) } diff --git a/beam/utils/stack.go b/beam/utils/stack.go index 0748d6a041..1d2c0188ce 100644 --- a/beam/utils/stack.go +++ b/beam/utils/stack.go @@ -24,9 +24,9 @@ func NewStackSender() *StackSender { } } -func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam.Sender, err error) { +func (s *StackSender) Send(msg *beam.Message) (ret beam.Receiver, err error) { completed := s.walk(func(h beam.Sender) (ok bool) { - r, w, err = h.Send(msg, mode) + ret, err = h.Send(msg) fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err) if err == nil { return true @@ -35,10 +35,10 @@ func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam }) // If walk was completed, it means we didn't find a valid handler if !completed { - return r, w, err + return ret, err } // Silently drop messages if no valid backend is available. - return beam.NopSender{}.Send(msg, mode) + return beam.NopSender{}.Send(msg) } func (s *StackSender) Add(dst beam.Sender) *StackSender { diff --git a/beam/utils/stack_test.go b/beam/utils/stack_test.go index fdc35201b0..7cc53eb466 100644 --- a/beam/utils/stack_test.go +++ b/beam/utils/stack_test.go @@ -17,7 +17,7 @@ func TestStackWithPipe(t *testing.T) { s.Add(w) testutils.Timeout(t, func() { go func() { - msg, _, _, err := r.Receive(0) + msg, err := r.Receive(0) if err != nil { t.Fatal(err) } @@ -28,7 +28,7 @@ func TestStackWithPipe(t *testing.T) { t.Fatalf("%#v", msg) } }() - _, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0) + _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}) if err != nil { t.Fatal(err) } @@ -46,7 +46,7 @@ func TestStackWithPair(t *testing.T) { s.Add(w) testutils.Timeout(t, func() { go func() { - msg, _, _, err := r.Receive(0) + msg, err := r.Receive(0) if err != nil { t.Fatal(err) } @@ -57,7 +57,7 @@ func TestStackWithPair(t *testing.T) { t.Fatalf("%#v", msg) } }() - _, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0) + _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}) if err != nil { t.Fatal(err) } @@ -91,9 +91,9 @@ func TestStackAdd(t *testing.T) { if s.Len() != 2 { t.Fatalf("%#v", beforeA) } - s.Send(&beam.Message{Name: "for b", Args: nil}, 0) - beforeB.Send(&beam.Message{Name: "for a", Args: nil}, 0) - beforeA.Send(&beam.Message{Name: "for nobody", Args: nil}, 0) + s.Send(&beam.Message{Name: "for b", Args: nil}) + beforeB.Send(&beam.Message{Name: "for a", Args: nil}) + beforeA.Send(&beam.Message{Name: "for nobody", Args: nil}) if len(a) != 1 { t.Fatalf("%#v", a) } @@ -113,7 +113,7 @@ func TestStackAddBad(t *testing.T) { t.Fatalf("%#v", s) } r.Close() - if _, _, err := s.Send(&beam.Message{Name: "for the buffer", Args: nil}, 0); err != nil { + if _, err := s.Send(&beam.Message{Name: "for the buffer", Args: nil}); err != nil { t.Fatal(err) } if s.Len() != 1 { diff --git a/swarmd/swarmd.go b/swarmd/swarmd.go index 3bda8e79dd..355fb3c42b 100644 --- a/swarmd/swarmd.go +++ b/swarmd/swarmd.go @@ -2,14 +2,14 @@ package main import ( "fmt" - "log" "github.com/codegangsta/cli" + "github.com/docker/libswarm/backends" "github.com/docker/libswarm/beam" beamutils "github.com/docker/libswarm/beam/utils" - "github.com/docker/libswarm/backends" _ "github.com/dotcloud/docker/api/server" "github.com/flynn/go-shlex" "io" + "log" "os" "strings" ) @@ -32,7 +32,7 @@ func cmdDaemon(c *cli.Context) { } hub := beamutils.NewHub() - hub.RegisterName("log", func(msg *beam.Message, in beam.Receiver, out, next beam.Sender) (bool, error) { + hub.RegisterName("log", func(msg *beam.Message, out beam.Sender) (bool, error) { log.Printf("%s\n", strings.Join(msg.Args, " ")) // Pass through to other logging hooks return true, nil @@ -45,42 +45,31 @@ func cmdDaemon(c *cli.Context) { Fatalf("%v", err) } fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " ")) - backendr, _, err := back.Send(&beam.Message{Name: "cd", Args: []string{bName}}, beam.R) + backend, err := back.Send(&beam.Message{Name: "cd", Args: []string{bName}, Ret: beam.RetPipe}) if err != nil { Fatalf("%s: %v\n", bName, err) } - // backendr will return either 'error' or 'register'. + // backend will return either 'error' or 'register'. for { - m, mr, mw, err := backendr.Receive(beam.R|beam.W) + m, err := backend.Receive(beam.Ret) if err == io.EOF { break } if err != nil { Fatalf("error reading from backend: %v", err) } - if m.Name == "error" { - Fatalf("backend sent error: %v", strings.Join(m.Args, " ")) - } - if m.Name == "register" { - // FIXME: adapt the beam interface to allow the caller to - // (optionally) pass their own Sender/Receiver? - // Would make proxying/splicing easier. - hubr, hubw, err := hub.Send(m, beam.R|beam.W) - if err != nil { - Fatalf("error binding backend to hub: %v", err) - } - fmt.Printf("successfully registered\n") - go beamutils.Copy(hubw, mr) - go beamutils.Copy(mw, hubr) + if _, err := hub.Send(m); err != nil { + Fatalf("error binding backend to hub: %v", err) } } } - in, _, err := hub.Send(&beam.Message{Name: "start"}, beam.R) + fmt.Printf("backends loaded. Sending 'start' to the hub\n") + job, err := hub.Send(&beam.Message{Name: "start", Ret: beam.RetPipe}) if err != nil { Fatalf("%v", err) } for { - msg, _, _, err := in.Receive(0) + msg, err := job.Receive(0) if err == io.EOF { break }