Use new libchan interface

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
This commit is contained in:
Aanand Prasad
2014-07-11 14:42:07 -07:00
parent 04b2a96ff8
commit cde5bab4fa
7 changed files with 209 additions and 179 deletions

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"strings"
)
@@ -91,7 +90,7 @@ func (c *Client) Error(msg string, args ...interface{}) error {
return err
}
func (c *Client) Connect() (net.Conn, error) {
func (c *Client) Connect() (io.ReadWriteCloser, error) {
ret, err := c.Send(&Message{Verb: Connect, Ret: RetPipe})
if err != nil {
return nil, err
@@ -105,13 +104,7 @@ func (c *Client) Connect() (net.Conn, error) {
if msg.Att == nil {
return nil, fmt.Errorf("missing attachment")
}
conn, err := net.FileConn(msg.Att)
if err != nil {
msg.Att.Close()
return nil, err
}
msg.Att.Close()
return conn, nil
return msg.Att, nil
}
if msg.Verb == Error {
return nil, fmt.Errorf(strings.Join(msg.Args[:1], ""))

37
iowrapper/iowrapper.go Normal file
View File

@@ -0,0 +1,37 @@
package iowrapper
import (
"io"
)
func Wrap(obj interface{}) io.ReadWriteCloser {
return iowrapper{obj}
}
type iowrapper struct {
obj interface{}
}
func (w iowrapper) Read(p []byte) (int, error) {
if reader, ok := w.obj.(io.Reader); ok {
return reader.Read(p)
} else {
return 0, io.ErrClosedPipe
}
}
func (w iowrapper) Write(p []byte) (int, error) {
if writer, ok := w.obj.(io.Writer); ok {
return writer.Write(p)
} else {
return 0, io.ErrClosedPipe
}
}
func (w iowrapper) Close() error {
if closer, ok := w.obj.(io.Closer); ok {
return closer.Close()
} else {
return nil
}
}

View File

@@ -1,29 +1,33 @@
package libswarm
import (
"github.com/docker/libchan"
"github.com/docker/libchan/data"
"github.com/dmcgowan/libchan"
"fmt"
"os"
"io"
)
type Message struct {
Verb
Args []string
Ret Sender
Att *os.File
Att io.ReadWriteCloser
}
type Sender interface {
Send(msg *Message) (Receiver, error)
Close() error
Unwrap() libchan.Sender
}
type Receiver interface {
Receive(mode int) (*Message, error)
Unwrap() libchan.Receiver
}
type internalMessage struct {
Verb
Args []string
Ret libchan.Sender
Att io.ReadWriteCloser
}
type senderWrapper struct {
@@ -35,15 +39,47 @@ func WrapSender(s libchan.Sender) Sender {
}
func (s *senderWrapper) Send(msg *Message) (Receiver, error) {
recv, err := s.Sender.Send(msg.LibchanMessage())
if err != nil {
return nil, err
}
return WrapReceiver(recv), err
}
var rcvr Receiver = NopReceiver{}
func (s *senderWrapper) Unwrap() libchan.Sender {
return s.Sender
imsg := &internalMessage{
Verb: msg.Verb,
Args: msg.Args,
}
if msg.Ret != nil {
thisEnd, otherEnd, err := s.Sender.CreateNestedReceiver()
if err != nil {
return nil, err
}
imsg.Ret = otherEnd
if RetPipe.Equals(msg.Ret) {
rcvr = &receiverWrapper{thisEnd}
} else {
go Copy(msg.Ret, &receiverWrapper{thisEnd})
}
}
if msg.Att != nil {
byteStream, err := s.Sender.CreateByteStream()
if err != nil {
return nil, err
}
imsg.Att = byteStream
go func() {
io.Copy(byteStream, msg.Att)
byteStream.Close()
}()
go func() {
io.Copy(msg.Att, byteStream)
msg.Att.Close()
}()
}
return rcvr, s.Sender.Send(imsg)
}
type receiverWrapper struct {
@@ -55,115 +91,113 @@ func WrapReceiver(r libchan.Receiver) Receiver {
}
func (r *receiverWrapper) Receive(mode int) (*Message, error) {
lcm, err := r.Receiver.Receive(mode)
if err != nil {
imsg := &internalMessage{}
if err := r.Receiver.Receive(imsg); err != nil {
return nil, err
}
return DecodeLibchanMessage(lcm)
}
func (r *receiverWrapper) Unwrap() libchan.Receiver {
return r.Receiver
}
type senderUnwrapper struct {
Sender
}
func (su *senderUnwrapper) Send(lcm *libchan.Message) (libchan.Receiver, error) {
msg, err := DecodeLibchanMessage(lcm)
if err != nil {
return nil, err
var ret Sender
if imsg.Ret == nil {
ret = NopSender{}
} else {
ret = &senderWrapper{imsg.Ret}
}
recv, err := su.Sender.Send(msg)
if err != nil {
return nil, err
if mode&Ret == 0 {
if err := ret.Close(); err != nil {
return nil, err
}
}
return &receiverUnwrapper{recv}, nil
}
type receiverUnwrapper struct {
Receiver
}
func (ru *receiverUnwrapper) Receive(mode int) (*libchan.Message, error) {
msg, err := ru.Receiver.Receive(mode)
if err != nil {
return nil, err
msg := &Message{
Verb: imsg.Verb,
Args: imsg.Args,
Ret: ret,
Att: imsg.Att,
}
return msg.LibchanMessage(), nil
return msg, nil
}
func Pipe() (Receiver, Sender) {
func Pipe() (*receiverWrapper, *senderWrapper) {
r, s := libchan.Pipe()
return WrapReceiver(r), WrapSender(s)
return &receiverWrapper{r}, &senderWrapper{s}
}
func Copy(s Sender, r Receiver) (int, error) {
return libchan.Copy(s.Unwrap(), r.Unwrap())
}
func Handler(h func(msg *Message) error) Sender {
lch := libchan.Handler(func(lcm *libchan.Message) {
ret := WrapSender(lcm.Ret)
msg, err := DecodeLibchanMessage(lcm)
func Copy(dst Sender, src Receiver) (int, error) {
var n int
for {
msg, err := src.Receive(Ret)
if err == io.EOF {
return n, nil
}
if err != nil {
ret.Send(&Message{Verb: Error, Args: []string{err.Error()}})
return n, err
}
if err = h(msg); err != nil {
ret.Send(&Message{Verb: Error, Args: []string{err.Error()}})
if _, err := dst.Send(msg); err != nil {
return n, err
}
})
return WrapSender(lch)
n++
}
}
var RetPipe = WrapSender(libchan.RetPipe)
var Ret = libchan.Ret
type Handler func(msg *Message) error
func (h Handler) Send(msg *Message) (Receiver, error) {
var ret Receiver
if RetPipe.Equals(msg.Ret) {
ret, msg.Ret = Pipe()
}
go func() {
if msg.Ret == nil {
msg.Ret = NopSender{}
}
h(msg)
msg.Ret.Close()
}()
return ret, nil
}
func (h Handler) Close() error {
return fmt.Errorf("can't close a Handler")
}
func Repeater(payload *Message) Sender {
return Handler(func(msg *Message) error {
msg.Ret.Send(payload)
return nil
})
}
var notImplementedMsg = &Message{Verb: Error, Args: []string{"not implemented"}}
var NotImplemented = WrapSender(libchan.Repeater(notImplementedMsg.LibchanMessage()))
var NotImplemented = Repeater(notImplementedMsg)
func DecodeLibchanMessage(lcm *libchan.Message) (*Message, error) {
decoded, err := data.Decode(string(lcm.Data))
if err != nil {
return nil, err
}
verbList, exists := decoded["verb"]
if !exists {
return nil, fmt.Errorf("No 'verb' key found in message data: %s", lcm.Data)
}
if len(verbList) != 1 {
return nil, fmt.Errorf("Expected exactly one verb, got %d: %#v", len(verbList), verbList)
}
verb, err := VerbFromString(verbList[0])
if err != nil {
return nil, err
}
args, exists := decoded["args"]
if !exists {
return nil, fmt.Errorf("No 'args' key found in message data: %s", lcm.Data)
}
return &Message{
Verb: verb,
Args: args,
Ret: WrapSender(lcm.Ret),
Att: lcm.Fd,
}, nil
const (
Ret int = 1 << iota
// FIXME: use an `Att` flag to auto-close attachments by default
)
type retPipe struct {
NopSender
}
func (m *Message) LibchanMessage() *libchan.Message {
encoded := data.Empty().
Set("verb", m.Verb.String()).
Set("args", m.Args...)
var RetPipe = retPipe{}
var ret libchan.Sender
if m.Ret != nil {
ret = m.Ret.Unwrap()
}
return &libchan.Message{
Data: []byte(encoded),
Ret: ret,
Fd: m.Att,
func (r retPipe) Equals(val Sender) bool {
if rval, ok := val.(retPipe); ok {
return rval == r
}
return false
}
type NopSender struct{}
func (s NopSender) Send(msg *Message) (Receiver, error) {
return NopReceiver{}, nil
}
func (s NopSender) Close() error {
return nil
}
type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*Message, error) {
return nil, io.EOF
}

View File

@@ -1,6 +1,9 @@
package libswarm
import (
"github.com/docker/libswarm/iowrapper"
"io"
"io/ioutil"
"reflect"
"testing"
@@ -29,10 +32,11 @@ func TestVerbArgs(t *testing.T) {
func TestReturnChannel(t *testing.T) {
receiver, sender := Pipe()
replyReceiver, replySender := Pipe()
go func() {
receivedMsg, err := receiver.Receive(0)
receivedMsg, err := receiver.Receive(Ret)
if err != nil {
t.Fatal(err)
}
@@ -42,8 +46,7 @@ func TestReturnChannel(t *testing.T) {
receivedMsg.Ret.Send(&Message{Verb: Set})
}()
_, err := sender.Send(&Message{Verb: Get, Ret: replySender})
if err != nil {
if _, err := sender.Send(&Message{Verb: Get, Ret: replySender}); err != nil {
t.Fatal(err)
}
@@ -63,7 +66,7 @@ func TestRetPipe(t *testing.T) {
receiver, sender := Pipe()
go func() {
receivedMsg, err := receiver.Receive(0)
receivedMsg, err := receiver.Receive(Ret)
if err != nil {
t.Fatal(err)
}
@@ -71,6 +74,7 @@ func TestRetPipe(t *testing.T) {
t.Fatalf("Didn't get a message")
}
receivedMsg.Ret.Send(&Message{Verb: Set})
receivedMsg.Ret.Close()
}()
replyReceiver, err := sender.Send(&Message{Verb: Get, Ret: RetPipe})
@@ -92,28 +96,17 @@ func TestRetPipe(t *testing.T) {
func TestAttachment(t *testing.T) {
expectedContents := "hello world\n"
f, err := ioutil.TempFile("/tmp", "libswarm-beam-TestAttachment-")
if err != nil {
t.Fatal(err)
}
defer f.Close()
if err = ioutil.WriteFile(f.Name(), []byte(expectedContents), 0700); err != nil {
t.Fatal(err)
}
if err = f.Sync(); err != nil {
t.Fatal(err)
}
r, w := io.Pipe()
receiver, sender := Pipe()
go func() {
msg, err := receiver.Receive(0)
msg, err := receiver.Receive(Ret)
if err != nil {
t.Fatal(err)
}
msg.Ret.Send(&Message{Verb: Connect, Att: f})
msg.Ret.Send(&Message{Verb: Connect, Att: iowrapper.Wrap(r)})
}()
ret, err := sender.Send(&Message{Verb: Connect, Ret: RetPipe})
@@ -125,11 +118,18 @@ func TestAttachment(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if reply == nil {
t.Fatalf("Didn't get a reply")
}
if reply.Att == nil {
t.Fatalf("Didn't get an attachment back")
}
contents, err := ioutil.ReadAll(reply.Att)
go func() {
w.Write([]byte(expectedContents))
w.Close()
}()
contents, err := ioutil.ReadAll(reply.Att.(io.Reader))
if err != nil {
t.Fatal(err)
}

View File

@@ -1,8 +1,6 @@
package libswarm
import (
"github.com/docker/libchan"
"fmt"
)
@@ -111,7 +109,3 @@ func (s *Server) Send(msg *Message) (Receiver, error) {
func (s *Server) Close() error {
return fmt.Errorf("can't close")
}
func (s *Server) Unwrap() libchan.Sender {
return &senderUnwrapper{s}
}

View File

@@ -37,18 +37,12 @@
"Rev": "2fb21b34171f083d46d66195caa7ec121d941ec5"
},
{
"ImportPath": "github.com/docker/libchan",
"Rev": "459978d483ec79a7d8e980ebca00eb950eb64931"
"ImportPath": "github.com/dmcgowan/go/codec",
"Rev": "5d26f5fd8a4e87c22037b52bf41aad8d69e5f436"
},
{
"ImportPath": "github.com/docker/libcontainer/cgroups",
"Comment": "v1.1.0-56-gfb3d909",
"Rev": "fb3d909c288ab23f005ddbbdd8bc81c36a1cd701"
},
{
"ImportPath": "github.com/docker/libcontainer/devices",
"Comment": "v1.1.0-56-gfb3d909",
"Rev": "fb3d909c288ab23f005ddbbdd8bc81c36a1cd701"
"ImportPath": "github.com/dmcgowan/libchan",
"Rev": "a7122feb42d3e5e706373c034128e3f80252c350"
},
{
"ImportPath": "github.com/dotcloud/docker/api",
@@ -75,6 +69,16 @@
"Comment": "v0.11.1-466-g77ae37a",
"Rev": "77ae37a3836997d215ed3f1750533a9815205695"
},
{
"ImportPath": "github.com/dotcloud/docker/pkg/beam",
"Comment": "v0.11.1-466-g77ae37a",
"Rev": "77ae37a3836997d215ed3f1750533a9815205695"
},
{
"ImportPath": "github.com/dotcloud/docker/pkg/libcontainer/cgroups",
"Comment": "v0.11.1-466-g77ae37a",
"Rev": "77ae37a3836997d215ed3f1750533a9815205695"
},
{
"ImportPath": "github.com/dotcloud/docker/pkg/listenbuffer",
"Comment": "v0.11.1-466-g77ae37a",

View File

@@ -1,32 +0,0 @@
package utils
import (
"github.com/docker/libchan"
"github.com/docker/libswarm"
"io"
)
type NopSender struct{}
func (s NopSender) Send(msg *libswarm.Message) (libswarm.Receiver, error) {
return NopReceiver{}, nil
}
func (s NopSender) Close() error {
return nil
}
func (s NopSender) Unwrap() libchan.Sender {
return libchan.NopSender{}
}
type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*libswarm.Message, error) {
return nil, io.EOF
}
func (r NopReceiver) Unwrap() libchan.Receiver {
return libchan.NopReceiver{}
}