From 3e4e74c5a1c189fef7b26d03673329f39e5c7fb7 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Fri, 15 May 2015 13:34:33 -0700 Subject: [PATCH] store: Close channels in case of errors in Watch/WatchTree. Signed-off-by: Andrea Luzzardi --- pkg/store/consul.go | 8 ++++++-- pkg/store/etcd.go | 30 ++++++++++++++++++++++-------- pkg/store/zookeeper.go | 4 ++++ 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 2b40236c4c..32cddf0a93 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -143,6 +143,8 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro watchCh := make(chan *KVPair) go func() { + defer close(watchCh) + opts := &api.QueryOptions{} for { // Check if we should quit @@ -153,7 +155,7 @@ func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, erro } pair, meta, err := kv.Get(key, opts) if err != nil { - log.WithField("backend", "consul").Error(err) + log.Errorf("consul: %v", err) return } opts.WaitIndex = meta.LastIndex @@ -177,6 +179,8 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP watchCh := make(chan []*KVPair) go func() { + defer close(watchCh) + opts := &api.QueryOptions{} for { // Check if we should quit @@ -188,7 +192,7 @@ func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVP pairs, meta, err := kv.List(prefix, opts) if err != nil { - log.WithField("name", "consul").Error(err) + log.Errorf("consul: %v", err) return } kv := []*KVPair{} diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 10bdf6553b..4e09e69512 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -138,7 +138,12 @@ func (s *Etcd) Exists(key string) (bool, error) { // Providing a non-nil stopCh can be used to stop watching. func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) { key = normalize(key) - watchCh := make(chan *KVPair) + + // Get the current value + current, err := s.Get(key) + if err != nil { + return nil, err + } // Start an etcd watch. // Note: etcd will send the current value through the channel. @@ -148,11 +153,13 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) // Adapter goroutine: The goal here is to convert wathever format etcd is // using into our interface. + watchCh := make(chan *KVPair) go func() { + defer close(watchCh) + // Push the current value through the channel. - if v, err := s.Get(key); err != nil { - watchCh <- v - } + watchCh <- current + for { select { case result := <-etcdWatchCh: @@ -176,7 +183,12 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) // Providing a non-nil stopCh can be used to stop watching. func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error) { prefix = normalize(prefix) - watchCh := make(chan []*KVPair) + + // Get the current value + current, err := s.List(prefix) + if err != nil { + return nil, err + } // Start an etcd watch. etcdWatchCh := make(chan *etcd.Response) @@ -185,11 +197,13 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai // Adapter goroutine: The goal here is to convert wathever format etcd is // using into our interface. + watchCh := make(chan []*KVPair) go func() { + defer close(watchCh) + // Push the current value through the channel. - if list, err := s.List(prefix); err != nil { - watchCh <- list - } + watchCh <- current + for { select { case <-etcdWatchCh: diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index 0a0592a44e..fa324036ae 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -114,6 +114,8 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, e // Catch zk notifications and fire changes into the channel. watchCh := make(chan *KVPair) go func() { + defer close(watchCh) + // GetW returns the current value before setting the watch. watchCh <- &KVPair{key, resp, uint64(meta.Mzxid)} for { @@ -148,6 +150,8 @@ func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []* // Catch zk notifications and fire changes into the channel. watchCh := make(chan []*KVPair) go func() { + defer close(watchCh) + // GetW returns the current value before setting the watch. kv := []*KVPair{} for _, item := range entries {