diff --git a/Makefile b/Makefile index e6f1e79ec4..01c0785206 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,7 @@ unit_test: cd go/bytes2; go test cd go/cache; go test cd go/cgzip; go test + cd go/cmd/zkns2pdns; go test cd go/hack; go test # cd go/logfile; go test if [ -e "/usr/bin/memcached" ]; then \ diff --git a/go/cmd/zkns2pdns/pdns.go b/go/cmd/zkns2pdns/pdns.go index a3c007f67f..b66f490852 100644 --- a/go/cmd/zkns2pdns/pdns.go +++ b/go/cmd/zkns2pdns/pdns.go @@ -17,12 +17,12 @@ import ( "flag" "fmt" "io" - "net" "net/http" "os" "path" "strings" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/relog" "code.google.com/p/vitess/go/zk" "code.google.com/p/vitess/go/zk/zkns" @@ -333,19 +333,6 @@ func (pd *pdns) Serve(r io.Reader, w io.Writer) { } } -func fqdn() string { - hostname, err := os.Hostname() - if err != nil { - panic(err) - } - - cname, err := net.LookupCNAME(hostname) - if err != nil { - panic(err) - } - return strings.TrimRight(cname, ".") -} - func main() { zknsDomain := flag.String("zkns-domain", "", "The naming hierarchy portion to serve") zknsRoot := flag.String("zkns-root", "", "The root path from which to resolve") @@ -362,7 +349,8 @@ func main() { } zconn := zk.NewMetaConn(false) - zr1 := newZknsResolver(zconn, fqdn(), *zknsDomain, *zknsRoot) + fqdn := netutil.FullyQualifiedHostnameOrPanic() + zr1 := newZknsResolver(zconn, fqdn, *zknsDomain, *zknsRoot) pd := &pdns{zr1} pd.Serve(os.Stdin, os.Stdout) os.Stdout.Close() diff --git a/go/cmd/zkns2pdns/pdns_test.go b/go/cmd/zkns2pdns/pdns_test.go index 939076d5f9..6af817983b 100644 --- a/go/cmd/zkns2pdns/pdns_test.go +++ b/go/cmd/zkns2pdns/pdns_test.go @@ -6,6 +6,7 @@ import ( "os" "testing" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/zk" "launchpad.net/gozk/zookeeper" ) @@ -41,6 +42,8 @@ const ( ]}` ) +var fqdn = netutil.FullyQualifiedHostnameOrPanic() + var zconn = &TestZkConn{map[string]string{ "/zk/test/zkns/srv": fakeSRV, "/zk/test/zkns/cname": fakeCNAME, @@ -56,10 +59,10 @@ var queries = []string{ } var results = []string{ - "OK\tzkns2pdns\nDATA\t_http.srv.zkns.test.zk\tIN\tSRV\t1\t1\t0\t0 8080 test1\nDATA\t_http.srv.zkns.test.zk\tIN\tSRV\t1\t1\t0\t0 8080 test2\nDATA\t_http.srv.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn() + " hostmaster@" + fqdn() + " 0 1800 600 3600 300\nEND\n", - "OK\tzkns2pdns\nDATA\ta.zkns.test.zk\tIN\tA\t1\t1\t0.0.0.1\nDATA\ta.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn() + " hostmaster@" + fqdn() + " 0 1800 600 3600 300\nDATA\ta.zkns.test.zk\tIN\tCNAME\t1\t1\ttest1\nEND\n", - "OK\tzkns2pdns\nDATA\tcname.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn() + " hostmaster@" + fqdn() + " 0 1800 600 3600 300\nDATA\tcname.zkns.test.zk\tIN\tCNAME\t1\t1\ttest1\nEND\n", - "OK\tzkns2pdns\nDATA\tempty.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn() + " hostmaster@" + fqdn() + " 0 1800 600 3600 300\nEND\n", + "OK\tzkns2pdns\nDATA\t_http.srv.zkns.test.zk\tIN\tSRV\t1\t1\t0\t0 8080 test1\nDATA\t_http.srv.zkns.test.zk\tIN\tSRV\t1\t1\t0\t0 8080 test2\nDATA\t_http.srv.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn + " hostmaster@" + fqdn + " 0 1800 600 3600 300\nEND\n", + "OK\tzkns2pdns\nDATA\ta.zkns.test.zk\tIN\tA\t1\t1\t0.0.0.1\nDATA\ta.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn + " hostmaster@" + fqdn + " 0 1800 600 3600 300\nDATA\ta.zkns.test.zk\tIN\tCNAME\t1\t1\ttest1\nEND\n", + "OK\tzkns2pdns\nDATA\tcname.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn + " hostmaster@" + fqdn + " 0 1800 600 3600 300\nDATA\tcname.zkns.test.zk\tIN\tCNAME\t1\t1\ttest1\nEND\n", + "OK\tzkns2pdns\nDATA\tempty.zkns.test.zk\tIN\tSOA\t1\t1\t" + fqdn + " hostmaster@" + fqdn + " 0 1800 600 3600 300\nEND\n", "OK\tzkns2pdns\nFAIL\n", } @@ -76,7 +79,7 @@ func testQuery(t *testing.T, query, result string) { } defer outpr.Close() - zr1 := newZknsResolver(zconn, fqdn(), ".zkns.test.zk", "/zk/test/zkns") + zr1 := newZknsResolver(zconn, fqdn, ".zkns.test.zk", "/zk/test/zkns") pd := &pdns{zr1} go func() { pd.Serve(inpr, outpw) @@ -166,7 +169,7 @@ func (conn *TestZkConn) Close() error { panic("Should not be used") } -func (conn *TestZkConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc ChangeFunc) error { +func (conn *TestZkConn) RetryChange(path string, flags int, acl []zookeeper.ACL, changeFunc zk.ChangeFunc) error { panic("Should not be used") } diff --git a/go/netutil/netutil.go b/go/netutil/netutil.go new file mode 100644 index 0000000000..f9d918f6bb --- /dev/null +++ b/go/netutil/netutil.go @@ -0,0 +1,140 @@ +// Copyright 2012, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +/* +This packages contains a few utility functions for network related functions. +*/ +package netutil + +import ( + "fmt" + "math/rand" + "net" + "os" + "sort" + "strconv" + "strings" +) + +// byPriorityWeight sorts records by ascending priority and weight. +type byPriorityWeight []*net.SRV + +func (s byPriorityWeight) Len() int { return len(s) } + +func (s byPriorityWeight) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s byPriorityWeight) Less(i, j int) bool { + return s[i].Priority < s[j].Priority || + (s[i].Priority == s[j].Priority && s[i].Weight < s[j].Weight) +} + +// shuffleByWeight shuffles SRV records by weight using the algorithm +// described in RFC 2782. +func (addrs byPriorityWeight) shuffleByWeight() { + sum := 0 + for _, addr := range addrs { + sum += int(addr.Weight) + } + for sum > 0 && len(addrs) > 1 { + s := 0 + n := rand.Intn(sum + 1) + for i := range addrs { + s += int(addrs[i].Weight) + if s >= n { + if i > 0 { + t := addrs[i] + copy(addrs[1:i+1], addrs[0:i]) + addrs[0] = t + } + break + } + } + sum -= int(addrs[0].Weight) + addrs = addrs[1:] + } +} + +func (addrs byPriorityWeight) sortRfc2782() { + sort.Sort(addrs) + i := 0 + for j := 1; j < len(addrs); j++ { + if addrs[i].Priority != addrs[j].Priority { + addrs[i:j].shuffleByWeight() + i = j + } + } + addrs[i:].shuffleByWeight() +} + +// SortRfc2782 reorders SRV records as specified in RFC 2782. +func SortRfc2782(srvs []*net.SRV) { + byPriorityWeight(srvs).sortRfc2782() +} + +// SplitHostPort is an extension to net.SplitHostPort that also parses the +// integer port +func SplitHostPort(addr string) (string, int, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return "", 0, err + } + p, err := strconv.ParseInt(port, 10, 16) + if err != nil { + return "", 0, err + } + return host, int(p), nil +} + +// FullyQualifiedHostname returns the full hostname with domain +func FullyQualifiedHostname() (string, error) { + hostname, err := os.Hostname() + if err != nil { + return "", err + } + + cname, err := net.LookupCNAME(hostname) + if err != nil { + return "", err + } + return strings.TrimRight(cname, "."), nil +} + +// FullyQualifiedHostnameOrPanic is the same as FullyQualifiedHostname +// but panics in case of error +func FullyQualifiedHostnameOrPanic() string { + hostname, err := FullyQualifiedHostname() + if err != nil { + panic(err) + } + return hostname +} + +// ResolveAddr can resolve an address where the host has been left +// blank, like ":3306" +func ResolveAddr(addr string) (string, error) { + host, port, err := SplitHostPort(addr) + if err != nil { + return "", err + } + if host == "" { + host, err = FullyQualifiedHostname() + if err != nil { + return "", err + } + } + return fmt.Sprintf("%v:%v", host, port), nil +} + +// ResolveIpAddr resolves the address:port part into an IP address:port pair +func ResolveIpAddr(addr string) (string, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return "", err + } + ipAddrs, err := net.LookupHost(host) + if err != nil { + return "", err + } + return net.JoinHostPort(ipAddrs[0], port), nil +} diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 7dd6b81829..794e07b964 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -15,7 +15,6 @@ import ( "errors" "fmt" "io/ioutil" - "net" "os" "os/exec" "path" @@ -24,6 +23,7 @@ import ( "time" "code.google.com/p/vitess/go/mysql" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/relog" vtenv "code.google.com/p/vitess/go/vt/env" "code.google.com/p/vitess/go/vt/hook" @@ -378,32 +378,16 @@ func deleteTopDir(dir string) (removalErr error) { } func (mysqld *Mysqld) Addr() string { - // build the hostname - hostname, err := os.Hostname() - if err != nil { - panic(err) // should never happen - } - hostname, err = net.LookupCNAME(hostname) - if err != nil { - panic(err) // should never happen - } - hostname = strings.TrimRight(hostname, ".") - - // and add the port + hostname := netutil.FullyQualifiedHostnameOrPanic() return fmt.Sprintf("%v:%v", hostname, mysqld.config.MysqlPort) } func (mysqld *Mysqld) IpAddr() string { - addr := mysqld.Addr() - host, port, err := net.SplitHostPort(addr) + addr, err := netutil.ResolveIpAddr(mysqld.Addr()) if err != nil { panic(err) // should never happen } - ipAddrs, err := net.LookupHost(host) - if err != nil { - panic(err) // should never happen - } - return net.JoinHostPort(ipAddrs[0], port) + return addr } // executes some SQL commands using a mysql command line interface process diff --git a/go/vt/naming/naming.go b/go/vt/naming/naming.go index 63ada2b1dd..a948c8a61f 100644 --- a/go/vt/naming/naming.go +++ b/go/vt/naming/naming.go @@ -23,8 +23,8 @@ import ( "fmt" "net" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/relog" - "code.google.com/p/vitess/go/zk/zkns" ) type VtnsAddr struct { @@ -102,7 +102,7 @@ func SrvEntries(addrs *VtnsAddrs, namedPort string) (srvs []*net.SRV, err error) } srvs = append(srvs, &net.SRV{Target: host, Port: uint16(port)}) } - zkns.Sort(srvs) + netutil.SortRfc2782(srvs) if srvErr != nil && len(srvs) == 0 { return nil, fmt.Errorf("SrvEntries failed: no valid endpoints found") } diff --git a/go/vt/tabletmanager/agent.go b/go/vt/tabletmanager/agent.go index ba436f6e94..4d1fde6e6a 100644 --- a/go/vt/tabletmanager/agent.go +++ b/go/vt/tabletmanager/agent.go @@ -16,15 +16,14 @@ package tabletmanager import ( "flag" "fmt" - "net" "os" "os/exec" "path" - "strconv" "strings" "sync" "code.google.com/p/vitess/go/jscfg" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/relog" "code.google.com/p/vitess/go/vt/env" "code.google.com/p/vitess/go/vt/naming" @@ -219,73 +218,21 @@ func (agent *ActionAgent) verifyServingAddrs() error { return agent.ts.UpdateTabletEndpoint(agent.Tablet().Tablet.Cell, agent.Tablet().Keyspace, agent.Tablet().Shard, agent.Tablet().Type, addr) } -func splitHostPort(addr string) (string, int, error) { - host, port, err := net.SplitHostPort(addr) - if err != nil { - return "", 0, err - } - p, err := strconv.ParseInt(port, 10, 16) - if err != nil { - return "", 0, err - } - return host, int(p), nil -} - -func fqdn() (string, error) { - hostname, err := os.Hostname() - if err != nil { - return "", err - } - - cname, err := net.LookupCNAME(hostname) - if err != nil { - return "", err - } - return strings.TrimRight(cname, "."), nil -} - -// Resolve an address where the host has been left blank, like ":3306" -func resolveAddr(addr string) (string, error) { - host, port, err := splitHostPort(addr) - if err != nil { - return "", err - } - if host == "" { - host, err = fqdn() - if err != nil { - return "", err - } - } - return fmt.Sprintf("%v:%v", host, port), nil -} - -func resolveIpAddr(addr string) (string, error) { - host, port, err := net.SplitHostPort(addr) - if err != nil { - return "", err - } - ipAddrs, err := net.LookupHost(host) - if err != nil { - return "", err - } - return net.JoinHostPort(ipAddrs[0], port), nil -} - func VtnsAddrForTablet(tablet *Tablet) (*naming.VtnsAddr, error) { - host, port, err := splitHostPort(tablet.Addr) + host, port, err := netutil.SplitHostPort(tablet.Addr) if err != nil { return nil, err } entry := naming.NewAddr(tablet.Uid, host, 0) entry.NamedPortMap["_vtocc"] = port if tablet.SecureAddr != "" { - host, port, err = splitHostPort(tablet.SecureAddr) + host, port, err = netutil.SplitHostPort(tablet.SecureAddr) if err != nil { return nil, err } entry.NamedPortMap["_vts"] = port } - host, port, err = splitHostPort(tablet.MysqlAddr) + host, port, err = netutil.SplitHostPort(tablet.MysqlAddr) if err != nil { return nil, err } @@ -304,21 +251,21 @@ func (agent *ActionAgent) Start(bindAddr, secureAddr, mysqlAddr string) error { return err } - bindAddr, err = resolveAddr(bindAddr) + bindAddr, err = netutil.ResolveAddr(bindAddr) if err != nil { return err } if secureAddr != "" { - secureAddr, err = resolveAddr(secureAddr) + secureAddr, err = netutil.ResolveAddr(secureAddr) if err != nil { return err } } - mysqlAddr, err = resolveAddr(mysqlAddr) + mysqlAddr, err = netutil.ResolveAddr(mysqlAddr) if err != nil { return err } - mysqlIpAddr, err := resolveIpAddr(mysqlAddr) + mysqlIpAddr, err := netutil.ResolveIpAddr(mysqlAddr) if err != nil { return err } diff --git a/go/vt/tabletmanager/tablet.go b/go/vt/tabletmanager/tablet.go index ac2249e11f..02899de14e 100644 --- a/go/vt/tabletmanager/tablet.go +++ b/go/vt/tabletmanager/tablet.go @@ -10,6 +10,7 @@ import ( "path" "code.google.com/p/vitess/go/jscfg" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/relog" "code.google.com/p/vitess/go/vt/key" "code.google.com/p/vitess/go/vt/naming" @@ -86,7 +87,7 @@ func (tablet *Tablet) Json() string { } func (tablet *Tablet) Hostname() string { - host, _, err := splitHostPort(tablet.Addr) + host, _, err := netutil.SplitHostPort(tablet.Addr) if err != nil { panic(err) // should not happen, Addr was checked at creation } @@ -135,11 +136,11 @@ func NewTablet(cell string, uid uint32, parent naming.TabletAlias, vtAddr, mysql } // check the values for vtAddr and mysqlAddr are correct - _, _, err := splitHostPort(vtAddr) + _, _, err := netutil.SplitHostPort(vtAddr) if err != nil { return nil, err } - _, _, err = splitHostPort(mysqlAddr) + _, _, err = netutil.SplitHostPort(mysqlAddr) if err != nil { return nil, err } diff --git a/go/zk/zkctl/zkconf.go b/go/zk/zkctl/zkconf.go index 928f5b9d02..b030397b75 100644 --- a/go/zk/zkctl/zkconf.go +++ b/go/zk/zkctl/zkconf.go @@ -12,13 +12,12 @@ import ( "bytes" "fmt" "io/ioutil" - "net" - "os" "path" "strconv" "strings" "text/template" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/vt/env" ) @@ -162,7 +161,7 @@ func MakeZkConfigFromString(cmdLine string, myId uint32) *ZkConfig { } zkConfig.Servers = append(zkConfig.Servers, zkServer) } - hostname := fqdn() + hostname := netutil.FullyQualifiedHostnameOrPanic() for _, zkServer := range zkConfig.Servers { if (myId > 0 && myId == zkServer.ServerId) || (myId == 0 && zkServer.Hostname == hostname) { zkConfig.ServerId = zkServer.ServerId @@ -175,16 +174,3 @@ func MakeZkConfigFromString(cmdLine string, myId uint32) *ZkConfig { } return zkConfig } - -func fqdn() string { - hostname, err := os.Hostname() - if err != nil { - panic(err) - } - - cname, err := net.LookupCNAME(hostname) - if err != nil { - panic(err) - } - return strings.TrimRight(cname, ".") -} diff --git a/go/zk/zkns/zkns.go b/go/zk/zkns/zkns.go index c22654559f..1ca7a80514 100644 --- a/go/zk/zkns/zkns.go +++ b/go/zk/zkns/zkns.go @@ -7,11 +7,10 @@ package zkns import ( "encoding/json" "fmt" - "math/rand" "net" - "sort" "strings" + "code.google.com/p/vitess/go/netutil" "code.google.com/p/vitess/go/zk" ) @@ -71,62 +70,6 @@ func ReadAddrs(zconn zk.Conn, zkPath string) (*ZknsAddrs, error) { return addrs, nil } -// byPriorityWeight sorts records by ascending priority and weight. -type byPriorityWeight []*net.SRV - -func (s byPriorityWeight) Len() int { return len(s) } - -func (s byPriorityWeight) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s byPriorityWeight) Less(i, j int) bool { - return s[i].Priority < s[j].Priority || - (s[i].Priority == s[j].Priority && s[i].Weight < s[j].Weight) -} - -// shuffleByWeight shuffles SRV records by weight using the algorithm -// described in RFC 2782. -func (addrs byPriorityWeight) shuffleByWeight() { - sum := 0 - for _, addr := range addrs { - sum += int(addr.Weight) - } - for sum > 0 && len(addrs) > 1 { - s := 0 - n := rand.Intn(sum + 1) - for i := range addrs { - s += int(addrs[i].Weight) - if s >= n { - if i > 0 { - t := addrs[i] - copy(addrs[1:i+1], addrs[0:i]) - addrs[0] = t - } - break - } - } - sum -= int(addrs[0].Weight) - addrs = addrs[1:] - } -} - -// sort reorders SRV records as specified in RFC 2782. -func (addrs byPriorityWeight) sort() { - sort.Sort(addrs) - i := 0 - for j := 1; j < len(addrs); j++ { - if addrs[i].Priority != addrs[j].Priority { - addrs[i:j].shuffleByWeight() - i = j - } - } - addrs[i:].shuffleByWeight() -} - -// sort reorders SRV records as specified in RFC 2782. -func Sort(srvs []*net.SRV) { - byPriorityWeight(srvs).sort() -} - // zkPath is the path to a json file in zk. It can also reference a // named port: /zk/cell/zkns/path:_named_port func LookupName(zconn zk.Conn, zkPath string) ([]*net.SRV, error) { @@ -153,6 +96,6 @@ func LookupName(zconn zk.Conn, zkPath string) ([]*net.SRV, error) { } srvs = append(srvs, srv) } - Sort(srvs) + netutil.SortRfc2782(srvs) return srvs, nil } diff --git a/py/vtdb/update_stream_service.py b/py/vtdb/update_stream_service.py index 6f28ed1f47..0816064a3c 100755 --- a/py/vtdb/update_stream_service.py +++ b/py/vtdb/update_stream_service.py @@ -60,6 +60,7 @@ class UpdateStreamResponse(object): self.format() def format(self): + print "AAAAAAAAAAAAAAAAAAAAAAAA", self.raw_response if self.raw_response['Error'] == "": self.Error = None else: @@ -87,8 +88,8 @@ class UpdateStreamConnection(object): except gorpc.GoRpcError as e: raise dbexceptions.OperationalError(*e.args) - except: - logging.exception('gorpc low-level error') + except Exception as e: + logging.exception('gorpc low-level error: %s', str(e)) raise return update_stream_response.BinlogPosition, update_stream_response.EventData, update_stream_response.Error