Simplify the beam API

* Callback pattern that is easier to grok: each message has an optional nested return channel, instead of 2 nested streams
* Return channel is in the Message structure: less arguments and return values to pass around
* Caller can request a pipe as return channel, or pass its own. This makes proxying and advanced plumbing operations much easier.

Signed-off-by: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Solomon Hykes
2014-06-02 06:15:28 +00:00
parent f86db62eae
commit 3265abfa2e
14 changed files with 259 additions and 386 deletions

View File

@@ -1,12 +1,11 @@
package backends
import (
"io"
"fmt"
"github.com/docker/libswarm/beam"
beamutils "github.com/docker/libswarm/beam/utils"
"io"
"strings"
"sync"
"time"
)
@@ -18,18 +17,19 @@ import (
// Example: `New().Job("debug").Run()`
func New() beam.Sender {
backends := beamutils.NewHub()
backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) {
backends.RegisterName("cd", func(msg *beam.Message, out beam.Sender) (bool, error) {
return false, fmt.Errorf("no such backend: %s\n", strings.Join(msg.Args, " "))
})
backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) {
backends.RegisterName("cd", func(msg *beam.Message, out beam.Sender) (bool, error) {
if len(msg.Args) > 0 && msg.Args[0] == "debug" {
debugr, debugw, err := out.Send(&beam.Message{Name: "register"}, beam.R|beam.W)
debug, err := out.Send(&beam.Message{Name: "register", Ret: beam.RetPipe})
if err != nil {
return false, err
}
// Spawn the debug object
go func() {
for {
msg, msgr, msgw, err := debugr.Receive(beam.R | beam.W)
msg, err := debug.Receive(beam.Ret)
if err == io.EOF {
return
}
@@ -37,34 +37,35 @@ func New() beam.Sender {
return
}
fmt.Printf("[DEBUG] %s %s\n", msg.Name, strings.Join(msg.Args, " "))
// FIXME: goroutine?
Splice(debugw, msg, msgr, msgw)
if _, err := out.Send(msg); err != nil {
return
}
}
}()
return false, nil
}
return true, nil
})
backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) {
backends.RegisterName("cd", func(msg *beam.Message, out beam.Sender) (bool, error) {
if len(msg.Args) > 0 && msg.Args[0] == "fakeclient" {
_, fakew, err := out.Send(&beam.Message{Name: "register"}, beam.R|beam.W)
_, err := out.Send(&beam.Message{Name: "register", Ret: beam.NopSender{}})
if err != nil {
return false, err
}
// Spawm the fakeclient task
// FIXME: only do this after started?
go func() {
defer fmt.Printf("[FAKECLIENT] done\n")
out.Send(&beam.Message{Name: "log", Args: []string{"fake client starting"}})
defer out.Send(&beam.Message{Name: "log", Args: []string{"fake client terminating"}})
for {
time.Sleep(1 * time.Second)
fmt.Printf("[FAKECLIENT] heartbeat\n")
_, _, err := fakew.Send(&beam.Message{Name: "log", Args: []string{"fake client reporting for duty"}}, 0)
_, err := out.Send(&beam.Message{Name: "log", Args: []string{"fake client reporting for duty"}})
if err != nil {
return
}
containersR, _, err := fakew.Send(&beam.Message{Name: "containers"}, beam.R)
if err != nil {
if _, err := out.Send(&beam.Message{Name: "children", Ret: beam.NopSender{}}); err != nil {
return
}
go beamutils.Copy(beam.NopSender{}, containersR)
}
}()
return false, nil
@@ -73,24 +74,3 @@ func New() beam.Sender {
})
return backends
}
func Splice(dst beam.Sender, msg *beam.Message, r beam.Receiver, w beam.Sender) error {
dstR, dstW, err := dst.Send(msg, beam.R|beam.W)
if err != nil {
return err
}
defer dstW.Close()
var tasks sync.WaitGroup
_copy := func(dst beam.Sender, src beam.Receiver) {
tasks.Add(1)
go func() {
defer tasks.Done()
beamutils.Copy(dst, src)
dst.Close()
}()
}
_copy(dstW, r)
_copy(w, dstR)
tasks.Wait()
return nil
}

View File

@@ -6,23 +6,24 @@ import (
)
type Sender interface {
Send(msg *Message, mode int) (Receiver, Sender, error)
Send(msg *Message) (Receiver, error)
Close() error
}
type Receiver interface {
Receive(mode int) (*Message, Receiver, Sender, error)
Receive(mode int) (*Message, error)
}
type Message struct {
Name string
Args []string
Att *os.File
Ret Sender
}
const (
R = 1 << (32 - 1 - iota)
W
Ret int = 1 << iota
// FIXME: use an `Att` flag to auto-close attachments by default
)
type ReceiverFrom interface {
@@ -37,3 +38,20 @@ var (
ErrIncompatibleSender = errors.New("incompatible sender")
ErrIncompatibleReceiver = errors.New("incompatible receiver")
)
// RetPipe is a special value for `Message.Ret`.
// When a Message is sent with `Ret=SendPipe`, the transport must
// substitute it with the writing end of a new pipe, and return the
// other end as a return value.
type retPipe struct {
NopSender
}
var RetPipe = retPipe{}
func (r retPipe) Equals(val Sender) bool {
if rval, ok := val.(retPipe); ok {
return rval == r
}
return false
}

View File

@@ -5,13 +5,16 @@ import (
)
func TestModes(t *testing.T) {
if R == W {
t.Fatalf("0")
}
if R == 0 {
t.Fatalf("0")
}
if W == 0 {
if Ret == 0 {
t.Fatalf("0")
}
}
func TestRetPipe(t *testing.T) {
var (
shouldBeEqual = RetPipe
)
if RetPipe != shouldBeEqual {
t.Fatalf("%#v should equal %#v", RetPipe, shouldBeEqual)
}
}

View File

@@ -1,57 +1,33 @@
package utils
package beam
import (
"github.com/docker/libswarm/beam"
"sync"
)
func Copy(dst beam.Sender, src beam.Receiver) (int, error) {
func Copy(dst Sender, src Receiver) (int, error) {
var tasks sync.WaitGroup
defer tasks.Wait()
if senderTo, ok := src.(beam.SenderTo); ok {
if n, err := senderTo.SendTo(dst); err != beam.ErrIncompatibleSender {
if senderTo, ok := src.(SenderTo); ok {
if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender {
return n, err
}
}
if receiverFrom, ok := dst.(beam.ReceiverFrom); ok {
if n, err := receiverFrom.ReceiveFrom(src); err != beam.ErrIncompatibleReceiver {
if receiverFrom, ok := dst.(ReceiverFrom); ok {
if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver {
return n, err
}
}
var (
n int
)
copyAndClose := func(dst beam.Sender, src beam.Receiver) {
if dst == nil {
return
}
defer dst.Close()
if src == nil {
return
}
Copy(dst, src)
}
for {
msg, rcvR, rcvW, err := src.Receive(beam.R | beam.W)
msg, err := src.Receive(Ret)
if err != nil {
return n, err
}
sndR, sndW, err := dst.Send(msg, beam.R|beam.W)
if err != nil {
if rcvW != nil {
rcvW.Close()
}
if _, err := dst.Send(msg); err != nil {
return n, err
}
tasks.Add(2)
go func() {
copyAndClose(rcvW, sndR)
tasks.Done()
}()
go func() {
copyAndClose(sndW, rcvR)
tasks.Done()
}()
n++
}
return n, nil

View File

@@ -23,16 +23,10 @@ type pipe struct {
wl sync.Mutex
rerr error // if reader closed, error to give writes
werr error // if writer closed, error to give reads
pmsg *pipeMessage
msg *beam.Message
}
type pipeMessage struct {
msg *beam.Message
out *PipeSender
in *PipeReceiver
}
func (p *pipe) psend(pmsg *pipeMessage) error {
func (p *pipe) psend(msg *beam.Message) error {
var err error
// One writer at a time.
p.wl.Lock()
@@ -40,10 +34,10 @@ func (p *pipe) psend(pmsg *pipeMessage) error {
p.l.Lock()
defer p.l.Unlock()
p.pmsg = pmsg
p.msg = msg
p.rwait.Signal()
for {
if p.pmsg == nil {
if p.msg == nil {
break
}
if p.rerr != nil {
@@ -55,38 +49,20 @@ func (p *pipe) psend(pmsg *pipeMessage) error {
}
p.wwait.Wait()
}
p.pmsg = nil // in case of rerr or werr
p.msg = nil // in case of rerr or werr
return err
}
func (p *pipe) send(msg *beam.Message, mode int) (in *PipeReceiver, out *PipeSender, err error) {
// Prepare the message
pmsg := &pipeMessage{msg: msg}
if mode&beam.R != 0 {
in, pmsg.out = Pipe()
defer func() {
if err != nil {
in.Close()
in = nil
pmsg.out.Close()
}
}()
func (p *pipe) send(msg *beam.Message) (ret beam.Receiver, err error) {
// Prepare nested Receiver if requested
if beam.RetPipe.Equals(msg.Ret) {
ret, msg.Ret = Pipe()
}
if mode&beam.W != 0 {
pmsg.in, out = Pipe()
defer func() {
if err != nil {
out.Close()
out = nil
pmsg.in.Close()
}
}()
}
err = p.psend(pmsg)
err = p.psend(msg)
return
}
func (p *pipe) preceive() (*pipeMessage, error) {
func (p *pipe) preceive() (*beam.Message, error) {
p.rl.Lock()
defer p.rl.Unlock()
@@ -96,7 +72,7 @@ func (p *pipe) preceive() (*pipeMessage, error) {
if p.rerr != nil {
return nil, io.ErrClosedPipe
}
if p.pmsg != nil {
if p.msg != nil {
break
}
if p.werr != nil {
@@ -104,26 +80,24 @@ func (p *pipe) preceive() (*pipeMessage, error) {
}
p.rwait.Wait()
}
pmsg := p.pmsg
p.pmsg = nil
msg := p.msg
p.msg = nil
p.wwait.Signal()
return pmsg, nil
return msg, nil
}
func (p *pipe) receive(mode int) (*beam.Message, *PipeReceiver, *PipeSender, error) {
pmsg, err := p.preceive()
func (p *pipe) receive(mode int) (*beam.Message, error) {
msg, err := p.preceive()
if err != nil {
return nil, nil, nil, err
return nil, err
}
if pmsg.out != nil && mode&beam.W == 0 {
pmsg.out.Close()
pmsg.out = nil
if msg.Ret == nil {
msg.Ret = beam.NopSender{}
}
if pmsg.in != nil && mode&beam.R == 0 {
pmsg.in.Close()
pmsg.in = nil
if mode&beam.Ret == 0 {
msg.Ret.Close()
}
return pmsg.msg, pmsg.in, pmsg.out, nil
return msg, nil
}
func (p *pipe) rclose(err error) {
@@ -154,27 +128,8 @@ type PipeReceiver struct {
p *pipe
}
func (r *PipeReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) {
msg, pin, pout, err := r.p.receive(mode)
if err != nil {
return nil, nil, nil, err
}
var (
// Always return NopReceiver/NopSender instead of nil values,
// because:
// - if they were requested in the mode, they can safely be used
// - if they were not requested, they can safely be ignored (ie no leak if they
// aren't closed)
in beam.Receiver = beam.NopReceiver{}
out beam.Sender = beam.NopSender{}
)
if pin != nil {
in = pin
}
if pout != nil {
out = pout
}
return msg, in, out, err
func (r *PipeReceiver) Receive(mode int) (*beam.Message, error) {
return r.p.receive(mode)
}
func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) {
@@ -215,19 +170,8 @@ type PipeSender struct {
p *pipe
}
func (w *PipeSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
pin, pout, err := w.p.send(msg, mode)
var (
in beam.Receiver
out beam.Sender
)
if pin != nil {
in = pin
}
if pout != nil {
out = pout
}
return in, out, err
func (w *PipeSender) Send(msg *beam.Message) (beam.Receiver, error) {
return w.p.send(msg)
}
func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) {

View File

@@ -9,18 +9,33 @@ import (
"testing"
)
func TestReceiveW(t *testing.T) {
func TestRetPipe(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
wait := make(chan struct{})
go func() {
w.Send(&beam.Message{Name: "hello"}, 0)
ret, err := w.Send(&beam.Message{Name: "hello", Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "this better not crash" {
t.Fatalf("%#v", msg)
}
close(wait)
}()
_, _, ww, err := r.Receive(beam.W)
msg, err := r.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if _, _, err := ww.Send(&beam.Message{Name: "this better not crash"}, 0); err != nil {
if _, err := msg.Ret.Send(&beam.Message{Name: "this better not crash"}); err != nil {
t.Fatal(err)
}
<-wait
}
func TestSimpleSend(t *testing.T) {
@@ -29,7 +44,7 @@ func TestSimpleSend(t *testing.T) {
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, in, out, err := r.Receive(0)
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
@@ -39,40 +54,13 @@ func TestSimpleSend(t *testing.T) {
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
assertMode(t, in, out, 0)
}()
in, out, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0)
if err != nil {
if _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}); err != nil {
t.Fatal(err)
}
assertMode(t, in, out, 0)
})
}
// assertMode verifies that the values of r and w match
// mode.
// If mode has the R bit set, r must be non-nil.
// If mode has the W bit set, w must be non-nil.
//
// If any of these conditions are not met, t.Fatal is called and the active
// test fails.
func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) {
// If mode has the R bit set, r must be non-nil
if mode&beam.R != 0 {
if r == nil {
t.Fatalf("should be non-nil: %#v", r)
}
// Otherwise it must be nil.
}
// If mode has the W bit set, w must be non-nil
if mode&beam.W != 0 {
if w == nil {
t.Fatalf("should be non-nil: %#v", w)
}
// Otherwise it must be nil.
}
}
func TestSendReply(t *testing.T) {
r, w := Pipe()
defer r.Close()
@@ -80,77 +68,41 @@ func TestSendReply(t *testing.T) {
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=R
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R)
ret, err := w.Send(&beam.Message{Args: []string{"this is the request"}, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.R)
if ret == nil {
t.Fatalf("ret = nil\n")
}
// Read for a reply
resp, _, _, err := in.Receive(0)
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if resp.Args[0] != "this is the reply" {
t.Fatalf("%#v", resp)
if msg.Args[0] != "this is the reply" {
t.Fatalf("%#v", msg)
}
}()
// Receive a message with mode=W
msg, in, out, err := r.Receive(beam.W)
// Receive a message with mode=Ret
msg, err := r.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.W)
if msg.Ret == nil {
t.Fatalf("%#v", msg)
}
// Send a reply
_, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0)
_, err = msg.Ret.Send(&beam.Message{Args: []string{"this is the reply"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestSendNested(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=W
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.W)
// Send a nested message
_, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0)
if err != nil {
t.Fatal(err)
}
}()
// Receive a message with mode=R
msg, in, out, err := r.Receive(beam.R)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.R)
// Read for a nested message
nested, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if nested.Args[0] != "this is the nested message" {
t.Fatalf("%#v", nested)
}
})
}
func TestSendFile(t *testing.T) {
r, w := Pipe()
defer r.Close()
@@ -165,12 +117,12 @@ func TestSendFile(t *testing.T) {
tmp.Seek(0, 0)
testutils.Timeout(t, func() {
go func() {
_, _, err := w.Send(&beam.Message{"file", []string{"path=" + tmp.Name()}, tmp}, 0)
_, err := w.Send(&beam.Message{Name: "file", Args: []string{"path=" + tmp.Name()}, Att: tmp})
if err != nil {
t.Fatal(err)
}
}()
msg, _, _, err := r.Receive(0)
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}

View File

@@ -6,8 +6,8 @@ import (
type NopSender struct{}
func (s NopSender) Send(msg *Message, mode int) (Receiver, Sender, error) {
return NopReceiver{}, NopSender{}, nil
func (s NopSender) Send(msg *Message) (Receiver, error) {
return NopReceiver{}, nil
}
func (s NopSender) Close() error {
@@ -16,6 +16,6 @@ func (s NopSender) Close() error {
type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*Message, Receiver, Sender, error) {
return nil, nil, nil, io.EOF
func (r NopReceiver) Receive(mode int) (*Message, error) {
return nil, io.EOF
}

View File

@@ -50,75 +50,90 @@ func (c *Conn) Close() error {
return c.UnixConn.CloseWrite()
}
func (c *Conn) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
func (c *Conn) Send(msg *beam.Message) (beam.Receiver, error) {
if msg.Att != nil {
return nil, nil, fmt.Errorf("file attachment not yet implemented in unix transport")
return nil, fmt.Errorf("file attachment not yet implemented in unix transport")
}
parts := []string{msg.Name}
parts = append(parts, msg.Args...)
b := []byte(data.EncodeList(parts))
// Setup nested streams
var (
fd *os.File
r beam.Receiver
w beam.Sender
fd *os.File
ret beam.Receiver
err error
)
if mode&(beam.R|beam.W) != 0 {
// Caller requested a return pipe
if beam.RetPipe.Equals(msg.Ret) {
local, remote, err := sendablePair()
if err != nil {
return nil, nil, err
return nil, err
}
fd = remote
if mode&beam.R != 0 {
r = &Conn{local}
}
if mode&beam.W != 0 {
w = &Conn{local}
ret = &Conn{local}
// Caller specified its own return channel
} else if msg.Ret != nil {
// The specified return channel is a unix conn: engaging cheat mode!
if retConn, ok := msg.Ret.(*Conn); ok {
fd, err = retConn.UnixConn.File()
if err != nil {
return nil, fmt.Errorf("error passing return channel: %v", err)
}
// Close duplicate fd
retConn.UnixConn.Close()
// The specified return channel is an unknown type: proxy messages.
} else {
local.CloseWrite()
local, remote, err := sendablePair()
if err != nil {
return nil, fmt.Errorf("error passing return channel: %v", err)
}
fd = remote
// FIXME: do we need a reference no all these background tasks?
go func() {
// Copy messages from the remote return channel to the local return channel.
// When the remote return channel is closed, also close the local return channel.
localConn := &Conn{local}
beam.Copy(msg.Ret, localConn)
msg.Ret.Close()
localConn.Close()
}()
}
}
c.UnixConn.Send(b, fd)
return r, w, nil
if err := c.UnixConn.Send(b, fd); err != nil {
return nil, err
}
return ret, nil
}
func (c *Conn) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) {
func (c *Conn) Receive(mode int) (*beam.Message, error) {
b, fd, err := c.UnixConn.Receive()
if err != nil {
return nil, nil, nil, err
return nil, err
}
parts, n, err := data.DecodeList(string(b))
if err != nil {
return nil, nil, nil, err
return nil, err
}
if n != len(b) {
return nil, nil, nil, fmt.Errorf("garbage data %#v", b[:n])
return nil, fmt.Errorf("garbage data %#v", b[:n])
}
if len(parts) == 0 {
return nil, nil, nil, fmt.Errorf("malformed message")
return nil, fmt.Errorf("malformed message")
}
msg := &beam.Message{Name: parts[0], Args: parts[1:]}
// Setup nested streams
var (
r beam.Receiver
w beam.Sender
)
// Apply mode mask
if fd != nil {
subconn, err := FileConn(fd)
if err != nil {
return nil, nil, nil, err
return nil, err
}
fd.Close()
if mode&beam.R != 0 {
r = &Conn{subconn}
}
if mode&beam.W != 0 {
w = &Conn{subconn}
if mode&beam.Ret != 0 {
msg.Ret = &Conn{subconn}
} else {
subconn.CloseWrite()
}
}
return msg, r, w, nil
return msg, nil
}

View File

@@ -6,9 +6,9 @@ import (
type Buffer []*beam.Message
func (buf *Buffer) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
func (buf *Buffer) Send(msg *beam.Message) (beam.Receiver, error) {
(*buf) = append(*buf, msg)
return beam.NopReceiver{}, beam.NopSender{}, nil
return beam.NopReceiver{}, nil
}
func (buf *Buffer) Close() error {

View File

@@ -5,6 +5,7 @@ import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/inmem"
"io"
"strings"
"sync"
)
@@ -12,6 +13,7 @@ import (
type Hub struct {
handlers *StackSender
tasks sync.WaitGroup
l sync.RWMutex
}
func NewHub() *Hub {
@@ -20,73 +22,65 @@ func NewHub() *Hub {
}
}
func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
func (hub *Hub) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Name == "register" {
if mode&beam.R == 0 {
return nil, nil, fmt.Errorf("register: no return channel")
if msg.Ret == nil {
return nil, fmt.Errorf("register: no return channel")
}
fmt.Printf("[hub] received %v\n", msg)
hYoutr, hYoutw := inmem.Pipe()
hYinr, hYinw := inmem.Pipe()
hIn := msg.Ret
if hIn == beam.RetPipe {
ret, hIn = inmem.Pipe()
}
// This queue guarantees that the first message received by the handler
// is the "register" response.
hIn = NewQueue(hIn, 1)
// Reply to the handler with a "register" call of our own,
// passing a reference to the previous handler stack.
// This allows the new handler to query previous handlers
// without creating loops.
hOut, err := hIn.Send(&beam.Message{Name: "register", Ret: beam.RetPipe})
if err != nil {
return nil, err
}
// Register the new handler on top of the others,
// and get a reference to the previous stack of handlers.
prevHandlers := hub.handlers.Add(hYinw)
// Pass requests from the new handler to the previous chain of handlers
// hYout -> hXin
hub.tasks.Add(1)
go func() {
defer hub.tasks.Done()
Copy(prevHandlers, hYoutr)
hYoutr.Close()
}()
return hYinr, hYoutw, nil
prevHandlers := hub.handlers.Add(hIn)
go beam.Copy(prevHandlers, hOut)
return ret, nil
}
fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len())
return hub.handlers.Send(msg, mode)
}
func (hub *Hub) Register(dst beam.Sender) error {
in, _, err := hub.Send(&beam.Message{Name: "register"}, beam.R)
if err != nil {
return err
}
go Copy(dst, in)
return nil
return hub.handlers.Send(msg)
}
func (hub *Hub) RegisterTask(h func(beam.Receiver, beam.Sender) error) error {
in, out, err := hub.Send(&beam.Message{Name: "register"}, beam.R|beam.W)
ret, err := hub.Send(&beam.Message{Name: "register", Ret: beam.RetPipe})
if err != nil {
return err
}
ack, err := ret.Receive(beam.Ret)
if err != nil {
return err
}
if ack.Name == "error" {
return fmt.Errorf(strings.Join(ack.Args, ", "))
}
if ack.Name != "register" {
return fmt.Errorf("invalid response: expected verb 'register', got '%v'", ack.Name)
}
go func() {
h(in, out)
out.Close()
h(ret, ack.Ret)
ack.Ret.Close()
}()
return nil
}
type Handler func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (pass bool, err error)
type Handler func(msg *beam.Message, out beam.Sender) (pass bool, err error)
func (hub *Hub) RegisterName(name string, h Handler) error {
return hub.RegisterTask(func(in beam.Receiver, out beam.Sender) error {
var tasks sync.WaitGroup
copyTask := func(dst beam.Sender, src beam.Receiver) {
tasks.Add(1)
go func() {
defer tasks.Done()
if dst == nil {
return
}
defer dst.Close()
if src == nil {
return
}
Copy(dst, src)
}()
}
for {
msg, msgin, msgout, err := in.Receive(beam.R | beam.W)
msg, err := in.Receive(beam.Ret)
if err == io.EOF {
break
}
@@ -95,24 +89,19 @@ func (hub *Hub) RegisterName(name string, h Handler) error {
}
var pass = true
if msg.Name == name || name == "" {
pass, err = h(msg, msgin, msgout, out)
pass, err = h(msg, out)
if err != nil {
if _, _, err := msgout.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}, 0); err != nil {
if _, err := msg.Ret.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}); err != nil {
return err
}
}
}
if pass {
nextin, nextout, err := out.Send(msg, beam.R|beam.W)
if err != nil {
if _, err := out.Send(msg); err != nil {
return err
}
copyTask(nextout, msgin)
copyTask(msgout, nextin)
} else {
if msgout != nil {
msgout.Close()
}
msg.Ret.Close()
}
}
return nil

View File

@@ -9,42 +9,49 @@ import (
func TestHubSendEmpty(t *testing.T) {
hub := NewHub()
// Send to empty hub should silently drop
r, w, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, beam.R|beam.W)
ret, err := hub.Send(&beam.Message{Name: "hello", Args: nil, Ret: beam.RetPipe})
// Send must not return an error
if err != nil {
t.Fatal(err)
}
// We set beam.R, so a valid receiver must be returned
if r == nil {
t.Fatalf("%#v", r)
}
// We set beam.W, so a valid receiver must be returned
if w == nil {
t.Fatalf("%#v", w)
// We set beam.R, so a valid return pipe must be returned
if ret == nil {
t.Fatalf("%#v", ret)
}
}
type CountSender int
func (s *CountSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
func (s *CountSender) Send(msg *beam.Message) (beam.Receiver, error) {
(*s)++
return nil, nil, nil
return nil, nil
}
func TestHubSendOneHandler(t *testing.T) {
hub := NewHub()
defer hub.Close()
testutils.Timeout(t, func() {
in, _, err := hub.Send(&beam.Message{Name: "register", Args: nil}, beam.R)
handlerIn, err := hub.Send(&beam.Message{Name: "register", Args: nil, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
ack, err := handlerIn.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if ack.Name != "register" {
t.Fatalf("%#v", err)
}
handlerOut := ack.Ret
if handlerOut == nil {
t.Fatalf("nil handler out")
}
go func() {
if _, _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, 0); err != nil {
if _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}); err != nil {
t.Fatal(err)
}
}()
msg, _, _, err := in.Receive(0)
msg, err := handlerIn.Receive(0)
if err != nil {
t.Fatal(err)
}

View File

@@ -24,9 +24,9 @@ func NewStackSender() *StackSender {
}
}
func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam.Sender, err error) {
func (s *StackSender) Send(msg *beam.Message) (ret beam.Receiver, err error) {
completed := s.walk(func(h beam.Sender) (ok bool) {
r, w, err = h.Send(msg, mode)
ret, err = h.Send(msg)
fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err)
if err == nil {
return true
@@ -35,10 +35,10 @@ func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam
})
// If walk was completed, it means we didn't find a valid handler
if !completed {
return r, w, err
return ret, err
}
// Silently drop messages if no valid backend is available.
return beam.NopSender{}.Send(msg, mode)
return beam.NopSender{}.Send(msg)
}
func (s *StackSender) Add(dst beam.Sender) *StackSender {

View File

@@ -17,7 +17,7 @@ func TestStackWithPipe(t *testing.T) {
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, _, _, err := r.Receive(0)
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
@@ -28,7 +28,7 @@ func TestStackWithPipe(t *testing.T) {
t.Fatalf("%#v", msg)
}
}()
_, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0)
_, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
@@ -46,7 +46,7 @@ func TestStackWithPair(t *testing.T) {
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, _, _, err := r.Receive(0)
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
@@ -57,7 +57,7 @@ func TestStackWithPair(t *testing.T) {
t.Fatalf("%#v", msg)
}
}()
_, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0)
_, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
@@ -91,9 +91,9 @@ func TestStackAdd(t *testing.T) {
if s.Len() != 2 {
t.Fatalf("%#v", beforeA)
}
s.Send(&beam.Message{Name: "for b", Args: nil}, 0)
beforeB.Send(&beam.Message{Name: "for a", Args: nil}, 0)
beforeA.Send(&beam.Message{Name: "for nobody", Args: nil}, 0)
s.Send(&beam.Message{Name: "for b", Args: nil})
beforeB.Send(&beam.Message{Name: "for a", Args: nil})
beforeA.Send(&beam.Message{Name: "for nobody", Args: nil})
if len(a) != 1 {
t.Fatalf("%#v", a)
}
@@ -113,7 +113,7 @@ func TestStackAddBad(t *testing.T) {
t.Fatalf("%#v", s)
}
r.Close()
if _, _, err := s.Send(&beam.Message{Name: "for the buffer", Args: nil}, 0); err != nil {
if _, err := s.Send(&beam.Message{Name: "for the buffer", Args: nil}); err != nil {
t.Fatal(err)
}
if s.Len() != 1 {

View File

@@ -2,14 +2,14 @@ package main
import (
"fmt"
"log"
"github.com/codegangsta/cli"
"github.com/docker/libswarm/backends"
"github.com/docker/libswarm/beam"
beamutils "github.com/docker/libswarm/beam/utils"
"github.com/docker/libswarm/backends"
_ "github.com/dotcloud/docker/api/server"
"github.com/flynn/go-shlex"
"io"
"log"
"os"
"strings"
)
@@ -32,7 +32,7 @@ func cmdDaemon(c *cli.Context) {
}
hub := beamutils.NewHub()
hub.RegisterName("log", func(msg *beam.Message, in beam.Receiver, out, next beam.Sender) (bool, error) {
hub.RegisterName("log", func(msg *beam.Message, out beam.Sender) (bool, error) {
log.Printf("%s\n", strings.Join(msg.Args, " "))
// Pass through to other logging hooks
return true, nil
@@ -45,42 +45,31 @@ func cmdDaemon(c *cli.Context) {
Fatalf("%v", err)
}
fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " "))
backendr, _, err := back.Send(&beam.Message{Name: "cd", Args: []string{bName}}, beam.R)
backend, err := back.Send(&beam.Message{Name: "cd", Args: []string{bName}, Ret: beam.RetPipe})
if err != nil {
Fatalf("%s: %v\n", bName, err)
}
// backendr will return either 'error' or 'register'.
// backend will return either 'error' or 'register'.
for {
m, mr, mw, err := backendr.Receive(beam.R|beam.W)
m, err := backend.Receive(beam.Ret)
if err == io.EOF {
break
}
if err != nil {
Fatalf("error reading from backend: %v", err)
}
if m.Name == "error" {
Fatalf("backend sent error: %v", strings.Join(m.Args, " "))
}
if m.Name == "register" {
// FIXME: adapt the beam interface to allow the caller to
// (optionally) pass their own Sender/Receiver?
// Would make proxying/splicing easier.
hubr, hubw, err := hub.Send(m, beam.R|beam.W)
if err != nil {
Fatalf("error binding backend to hub: %v", err)
}
fmt.Printf("successfully registered\n")
go beamutils.Copy(hubw, mr)
go beamutils.Copy(mw, hubr)
if _, err := hub.Send(m); err != nil {
Fatalf("error binding backend to hub: %v", err)
}
}
}
in, _, err := hub.Send(&beam.Message{Name: "start"}, beam.R)
fmt.Printf("backends loaded. Sending 'start' to the hub\n")
job, err := hub.Send(&beam.Message{Name: "start", Ret: beam.RetPipe})
if err != nil {
Fatalf("%v", err)
}
for {
msg, _, _, err := in.Receive(0)
msg, err := job.Receive(0)
if err == io.EOF {
break
}