revdial/v2: add new simpler, non-multiplexing revdial implementation

The old revdial has a simple multiplexing protocol that was like
HTTP/2 but without flow control, etc. But it was too simple (no flow
control) and too complex. Instead, just use one TCP connection per
reverse dialed connection. For now, the NAT'ed machine needs to go
re-connect for each incoming connection, but in practice that's just
once.

The old implementation is retained for now until all the buildlets are
updated.

Updates golang/go#31639

Change-Id: Id94c98d2949e695b677531b1221a827573543085
Reviewed-on: https://go-review.googlesource.com/c/build/+/174082
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
This commit is contained in:
Brad Fitzpatrick 2019-04-25 22:45:09 +00:00
Родитель 3ae2235d28
Коммит 4f0f4bb614
7 изменённых файлов: 552 добавлений и 103 удалений

Просмотреть файл

@ -8,6 +8,7 @@ package buildlet // import "golang.org/x/build/buildlet"
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
@ -22,7 +23,6 @@ import (
"sync"
"time"
"context"
"golang.org/x/oauth2"
)
@ -117,8 +117,14 @@ func (c *Client) SetHTTPClient(httpClient *http.Client) {
}
// SetDialer sets the function that creates a new connection to the buildlet.
// By default, net.Dial is used.
func (c *Client) SetDialer(dialer func() (net.Conn, error)) {
// By default, net.Dialer.DialContext is used.
//
// TODO(bradfitz): this is only used for ssh connections to buildlets,
// which previously required the client to do its own net.Dial +
// upgrade request. But now that the net/http client supports
// read/write bodies for protocol upgrades, we could change how ssh
// works and delete this.
func (c *Client) SetDialer(dialer func(context.Context) (net.Conn, error)) {
c.dialer = dialer
}
@ -138,11 +144,11 @@ type Client struct {
ipPort string // required, unless remoteBuildlet+baseURL is set
tls KeyPair
httpClient *http.Client
dialer func() (net.Conn, error) // nil means to use net.Dial
baseURL string // optional baseURL (used by remote buildlets)
authUser string // defaults to "gomote", if password is non-empty
password string // basic auth password or empty for none
remoteBuildlet string // non-empty if for remote buildlets
dialer func(context.Context) (net.Conn, error) // nil means to use net.Dialer.DialContext
baseURL string // optional baseURL (used by remote buildlets)
authUser string // defaults to "gomote", if password is non-empty
password string // basic auth password or empty for none
remoteBuildlet string // non-empty if for remote buildlets
closeFuncs []func() // optional extra code to run on close
releaseMode bool
@ -752,29 +758,30 @@ func (c *Client) ListDir(dir string, opts ListDirOpts, fn func(DirEntry)) error
return sc.Err()
}
func (c *Client) getDialer() func() (net.Conn, error) {
func (c *Client) getDialer() func(context.Context) (net.Conn, error) {
if c.dialer != nil {
return c.dialer
}
return c.dialWithNetDial
}
func (c *Client) dialWithNetDial() (net.Conn, error) {
// TODO: contexts? the tedious part will be adding it to
// revdial.Dial. For now just do a 5 second timeout. Probably
// fine. This is currently only used for ssh connections.
d := net.Dialer{Timeout: 5 * time.Second}
return d.Dial("tcp", c.ipPort)
func (c *Client) dialWithNetDial(ctx context.Context) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "tcp", c.ipPort)
}
// ConnectSSH opens an SSH connection to the buildlet for the given username.
// The authorizedPubKey must be a line from an ~/.ssh/authorized_keys file
// and correspond to the private key to be used to communicate over the net.Conn.
func (c *Client) ConnectSSH(user, authorizedPubKey string) (net.Conn, error) {
conn, err := c.getDialer()()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := c.getDialer()(ctx)
if err != nil {
return nil, fmt.Errorf("error dialing HTTP connection before SSH upgrade: %v", err)
}
deadline, _ := ctx.Deadline()
conn.SetDeadline(deadline)
req, err := http.NewRequest("POST", "/connect-ssh", nil)
if err != nil {
conn.Close()
@ -794,8 +801,10 @@ func (c *Client) ConnectSSH(user, authorizedPubKey string) (net.Conn, error) {
}
if res.StatusCode != http.StatusSwitchingProtocols {
slurp, _ := ioutil.ReadAll(res.Body)
conn.Close()
return nil, fmt.Errorf("unexpected /connect-ssh response: %v, %s", res.Status, slurp)
}
conn.SetDeadline(time.Time{})
return conn, nil
}

Просмотреть файл

@ -11,6 +11,8 @@ FORCE:
compile:
GOOS=aix GOARCH=ppc64 go install golang.org/x/build/cmd/buildlet
GOOS=darwin GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=dragonfly GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=freebsd GOARCH=arm go install golang.org/x/build/cmd/buildlet
GOOS=freebsd GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=linux GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=linux GOARCH=arm go install golang.org/x/build/cmd/buildlet
@ -23,6 +25,7 @@ compile:
GOOS=netbsd GOARCH=386 go install golang.org/x/build/cmd/buildlet
GOOS=netbsd GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=netbsd GOARCH=arm go install golang.org/x/build/cmd/buildlet
GOOS=openbsd GOARCH=arm go install golang.org/x/build/cmd/buildlet
GOOS=openbsd GOARCH=386 go install golang.org/x/build/cmd/buildlet
GOOS=openbsd GOARCH=amd64 go install golang.org/x/build/cmd/buildlet
GOOS=plan9 GOARCH=386 go install golang.org/x/build/cmd/buildlet
@ -42,6 +45,14 @@ buildlet.darwin-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.dragonfly-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.freebsd-arm: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.freebsd-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
@ -66,6 +77,10 @@ buildlet.netbsd-386: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.openbsd-arm: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.openbsd-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
@ -78,6 +93,14 @@ buildlet.plan9-386: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.plan9-arm: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.plan9-amd64: FORCE
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@
buildlet.windows-arm: FORCE buildlet_windows.go
go install golang.org/x/build/cmd/upload
upload --verbose --osarch=$@ --file=go:golang.org/x/build/cmd/buildlet --public --cacheable=false go-builder-data/$@

Просмотреть файл

@ -75,7 +75,8 @@ var (
// 18: set TMPDIR and GOCACHE
// 21: GO_BUILDER_SET_GOPROXY=coordinator support
// 22: TrimSpace the reverse buildlet's gobuildkey contents
const buildletVersion = 22
// 23: revdial v2
const buildletVersion = 23
func defaultListenAddr() string {
if runtime.GOOS == "darwin" {

Просмотреть файл

@ -6,11 +6,10 @@ package main
import (
"bufio"
"context"
"crypto/hmac"
"crypto/md5"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
@ -26,8 +25,7 @@ import (
"strings"
"time"
"golang.org/x/build"
"golang.org/x/build/revdial"
"golang.org/x/build/revdial/v2"
)
// mode is either a BuildConfig or HostConfig name (map key in x/build/dashboard/builders.go)
@ -127,56 +125,41 @@ func dialCoordinator() error {
goproxyHandler = newProxyToCoordinatorHandler(*reverseType, key)
}
caCert := build.ProdCoordinatorCA
addr := *coordinator
if addr == "farmer.golang.org" {
addr = "farmer.golang.org:443"
}
if devMode {
caCert = build.DevCoordinatorCA
}
var caPool *x509.CertPool
if runtime.GOOS == "windows" {
// No SystemCertPool on Windows. But we don't run
// Windows in reverse mode anyway. So just don't set
// caPool, which will cause tls.Config to use the
// system verification.
} else {
var err error
caPool, err = x509.SystemCertPool()
dial := func(ctx context.Context) (net.Conn, error) {
log.Printf("Dialing coordinator %s ...", addr)
t0 := time.Now()
tcpConn, err := dialCoordinatorTCP(ctx, addr)
if err != nil {
return fmt.Errorf("SystemCertPool: %v", err)
log.Printf("buildlet: reverse dial coordinator (%q) error after %v: %v", addr, time.Since(t0).Round(time.Second/100), err)
return nil, err
}
// Temporarily accept our own CA. This predates LetsEncrypt.
// Our old self-signed cert expires April 4th, 2017.
// We can remove this after golang.org/issue/16442 is fixed.
if !caPool.AppendCertsFromPEM([]byte(caCert)) {
return errors.New("failed to append coordinator CA certificate")
log.Printf("Dialed coordinator %s.", addr)
serverName := strings.TrimSuffix(addr, ":443")
log.Printf("Doing TLS handshake with coordinator (verifying hostname %q)...", serverName)
tcpConn.SetDeadline(time.Now().Add(30 * time.Second))
config := &tls.Config{
ServerName: serverName,
InsecureSkipVerify: devMode,
}
conn := tls.Client(tcpConn, config)
if err := conn.Handshake(); err != nil {
return nil, fmt.Errorf("failed to handshake with coordinator: %v", err)
}
tcpConn.SetDeadline(time.Time{})
return conn, nil
}
log.Printf("Dialing coordinator %s ...", addr)
tcpConn, err := dialCoordinatorTCP(addr)
conn, err := dial(context.Background())
if err != nil {
return err
}
serverName := strings.TrimSuffix(addr, ":443")
log.Printf("Doing TLS handshake with coordinator (verifying hostname %q)...", serverName)
tcpConn.SetDeadline(time.Now().Add(30 * time.Second))
config := &tls.Config{
ServerName: serverName,
RootCAs: caPool,
InsecureSkipVerify: devMode,
}
conn := tls.Client(tcpConn, config)
if err := conn.Handshake(); err != nil {
return fmt.Errorf("failed to handshake with coordinator: %v", err)
}
tcpConn.SetDeadline(time.Time{})
bufr := bufio.NewReader(conn)
bufw := bufio.NewWriter(conn)
log.Printf("Registering reverse mode with coordinator...")
req, err := http.NewRequest("GET", "/reverse", nil)
@ -192,9 +175,13 @@ func dialCoordinator() error {
req.Header["X-Go-Builder-Key"] = keys
req.Header.Set("X-Go-Builder-Hostname", *hostname)
req.Header.Set("X-Go-Builder-Version", strconv.Itoa(buildletVersion))
if err := req.Write(conn); err != nil {
req.Header.Set("X-Revdial-Version", "2")
if err := req.Write(bufw); err != nil {
return fmt.Errorf("coordinator /reverse request failed: %v", err)
}
if err := bufw.Flush(); err != nil {
return fmt.Errorf("coordinator /reverse request flush failed: %v", err)
}
resp, err := http.ReadResponse(bufr, req)
if err != nil {
return fmt.Errorf("coordinator /reverse response failed: %v", err)
@ -206,12 +193,9 @@ func dialCoordinator() error {
log.Printf("Connected to coordinator; reverse dialing active")
srv := &http.Server{}
ln := revdial.NewListener(bufio.NewReadWriter(
bufio.NewReader(conn),
bufio.NewWriter(deadlinePerWriteConn{conn, 60 * time.Second}),
))
ln := revdial.NewListener(conn, dial)
err = srv.Serve(ln)
if ln.Closed() {
if ln.Closed() { // TODO: this actually wants to know whether an error-free Close was called
return nil
}
return fmt.Errorf("http.Serve on reverse connection complete: %v", err)
@ -224,8 +208,8 @@ var coordDialer = &net.Dialer{
// dialCoordinatorTCP returns a TCP connection to the coordinator, making
// a CONNECT request to a proxy as a fallback.
func dialCoordinatorTCP(addr string) (net.Conn, error) {
tcpConn, err := coordDialer.Dial("tcp", addr)
func dialCoordinatorTCP(ctx context.Context, addr string) (net.Conn, error) {
tcpConn, err := coordDialer.DialContext(ctx, "tcp", addr)
if err != nil {
// If we had problems connecting to the TCP addr
// directly, perhaps there's a proxy in the way. See
@ -234,20 +218,21 @@ func dialCoordinatorTCP(addr string) (net.Conn, error) {
req, _ := http.NewRequest("GET", "https://"+addr, nil)
proxyURL, _ := http.ProxyFromEnvironment(req)
if proxyURL != nil {
return dialCoordinatorViaCONNECT(addr, proxyURL)
return dialCoordinatorViaCONNECT(ctx, addr, proxyURL)
}
return nil, err
}
return tcpConn, nil
}
func dialCoordinatorViaCONNECT(addr string, proxy *url.URL) (net.Conn, error) {
func dialCoordinatorViaCONNECT(ctx context.Context, addr string, proxy *url.URL) (net.Conn, error) {
proxyAddr := proxy.Host
if proxy.Port() == "" {
proxyAddr = net.JoinHostPort(proxyAddr, "80")
}
log.Printf("dialing proxy %q ...", proxyAddr)
c, err := net.Dial("tcp", proxyAddr)
var d net.Dialer
c, err := d.DialContext(ctx, "tcp", proxyAddr)
if err != nil {
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddr, err)
}
@ -273,17 +258,6 @@ func dialCoordinatorViaCONNECT(addr string, proxy *url.URL) (net.Conn, error) {
return c, nil
}
type deadlinePerWriteConn struct {
net.Conn
writeTimeout time.Duration
}
func (c deadlinePerWriteConn) Write(p []byte) (n int, err error) {
c.Conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
defer c.Conn.SetWriteDeadline(time.Time{})
return c.Conn.Write(p)
}
func devBuilderKey(builder string) string {
h := hmac.New(md5.New, []byte("gophers rule"))
io.WriteString(h, builder)

Просмотреть файл

@ -60,6 +60,7 @@ import (
"golang.org/x/build/internal/sourcecache"
"golang.org/x/build/livelog"
"golang.org/x/build/maintner/maintnerd/apipb"
revdialv2 "golang.org/x/build/revdial/v2"
"golang.org/x/build/types"
"golang.org/x/crypto/acme/autocert"
perfstorage "golang.org/x/perf/storage"
@ -303,6 +304,7 @@ func main() {
http.HandleFunc("/builders", handleBuilders)
http.HandleFunc("/temporarylogs", handleLogs)
http.HandleFunc("/reverse", handleReverse)
http.Handle("/revdial", revdialv2.ConnHandler())
http.HandleFunc("/style.css", handleStyleCSS)
http.HandleFunc("/try", serveTryStatus(false))
http.HandleFunc("/try.json", serveTryStatus(true))

Просмотреть файл

@ -41,13 +41,13 @@ import (
"net"
"net/http"
"sort"
"strings"
"sync"
"time"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/revdial"
revdialv2 "golang.org/x/build/revdial/v2"
"golang.org/x/build/types"
)
@ -483,6 +483,19 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
hostType := r.Header.Get("X-Go-Host-Type")
modes := r.Header["X-Go-Builder-Type"] // old way
gobuildkeys := r.Header["X-Go-Builder-Key"]
buildletVersion := r.Header.Get("X-Go-Builder-Version")
revDialVersion := r.Header.Get("X-Revdial-Version")
switch revDialVersion {
case "":
// Old.
revDialVersion = "1"
case "2":
// Current.
default:
http.Error(w, "unknown revdial version", http.StatusBadRequest)
return
}
// Convert the new argument style (X-Go-Host-Type) into the
// old way, to minimize changes in the rest of this code.
@ -506,20 +519,6 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
}
}
// Silently pretend that "gomacmini-*.local" doesn't want to do darwin-amd64-10_10 and
// darwin-386-10_10 anymore.
// TODO(bradfitz): remove this hack after we reconfigure those machines.
if strings.HasPrefix(hostname, "gomacmini-") && strings.HasSuffix(hostname, ".local") {
var filtered []string
for _, m := range modes {
if m == "darwin-amd64-10_10" || m == "darwin-386-10_10" {
continue
}
filtered = append(filtered, m)
}
modes = filtered
}
// For older builders using the buildlet's -reverse flag only,
// collapse their builder modes down into a singular hostType.
legacyNote := ""
@ -534,21 +533,40 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
return
}
revDialer := revdial.NewDialer(bufrw, conn)
log.Printf("Registering reverse buildlet %q (%s) for host type %v%s",
hostname, r.RemoteAddr, hostType, legacyNote)
if err := (&http.Response{StatusCode: http.StatusSwitchingProtocols, Proto: "HTTP/1.1"}).Write(conn); err != nil {
log.Printf("error writing upgrade response to reverse buildlet %s (%s) at %s: %v", hostname, hostType, r.RemoteAddr, err)
conn.Close()
return
}
(&http.Response{StatusCode: http.StatusSwitchingProtocols, Proto: "HTTP/1.1"}).Write(conn)
log.Printf("Registering reverse buildlet %q (%s) for host type %v %s; buildletVersion=%v; revDialVersion=%v",
hostname, r.RemoteAddr, hostType, legacyNote, buildletVersion, revDialVersion)
var dialer func(context.Context) (net.Conn, error)
var revDialerDone <-chan struct{}
switch revDialVersion {
case "1":
revDialer := revdial.NewDialer(bufrw, conn)
revDialerDone = revDialer.Done()
dialer = func(ctx context.Context) (net.Conn, error) {
// ignoring context.
return revDialer.Dial()
}
case "2":
revDialer := revdialv2.NewDialer(conn, "/revdial")
revDialerDone = revDialer.Done()
dialer = revDialer.Dial
}
client := buildlet.NewClient(hostname, buildlet.NoKeyPair)
client.SetHTTPClient(&http.Client{
Transport: &http.Transport{
Dial: func(network, addr string) (net.Conn, error) {
return revDialer.Dial()
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialer(ctx)
},
},
})
client.SetDialer(revDialer.Dial)
client.SetDialer(dialer)
client.SetDescription(fmt.Sprintf("reverse peer %s/%s for host type %v", hostname, r.RemoteAddr, hostType))
var isDead struct {
@ -567,7 +585,7 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
// conn) detects that the remote went away, close the buildlet
// client proactively show
go func() {
<-revDialer.Done()
<-revDialerDone
isDead.Lock()
defer isDead.Unlock()
if !isDead.v {
@ -592,7 +610,7 @@ func handleReverse(w http.ResponseWriter, r *http.Request) {
now := time.Now()
b := &reverseBuildlet{
hostname: hostname,
version: r.Header.Get("X-Go-Builder-Version"),
version: buildletVersion,
hostType: hostType,
client: client,
conn: conn,

422
revdial/v2/revdial.go Normal file
Просмотреть файл

@ -0,0 +1,422 @@
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package revdial implements a Dialer and Listener which work together
// to turn an accepted connection (for instance, a Hijacked HTTP request) into
// a Dialer which can then create net.Conns connecting back to the original
// dialer, which then gets a net.Listener accepting those conns.
//
// This is basically a very minimal SOCKS5 client & server.
//
// The motivation is that sometimes you want to run a server on a
// machine deep inside a NAT. Rather than connecting to the machine
// directly (which you can't, because of the NAT), you have the
// sequestered machine connect out to a public machine. Both sides
// then use revdial and the public machine can become a client for the
// NATed machine.
package revdial
import (
"bufio"
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"net/http"
"strings"
"sync"
"time"
)
// dialerUniqParam is the parameter name of the GET URL form value
// containing the Dialer's random unique ID.
const dialerUniqParam = "revdial.dialer"
// The Dialer can create new connections.
type Dialer struct {
conn net.Conn // hijacked client conn
path string // e.g. "/revdial"
uniqID string
pickupPath string // path + uniqID: "/revdial?revdial.dialer="+uniqID
incomingConn chan net.Conn
pickupFailed chan error
connReady chan bool
donec chan struct{}
closeOnce sync.Once
}
var (
dmapMu sync.Mutex
dialers = map[string]*Dialer{}
)
// NewDialer returns the side of the connection which will initiate
// new connections. This will typically be the side which did the HTTP
// Hijack. The connection is (typically) the hijacked HTTP client
// connection. The connPath is the HTTP path and optional query (but
// without scheme or host) on the dialer where the ConnHandler is
// mounted.
func NewDialer(c net.Conn, connPath string) *Dialer {
d := &Dialer{
path: connPath,
uniqID: newUniqID(),
conn: c,
donec: make(chan struct{}),
connReady: make(chan bool),
incomingConn: make(chan net.Conn),
pickupFailed: make(chan error),
}
join := "?"
if strings.Contains(connPath, "?") {
join = "&"
}
d.pickupPath = connPath + join + dialerUniqParam + "=" + d.uniqID
d.register()
go d.serve()
return d
}
func newUniqID() string {
buf := make([]byte, 16)
rand.Read(buf)
return fmt.Sprintf("%x", buf)
}
func (d *Dialer) register() {
dmapMu.Lock()
defer dmapMu.Unlock()
dialers[d.uniqID] = d
}
func (d *Dialer) unregister() {
dmapMu.Lock()
defer dmapMu.Unlock()
delete(dialers, d.uniqID)
}
// Done returns a channel which is closed when d is closed (either by
// this process on purpose, by a local error, or close or error from
// the peer).
func (d *Dialer) Done() <-chan struct{} { return d.donec }
// Close closes the Dialer.
func (d *Dialer) Close() error {
d.closeOnce.Do(d.close)
return nil
}
func (d *Dialer) close() {
d.unregister()
d.conn.Close()
close(d.donec)
}
// Dial creates a new connection back to the Listener.
func (d *Dialer) Dial(ctx context.Context) (net.Conn, error) {
// First, tell serve that we want a connection:
select {
case d.connReady <- true:
case <-d.donec:
return nil, errors.New("revdial.Dialer closed")
case <-ctx.Done():
return nil, ctx.Err()
}
// Then pick it up:
select {
case c := <-d.incomingConn:
return c, nil
case err := <-d.pickupFailed:
return nil, err
case <-d.donec:
return nil, errors.New("revdial.Dialer closed")
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (d *Dialer) matchConn(c net.Conn) {
select {
case d.incomingConn <- c:
case <-d.donec:
}
}
// serve blocks and runs the control message loop, keeping the peer
// alive and notifying the peer when new connections are available.
func (d *Dialer) serve() error {
defer d.Close()
go func() {
defer d.Close()
br := bufio.NewReader(d.conn)
for {
line, err := br.ReadSlice('\n')
if err != nil {
return
}
var msg controlMsg
if err := json.Unmarshal(line, &msg); err != nil {
log.Printf("revdial.Dialer read invalid JSON: %q: %v", line, err)
return
}
switch msg.Command {
case "pickup-failed":
err := fmt.Errorf("revdial listener failed to pick up connection: %v", msg.Err)
select {
case d.pickupFailed <- err:
case <-d.donec:
return
}
}
}
}()
for {
if err := d.sendMessage(controlMsg{Command: "keep-alive"}); err != nil {
return err
}
t := time.NewTimer(30 * time.Second)
select {
case <-t.C:
continue
case <-d.connReady:
t.Stop()
if err := d.sendMessage(controlMsg{
Command: "conn-ready",
ConnPath: d.pickupPath,
}); err != nil {
return err
}
case <-d.donec:
t.Stop()
return errors.New("revdial.Dialer closed")
}
}
}
func (d *Dialer) sendMessage(m controlMsg) error {
j, _ := json.Marshal(m)
d.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
j = append(j, '\n')
_, err := d.conn.Write(j)
d.conn.SetWriteDeadline(time.Time{})
return err
}
// NewListener returns a new Listener, accepting connections which
// arrive from the provided server connection, which should be after
// any necessary authentication (usually after an HTTP exchange).
//
// The provided dialServer func is responsible for connecting back to
// the server and doing TLS setup.
func NewListener(serverConn net.Conn, dialServer func(context.Context) (net.Conn, error)) *Listener {
ln := &Listener{
sc: serverConn,
dial: dialServer,
connc: make(chan net.Conn, 8), // arbitrary
donec: make(chan struct{}),
}
go ln.run()
return ln
}
var _ net.Listener = (*Listener)(nil)
// Listener is a net.Listener, returning new connections which arrive
// from a corresponding Dialer.
type Listener struct {
sc net.Conn
connc chan net.Conn
donec chan struct{}
dial func(context.Context) (net.Conn, error)
writec chan<- []byte
mu sync.Mutex // guards below, closing connc, and writing to rw
readErr error
closed bool
}
type controlMsg struct {
Command string `json:"command,omitempty"` // "keep-alive", "conn-ready", "pickup-failed"
ConnPath string `json:"connPath,omitempty"` // conn pick-up URL path for "conn-url", "pickup-failed"
Err string `json:"err,omitempty"`
}
// run reads control messages from the public server forever until the connection dies, which
// then closes the listener.
func (ln *Listener) run() {
defer ln.Close()
// Write loop
writec := make(chan []byte, 8)
ln.writec = writec
go func() {
for {
select {
case <-ln.donec:
return
case msg := <-writec:
if _, err := ln.sc.Write(msg); err != nil {
log.Printf("revdial.Listener: error writing message to server: %v", err)
ln.Close()
return
}
}
}
}()
// Read loop
br := bufio.NewReader(ln.sc)
for {
line, err := br.ReadSlice('\n')
if err != nil {
return
}
var msg controlMsg
if err := json.Unmarshal(line, &msg); err != nil {
log.Printf("revdial.Listener read invalid JSON: %q: %v", line, err)
return
}
switch msg.Command {
case "keep-alive":
// Occasional no-op message from server to keep
// us alive through NAT timeouts.
case "conn-ready":
go ln.grabConn(msg.ConnPath)
default:
// Ignore unknown messages
}
}
}
func (ln *Listener) sendMessage(m controlMsg) {
j, _ := json.Marshal(m)
j = append(j, '\n')
ln.writec <- j
}
func (ln *Listener) grabConn(path string) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
c, err := ln.dial(ctx)
if err != nil {
ln.sendMessage(controlMsg{Command: "pickup-failed", ConnPath: path, Err: err.Error()})
return
}
failPickup := func(err error) {
c.Close()
log.Printf("revdial.Listener: failed to pick up connection to %s: %v", path, err)
ln.sendMessage(controlMsg{Command: "pickup-failed", ConnPath: path, Err: err.Error()})
}
req, _ := http.NewRequest("GET", path, nil)
if err := req.Write(c); err != nil {
failPickup(err)
return
}
bufr := bufio.NewReader(c)
resp, err := http.ReadResponse(bufr, req)
if err != nil {
failPickup(err)
return
}
if resp.StatusCode != 101 {
failPickup(fmt.Errorf("non-101 response %v", resp.Status))
return
}
select {
case ln.connc <- c:
case <-ln.donec:
}
}
// Closed reports whether the listener has been closed.
func (ln *Listener) Closed() bool {
ln.mu.Lock()
defer ln.mu.Unlock()
return ln.closed
}
// Accept blocks and returns a new connection, or an error.
func (ln *Listener) Accept() (net.Conn, error) {
c, ok := <-ln.connc
if !ok {
ln.mu.Lock()
err, closed := ln.readErr, ln.closed
ln.mu.Unlock()
if err != nil && !closed {
return nil, fmt.Errorf("revdial: Listener closed; %v", err)
}
return nil, ErrListenerClosed
}
return c, nil
}
// ErrListenerClosed is returned by Accept after Close has been called.
var ErrListenerClosed = errors.New("revdial: Listener closed")
// Close closes the Listener, making future Accept calls return an
// error.
func (ln *Listener) Close() error {
ln.mu.Lock()
defer ln.mu.Unlock()
if ln.closed {
return nil
}
go ln.sc.Close()
ln.closed = true
close(ln.connc)
close(ln.donec)
return nil
}
// Addr returns a dummy address. This exists only to conform to the
// net.Listener interface.
func (ln *Listener) Addr() net.Addr { return fakeAddr{} }
type fakeAddr struct{}
func (fakeAddr) Network() string { return "revdial" }
func (fakeAddr) String() string { return "revdialconn" }
// ConnHandler returns the HTTP handler that needs to be mounted somewhere
// that the Listeners can dial out and get to. A dialer to connect to it
// is given to NewListener and the path to reach it is given to NewDialer
// to use in messages to the listener.
func ConnHandler() http.Handler {
return http.HandlerFunc(connHandler)
}
func connHandler(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil {
http.Error(w, "handler requires TLS", http.StatusInternalServerError)
return
}
if r.Method != "GET" {
w.Header().Set("Allow", "GET")
http.Error(w, "expected GET request to revdial conn handler", http.StatusMethodNotAllowed)
return
}
dialerUniq := r.FormValue(dialerUniqParam)
dmapMu.Lock()
d, ok := dialers[dialerUniq]
dmapMu.Unlock()
if !ok {
http.Error(w, "unknown dialer", http.StatusBadRequest)
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
(&http.Response{StatusCode: http.StatusSwitchingProtocols, Proto: "HTTP/1.1"}).Write(conn)
d.matchConn(conn)
}