This commit is contained in:
Hongchao Deng 2016-10-06 15:12:29 -07:00
Родитель e08dc8e685
Коммит 0219a41f26
1 изменённых файлов: 43 добавлений и 25 удалений

Просмотреть файл

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
@ -30,9 +29,14 @@ var (
}
)
type rawEvent struct {
Type string
Object json.RawMessage
}
type Event struct {
Type string
Object spec.EtcdCluster
Object *spec.EtcdCluster
}
type Controller struct {
@ -181,51 +185,65 @@ func (c *Controller) createTPR() error {
}
func monitorEtcdCluster(host, ns string, httpClient *http.Client, watchVersion string) (<-chan *Event, <-chan error) {
events := make(chan *Event)
eventCh := make(chan *Event)
// On unexpected error case, controller should exit
errc := make(chan error, 1)
errCh := make(chan error, 1)
go func() {
for {
resp, err := k8sutil.WatchETCDCluster(host, ns, httpClient, watchVersion)
if err != nil {
errc <- err
errCh <- err
return
}
if resp.StatusCode != 200 {
resp.Body.Close()
errc <- errors.New("Invalid status code: " + resp.Status)
errCh <- errors.New("Invalid status code: " + resp.Status)
return
}
log.Printf("watching at %v", watchVersion)
decoder := json.NewDecoder(resp.Body)
for {
decoder := json.NewDecoder(resp.Body)
ev := new(Event)
err = decoder.Decode(ev)
ev, err := pollEvent(decoder)
if err != nil {
if err == io.EOF {
if err == io.EOF { // apiserver will close stream periodically
break
}
b, readErr := ioutil.ReadAll(decoder.Buffered())
if readErr != nil {
log.Errorf("fail to read from json decoder buffer: %v", readErr)
}
log.Errorf("failed to decode event (%s) from apiserver: %v", b, err)
errc <- err
errCh <- err
return
}
if ev.Type == "ERROR" {
// TODO: We couldn't decode error status from watch stream on apiserver.
// Working around by restart and go through recover path.
// We strive to fix it in k8s upstream.
log.Fatal("unkown watch error from apiserver")
}
log.Printf("etcd cluster event: %v %#v", ev.Type, ev.Object)
log.Printf("etcd cluster event: %v %v", ev.Type, ev.Object.Spec)
watchVersion = ev.Object.ObjectMeta.ResourceVersion
events <- ev
eventCh <- ev
}
resp.Body.Close()
}
}()
return events, errc
return eventCh, errCh
}
func pollEvent(decoder *json.Decoder) (*Event, error) {
re := &rawEvent{}
err := decoder.Decode(re)
if err != nil {
if err == io.EOF {
return nil, err
}
log.Errorf("fail to decode raw event from apiserver: %v", err)
return nil, err
}
if re.Type == "ERROR" {
log.Fatalf("watch error from apiserver. (TODO: separate transient and fatal error)\n"+
"Error message: %s", re.Object)
}
ev := &Event{
Type: re.Type,
Object: &spec.EtcdCluster{},
}
err = json.Unmarshal(re.Object, ev.Object)
if err != nil {
log.Errorf("fail to unmarshal EtcdCluster object from data (%s): %v", re.Object, err)
return nil, err
}
return ev, nil
}