From cde5bab4fa0bf4b8a21a1c98e9196c6428ca6ea2 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 11 Jul 2014 14:42:07 -0700 Subject: [PATCH] Use new libchan interface Signed-off-by: Aanand Prasad --- client.go | 11 +- iowrapper/iowrapper.go | 37 ++++++ message.go | 240 ++++++++++++++++++++++---------------- message_test.go | 38 +++--- server.go | 6 - swarmd/Godeps/Godeps.json | 24 ++-- utils/nop.go | 32 ----- 7 files changed, 209 insertions(+), 179 deletions(-) create mode 100644 iowrapper/iowrapper.go delete mode 100644 utils/nop.go diff --git a/client.go b/client.go index 518c42dccf..145ffef0aa 100644 --- a/client.go +++ b/client.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io" - "net" "strings" ) @@ -91,7 +90,7 @@ func (c *Client) Error(msg string, args ...interface{}) error { return err } -func (c *Client) Connect() (net.Conn, error) { +func (c *Client) Connect() (io.ReadWriteCloser, error) { ret, err := c.Send(&Message{Verb: Connect, Ret: RetPipe}) if err != nil { return nil, err @@ -105,13 +104,7 @@ func (c *Client) Connect() (net.Conn, error) { if msg.Att == nil { return nil, fmt.Errorf("missing attachment") } - conn, err := net.FileConn(msg.Att) - if err != nil { - msg.Att.Close() - return nil, err - } - msg.Att.Close() - return conn, nil + return msg.Att, nil } if msg.Verb == Error { return nil, fmt.Errorf(strings.Join(msg.Args[:1], "")) diff --git a/iowrapper/iowrapper.go b/iowrapper/iowrapper.go new file mode 100644 index 0000000000..d4d1b91d93 --- /dev/null +++ b/iowrapper/iowrapper.go @@ -0,0 +1,37 @@ +package iowrapper + +import ( + "io" +) + +func Wrap(obj interface{}) io.ReadWriteCloser { + return iowrapper{obj} +} + +type iowrapper struct { + obj interface{} +} + +func (w iowrapper) Read(p []byte) (int, error) { + if reader, ok := w.obj.(io.Reader); ok { + return reader.Read(p) + } else { + return 0, io.ErrClosedPipe + } +} + +func (w iowrapper) Write(p []byte) (int, error) { + if writer, ok := w.obj.(io.Writer); ok { + return writer.Write(p) + } else { + return 0, io.ErrClosedPipe + } +} + +func (w iowrapper) Close() error { + if closer, ok := w.obj.(io.Closer); ok { + return closer.Close() + } else { + return nil + } +} diff --git a/message.go b/message.go index 0e06e95f8a..734f657750 100644 --- a/message.go +++ b/message.go @@ -1,29 +1,33 @@ package libswarm import ( - "github.com/docker/libchan" - "github.com/docker/libchan/data" + "github.com/dmcgowan/libchan" "fmt" - "os" + "io" ) type Message struct { Verb Args []string Ret Sender - Att *os.File + Att io.ReadWriteCloser } type Sender interface { Send(msg *Message) (Receiver, error) Close() error - Unwrap() libchan.Sender } type Receiver interface { Receive(mode int) (*Message, error) - Unwrap() libchan.Receiver +} + +type internalMessage struct { + Verb + Args []string + Ret libchan.Sender + Att io.ReadWriteCloser } type senderWrapper struct { @@ -35,15 +39,47 @@ func WrapSender(s libchan.Sender) Sender { } func (s *senderWrapper) Send(msg *Message) (Receiver, error) { - recv, err := s.Sender.Send(msg.LibchanMessage()) - if err != nil { - return nil, err - } - return WrapReceiver(recv), err -} + var rcvr Receiver = NopReceiver{} -func (s *senderWrapper) Unwrap() libchan.Sender { - return s.Sender + imsg := &internalMessage{ + Verb: msg.Verb, + Args: msg.Args, + } + + if msg.Ret != nil { + thisEnd, otherEnd, err := s.Sender.CreateNestedReceiver() + if err != nil { + return nil, err + } + + imsg.Ret = otherEnd + + if RetPipe.Equals(msg.Ret) { + rcvr = &receiverWrapper{thisEnd} + } else { + go Copy(msg.Ret, &receiverWrapper{thisEnd}) + } + } + + if msg.Att != nil { + byteStream, err := s.Sender.CreateByteStream() + if err != nil { + return nil, err + } + + imsg.Att = byteStream + + go func() { + io.Copy(byteStream, msg.Att) + byteStream.Close() + }() + go func() { + io.Copy(msg.Att, byteStream) + msg.Att.Close() + }() + } + + return rcvr, s.Sender.Send(imsg) } type receiverWrapper struct { @@ -55,115 +91,113 @@ func WrapReceiver(r libchan.Receiver) Receiver { } func (r *receiverWrapper) Receive(mode int) (*Message, error) { - lcm, err := r.Receiver.Receive(mode) - if err != nil { + imsg := &internalMessage{} + if err := r.Receiver.Receive(imsg); err != nil { return nil, err } - return DecodeLibchanMessage(lcm) -} - -func (r *receiverWrapper) Unwrap() libchan.Receiver { - return r.Receiver -} - -type senderUnwrapper struct { - Sender -} - -func (su *senderUnwrapper) Send(lcm *libchan.Message) (libchan.Receiver, error) { - msg, err := DecodeLibchanMessage(lcm) - if err != nil { - return nil, err + var ret Sender + if imsg.Ret == nil { + ret = NopSender{} + } else { + ret = &senderWrapper{imsg.Ret} } - recv, err := su.Sender.Send(msg) - if err != nil { - return nil, err + if mode&Ret == 0 { + if err := ret.Close(); err != nil { + return nil, err + } } - return &receiverUnwrapper{recv}, nil -} - -type receiverUnwrapper struct { - Receiver -} - -func (ru *receiverUnwrapper) Receive(mode int) (*libchan.Message, error) { - msg, err := ru.Receiver.Receive(mode) - if err != nil { - return nil, err + msg := &Message{ + Verb: imsg.Verb, + Args: imsg.Args, + Ret: ret, + Att: imsg.Att, } - return msg.LibchanMessage(), nil + return msg, nil } -func Pipe() (Receiver, Sender) { +func Pipe() (*receiverWrapper, *senderWrapper) { r, s := libchan.Pipe() - return WrapReceiver(r), WrapSender(s) + return &receiverWrapper{r}, &senderWrapper{s} } -func Copy(s Sender, r Receiver) (int, error) { - return libchan.Copy(s.Unwrap(), r.Unwrap()) -} - -func Handler(h func(msg *Message) error) Sender { - lch := libchan.Handler(func(lcm *libchan.Message) { - ret := WrapSender(lcm.Ret) - msg, err := DecodeLibchanMessage(lcm) +func Copy(dst Sender, src Receiver) (int, error) { + var n int + for { + msg, err := src.Receive(Ret) + if err == io.EOF { + return n, nil + } if err != nil { - ret.Send(&Message{Verb: Error, Args: []string{err.Error()}}) + return n, err } - if err = h(msg); err != nil { - ret.Send(&Message{Verb: Error, Args: []string{err.Error()}}) + if _, err := dst.Send(msg); err != nil { + return n, err } - }) - return WrapSender(lch) + n++ + } } -var RetPipe = WrapSender(libchan.RetPipe) -var Ret = libchan.Ret +type Handler func(msg *Message) error + +func (h Handler) Send(msg *Message) (Receiver, error) { + var ret Receiver + if RetPipe.Equals(msg.Ret) { + ret, msg.Ret = Pipe() + } + go func() { + if msg.Ret == nil { + msg.Ret = NopSender{} + } + h(msg) + msg.Ret.Close() + }() + return ret, nil +} + +func (h Handler) Close() error { + return fmt.Errorf("can't close a Handler") +} + +func Repeater(payload *Message) Sender { + return Handler(func(msg *Message) error { + msg.Ret.Send(payload) + return nil + }) +} var notImplementedMsg = &Message{Verb: Error, Args: []string{"not implemented"}} -var NotImplemented = WrapSender(libchan.Repeater(notImplementedMsg.LibchanMessage())) +var NotImplemented = Repeater(notImplementedMsg) -func DecodeLibchanMessage(lcm *libchan.Message) (*Message, error) { - decoded, err := data.Decode(string(lcm.Data)) - if err != nil { - return nil, err - } - verbList, exists := decoded["verb"] - if !exists { - return nil, fmt.Errorf("No 'verb' key found in message data: %s", lcm.Data) - } - if len(verbList) != 1 { - return nil, fmt.Errorf("Expected exactly one verb, got %d: %#v", len(verbList), verbList) - } - verb, err := VerbFromString(verbList[0]) - if err != nil { - return nil, err - } - args, exists := decoded["args"] - if !exists { - return nil, fmt.Errorf("No 'args' key found in message data: %s", lcm.Data) - } - return &Message{ - Verb: verb, - Args: args, - Ret: WrapSender(lcm.Ret), - Att: lcm.Fd, - }, nil +const ( + Ret int = 1 << iota + // FIXME: use an `Att` flag to auto-close attachments by default +) + +type retPipe struct { + NopSender } -func (m *Message) LibchanMessage() *libchan.Message { - encoded := data.Empty(). - Set("verb", m.Verb.String()). - Set("args", m.Args...) +var RetPipe = retPipe{} - var ret libchan.Sender - if m.Ret != nil { - ret = m.Ret.Unwrap() - } - - return &libchan.Message{ - Data: []byte(encoded), - Ret: ret, - Fd: m.Att, +func (r retPipe) Equals(val Sender) bool { + if rval, ok := val.(retPipe); ok { + return rval == r } + return false +} + +type NopSender struct{} + +func (s NopSender) Send(msg *Message) (Receiver, error) { + return NopReceiver{}, nil +} + +func (s NopSender) Close() error { + return nil +} + +type NopReceiver struct{} + +func (r NopReceiver) Receive(mode int) (*Message, error) { + return nil, io.EOF } diff --git a/message_test.go b/message_test.go index 9d24177b01..f9ea8e82e7 100644 --- a/message_test.go +++ b/message_test.go @@ -1,6 +1,9 @@ package libswarm import ( + "github.com/docker/libswarm/iowrapper" + + "io" "io/ioutil" "reflect" "testing" @@ -29,10 +32,11 @@ func TestVerbArgs(t *testing.T) { func TestReturnChannel(t *testing.T) { receiver, sender := Pipe() + replyReceiver, replySender := Pipe() go func() { - receivedMsg, err := receiver.Receive(0) + receivedMsg, err := receiver.Receive(Ret) if err != nil { t.Fatal(err) } @@ -42,8 +46,7 @@ func TestReturnChannel(t *testing.T) { receivedMsg.Ret.Send(&Message{Verb: Set}) }() - _, err := sender.Send(&Message{Verb: Get, Ret: replySender}) - if err != nil { + if _, err := sender.Send(&Message{Verb: Get, Ret: replySender}); err != nil { t.Fatal(err) } @@ -63,7 +66,7 @@ func TestRetPipe(t *testing.T) { receiver, sender := Pipe() go func() { - receivedMsg, err := receiver.Receive(0) + receivedMsg, err := receiver.Receive(Ret) if err != nil { t.Fatal(err) } @@ -71,6 +74,7 @@ func TestRetPipe(t *testing.T) { t.Fatalf("Didn't get a message") } receivedMsg.Ret.Send(&Message{Verb: Set}) + receivedMsg.Ret.Close() }() replyReceiver, err := sender.Send(&Message{Verb: Get, Ret: RetPipe}) @@ -92,28 +96,17 @@ func TestRetPipe(t *testing.T) { func TestAttachment(t *testing.T) { expectedContents := "hello world\n" - - f, err := ioutil.TempFile("/tmp", "libswarm-beam-TestAttachment-") - if err != nil { - t.Fatal(err) - } - defer f.Close() - if err = ioutil.WriteFile(f.Name(), []byte(expectedContents), 0700); err != nil { - t.Fatal(err) - } - if err = f.Sync(); err != nil { - t.Fatal(err) - } + r, w := io.Pipe() receiver, sender := Pipe() go func() { - msg, err := receiver.Receive(0) + msg, err := receiver.Receive(Ret) if err != nil { t.Fatal(err) } - msg.Ret.Send(&Message{Verb: Connect, Att: f}) + msg.Ret.Send(&Message{Verb: Connect, Att: iowrapper.Wrap(r)}) }() ret, err := sender.Send(&Message{Verb: Connect, Ret: RetPipe}) @@ -125,11 +118,18 @@ func TestAttachment(t *testing.T) { if err != nil { t.Fatal(err) } + if reply == nil { + t.Fatalf("Didn't get a reply") + } if reply.Att == nil { t.Fatalf("Didn't get an attachment back") } - contents, err := ioutil.ReadAll(reply.Att) + go func() { + w.Write([]byte(expectedContents)) + w.Close() + }() + contents, err := ioutil.ReadAll(reply.Att.(io.Reader)) if err != nil { t.Fatal(err) } diff --git a/server.go b/server.go index ee584d3380..e6ef7615c2 100644 --- a/server.go +++ b/server.go @@ -1,8 +1,6 @@ package libswarm import ( - "github.com/docker/libchan" - "fmt" ) @@ -111,7 +109,3 @@ func (s *Server) Send(msg *Message) (Receiver, error) { func (s *Server) Close() error { return fmt.Errorf("can't close") } - -func (s *Server) Unwrap() libchan.Sender { - return &senderUnwrapper{s} -} diff --git a/swarmd/Godeps/Godeps.json b/swarmd/Godeps/Godeps.json index ae13b072e6..12712adf58 100644 --- a/swarmd/Godeps/Godeps.json +++ b/swarmd/Godeps/Godeps.json @@ -37,18 +37,12 @@ "Rev": "2fb21b34171f083d46d66195caa7ec121d941ec5" }, { - "ImportPath": "github.com/docker/libchan", - "Rev": "459978d483ec79a7d8e980ebca00eb950eb64931" + "ImportPath": "github.com/dmcgowan/go/codec", + "Rev": "5d26f5fd8a4e87c22037b52bf41aad8d69e5f436" }, { - "ImportPath": "github.com/docker/libcontainer/cgroups", - "Comment": "v1.1.0-56-gfb3d909", - "Rev": "fb3d909c288ab23f005ddbbdd8bc81c36a1cd701" - }, - { - "ImportPath": "github.com/docker/libcontainer/devices", - "Comment": "v1.1.0-56-gfb3d909", - "Rev": "fb3d909c288ab23f005ddbbdd8bc81c36a1cd701" + "ImportPath": "github.com/dmcgowan/libchan", + "Rev": "a7122feb42d3e5e706373c034128e3f80252c350" }, { "ImportPath": "github.com/dotcloud/docker/api", @@ -75,6 +69,16 @@ "Comment": "v0.11.1-466-g77ae37a", "Rev": "77ae37a3836997d215ed3f1750533a9815205695" }, + { + "ImportPath": "github.com/dotcloud/docker/pkg/beam", + "Comment": "v0.11.1-466-g77ae37a", + "Rev": "77ae37a3836997d215ed3f1750533a9815205695" + }, + { + "ImportPath": "github.com/dotcloud/docker/pkg/libcontainer/cgroups", + "Comment": "v0.11.1-466-g77ae37a", + "Rev": "77ae37a3836997d215ed3f1750533a9815205695" + }, { "ImportPath": "github.com/dotcloud/docker/pkg/listenbuffer", "Comment": "v0.11.1-466-g77ae37a", diff --git a/utils/nop.go b/utils/nop.go deleted file mode 100644 index 6efe9d2a72..0000000000 --- a/utils/nop.go +++ /dev/null @@ -1,32 +0,0 @@ -package utils - -import ( - "github.com/docker/libchan" - "github.com/docker/libswarm" - - "io" -) - -type NopSender struct{} - -func (s NopSender) Send(msg *libswarm.Message) (libswarm.Receiver, error) { - return NopReceiver{}, nil -} - -func (s NopSender) Close() error { - return nil -} - -func (s NopSender) Unwrap() libchan.Sender { - return libchan.NopSender{} -} - -type NopReceiver struct{} - -func (r NopReceiver) Receive(mode int) (*libswarm.Message, error) { - return nil, io.EOF -} - -func (r NopReceiver) Unwrap() libchan.Receiver { - return libchan.NopReceiver{} -}