зеркало из https://github.com/Azure/ARO-RP.git
centralizes route definition into portal (#2740)
This commit is contained in:
Родитель
6f29107380
Коммит
2eafd4e17c
|
@ -76,7 +76,7 @@ func TestClusterList(t *testing.T) {
|
|||
}
|
||||
|
||||
aadAuthenticatedRouter := mux.NewRouter()
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter)
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter, nil, nil, nil)
|
||||
w := httptest.NewRecorder()
|
||||
aadAuthenticatedRouter.ServeHTTP(w, req)
|
||||
|
||||
|
@ -197,7 +197,7 @@ func TestClusterDetail(t *testing.T) {
|
|||
}
|
||||
|
||||
aadAuthenticatedRouter := mux.NewRouter()
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter)
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter, nil, nil, nil)
|
||||
w := httptest.NewRecorder()
|
||||
aadAuthenticatedRouter.ServeHTTP(w, req)
|
||||
|
||||
|
|
|
@ -50,13 +50,13 @@ func (p *testPortal) DumpLogs(t *testing.T) {
|
|||
}
|
||||
|
||||
func (p *testPortal) Run(ctx context.Context) error {
|
||||
err := p.p.setupRouter()
|
||||
router, err := p.p.setupRouter(nil, nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s := &http.Server{
|
||||
Handler: frontendmiddleware.Lowercase(p.p.baseRouter),
|
||||
Handler: frontendmiddleware.Lowercase(router),
|
||||
ReadTimeout: 10 * time.Second,
|
||||
IdleTimeout: 2 * time.Minute,
|
||||
ErrorLog: log.New(p.p.log.Writer(), "", 0),
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/sirupsen/logrus"
|
||||
clientcmdv1 "k8s.io/client-go/tools/clientcmd/api/v1"
|
||||
|
||||
|
@ -32,18 +31,22 @@ const (
|
|||
kubeconfigNewTimeout = 6 * time.Hour
|
||||
)
|
||||
|
||||
type kubeconfig struct {
|
||||
log *logrus.Entry
|
||||
baseAccessLog *logrus.Entry
|
||||
type Kubeconfig struct {
|
||||
Log *logrus.Entry
|
||||
BaseAccessLog *logrus.Entry
|
||||
Audit *logrus.Entry
|
||||
|
||||
servingCert *x509.Certificate
|
||||
elevatedGroupIDs []string
|
||||
|
||||
dbOpenShiftClusters database.OpenShiftClusters
|
||||
dbPortal database.Portal
|
||||
DbPortal database.Portal
|
||||
|
||||
dialer proxy.Dialer
|
||||
clientCache clientcache.ClientCache
|
||||
Env env.Core
|
||||
|
||||
ReverseProxy *httputil.ReverseProxy
|
||||
}
|
||||
|
||||
func New(baseLog *logrus.Entry,
|
||||
|
@ -55,42 +58,35 @@ func New(baseLog *logrus.Entry,
|
|||
dbOpenShiftClusters database.OpenShiftClusters,
|
||||
dbPortal database.Portal,
|
||||
dialer proxy.Dialer,
|
||||
aadAuthenticatedRouter,
|
||||
unauthenticatedRouter *mux.Router) *kubeconfig {
|
||||
k := &kubeconfig{
|
||||
log: baseLog,
|
||||
baseAccessLog: baseAccessLog,
|
||||
) *Kubeconfig {
|
||||
k := &Kubeconfig{
|
||||
Log: baseLog,
|
||||
BaseAccessLog: baseAccessLog,
|
||||
Audit: audit,
|
||||
|
||||
servingCert: servingCert,
|
||||
elevatedGroupIDs: elevatedGroupIDs,
|
||||
|
||||
dbOpenShiftClusters: dbOpenShiftClusters,
|
||||
dbPortal: dbPortal,
|
||||
DbPortal: dbPortal,
|
||||
|
||||
dialer: dialer,
|
||||
clientCache: clientcache.New(time.Hour),
|
||||
Env: env,
|
||||
}
|
||||
|
||||
rp := &httputil.ReverseProxy{
|
||||
k.ReverseProxy = &httputil.ReverseProxy{
|
||||
Director: k.director,
|
||||
Transport: roundtripper.RoundTripperFunc(k.roundTripper),
|
||||
ErrorLog: log.New(k.log.Writer(), "", 0),
|
||||
ErrorLog: log.New(k.Log.Writer(), "", 0),
|
||||
}
|
||||
|
||||
aadAuthenticatedRouter.NewRoute().Methods(http.MethodPost).Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/kubeconfig/new").HandlerFunc(k.new)
|
||||
|
||||
bearerAuthenticatedRouter := unauthenticatedRouter.NewRoute().Subrouter()
|
||||
bearerAuthenticatedRouter.Use(middleware.Bearer(k.dbPortal))
|
||||
bearerAuthenticatedRouter.Use(middleware.Log(env, audit, k.baseAccessLog))
|
||||
|
||||
bearerAuthenticatedRouter.PathPrefix("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/kubeconfig/proxy/").Handler(rp)
|
||||
|
||||
return k
|
||||
}
|
||||
|
||||
// new creates a new PortalDocument allowing kubeconfig access to a cluster for
|
||||
// New creates a New PortalDocument allowing kubeconfig access to a cluster for
|
||||
// 6 hours and returns a kubeconfig with the temporary credentials
|
||||
func (k *kubeconfig) new(w http.ResponseWriter, r *http.Request) {
|
||||
func (k *Kubeconfig) New(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
resourceID := strings.Join(strings.Split(r.URL.Path, "/")[:9], "/")
|
||||
|
@ -101,7 +97,7 @@ func (k *kubeconfig) new(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
elevated := len(middleware.GroupsIntersect(k.elevatedGroupIDs, ctx.Value(middleware.ContextKeyGroups).([]string))) > 0
|
||||
|
||||
token := k.dbPortal.NewUUID()
|
||||
token := k.DbPortal.NewUUID()
|
||||
portalDoc := &api.PortalDocument{
|
||||
ID: token,
|
||||
TTL: int(kubeconfigNewTimeout / time.Second),
|
||||
|
@ -114,7 +110,7 @@ func (k *kubeconfig) new(w http.ResponseWriter, r *http.Request) {
|
|||
},
|
||||
}
|
||||
|
||||
_, err := k.dbPortal.Create(ctx, portalDoc)
|
||||
_, err := k.DbPortal.Create(ctx, portalDoc)
|
||||
if err != nil {
|
||||
k.internalServerError(w, err)
|
||||
return
|
||||
|
@ -136,12 +132,12 @@ func (k *kubeconfig) new(w http.ResponseWriter, r *http.Request) {
|
|||
_, _ = w.Write(b)
|
||||
}
|
||||
|
||||
func (k *kubeconfig) internalServerError(w http.ResponseWriter, err error) {
|
||||
k.log.Warn(err)
|
||||
func (k *Kubeconfig) internalServerError(w http.ResponseWriter, err error) {
|
||||
k.Log.Warn(err)
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
func (k *kubeconfig) makeKubeconfig(server, token string) ([]byte, error) {
|
||||
func (k *Kubeconfig) makeKubeconfig(server, token string) ([]byte, error) {
|
||||
return json.MarshalIndent(&clientcmdv1.Config{
|
||||
APIVersion: "v1",
|
||||
Kind: "Config",
|
||||
|
|
|
@ -148,7 +148,7 @@ func TestNew(t *testing.T) {
|
|||
_, audit := testlog.NewAudit()
|
||||
_, baseLog := testlog.New()
|
||||
_, baseAccessLog := testlog.New()
|
||||
_ = New(baseLog, audit, _env, baseAccessLog, servingCert, elevatedGroupIDs, nil, dbPortal, nil, aadAuthenticatedRouter, &mux.Router{})
|
||||
k := New(baseLog, audit, _env, baseAccessLog, servingCert, elevatedGroupIDs, nil, dbPortal, nil)
|
||||
|
||||
if tt.r != nil {
|
||||
tt.r(r)
|
||||
|
@ -156,6 +156,8 @@ func TestNew(t *testing.T) {
|
|||
|
||||
w := responsewriter.New(r)
|
||||
|
||||
aadAuthenticatedRouter.NewRoute().Methods(http.MethodPost).Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/kubeconfig/new").HandlerFunc(k.New)
|
||||
|
||||
aadAuthenticatedRouter.ServeHTTP(w, r)
|
||||
|
||||
portalClient.SetError(nil)
|
||||
|
|
|
@ -42,7 +42,7 @@ const (
|
|||
// Unfortunately the signature of httputil.ReverseProxy.Director does not allow
|
||||
// us to return values. We get around this limitation slightly naughtily by
|
||||
// storing return information in the request context.
|
||||
func (k *kubeconfig) director(r *http.Request) {
|
||||
func (k *Kubeconfig) director(r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
portalDoc, _ := ctx.Value(middleware.ContextKeyPortalDoc).(*api.PortalDocument)
|
||||
|
@ -93,7 +93,7 @@ func (k *kubeconfig) director(r *http.Request) {
|
|||
|
||||
// cli returns an appropriately configured HTTP client for forwarding the
|
||||
// incoming request to a cluster
|
||||
func (k *kubeconfig) cli(ctx context.Context, resourceID string, elevated bool) (*http.Client, error) {
|
||||
func (k *Kubeconfig) cli(ctx context.Context, resourceID string, elevated bool) (*http.Client, error) {
|
||||
openShiftDoc, err := k.dbOpenShiftClusters.Get(ctx, resourceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -154,7 +154,7 @@ func (k *kubeconfig) cli(ctx context.Context, resourceID string, elevated bool)
|
|||
// roundTripper is called by ReverseProxy to make the onward request happen. We
|
||||
// check if we had an error earlier and return that if we did. Otherwise we dig
|
||||
// out the client and call it.
|
||||
func (k *kubeconfig) roundTripper(r *http.Request) (*http.Response, error) {
|
||||
func (k *Kubeconfig) roundTripper(r *http.Request) (*http.Response, error) {
|
||||
if resp, ok := r.Context().Value(contextKeyResponse).(*http.Response); ok {
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -172,9 +172,9 @@ func (k *kubeconfig) roundTripper(r *http.Request) (*http.Response, error) {
|
|||
return resp, err
|
||||
}
|
||||
|
||||
func (k *kubeconfig) error(r *http.Request, statusCode int, err error) {
|
||||
func (k *Kubeconfig) error(r *http.Request, statusCode int, err error) {
|
||||
if err != nil {
|
||||
k.log.Warn(err)
|
||||
k.Log.Warn(err)
|
||||
}
|
||||
|
||||
w := responsewriter.New(r)
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
"github.com/Azure/ARO-RP/pkg/api"
|
||||
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
|
||||
"github.com/Azure/ARO-RP/pkg/portal/middleware"
|
||||
"github.com/Azure/ARO-RP/pkg/portal/util/responsewriter"
|
||||
"github.com/Azure/ARO-RP/pkg/util/azureclient"
|
||||
mock_env "github.com/Azure/ARO-RP/pkg/util/mocks/env"
|
||||
|
@ -392,12 +393,16 @@ func TestProxy(t *testing.T) {
|
|||
tt.mocks(dialer)
|
||||
}
|
||||
|
||||
unauthenticatedRouter := &mux.Router{}
|
||||
|
||||
_, audit := testlog.NewAudit()
|
||||
_, baseLog := testlog.New()
|
||||
_, baseAccessLog := testlog.New()
|
||||
_ = New(baseLog, audit, _env, baseAccessLog, nil, nil, dbOpenShiftClusters, dbPortal, dialer, &mux.Router{}, unauthenticatedRouter)
|
||||
k := New(baseLog, audit, _env, baseAccessLog, nil, nil, dbOpenShiftClusters, dbPortal, dialer)
|
||||
|
||||
unauthenticatedRouter := &mux.Router{}
|
||||
unauthenticatedRouter.Use(middleware.Bearer(k.DbPortal))
|
||||
unauthenticatedRouter.Use(middleware.Log(k.Env, audit, k.BaseAccessLog))
|
||||
|
||||
unauthenticatedRouter.PathPrefix("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/kubeconfig/proxy/").Handler(k.ReverseProxy)
|
||||
|
||||
if tt.r != nil {
|
||||
tt.r(r)
|
||||
|
|
|
@ -44,16 +44,13 @@ type Runnable interface {
|
|||
}
|
||||
|
||||
type portal struct {
|
||||
env env.Core
|
||||
audit *logrus.Entry
|
||||
log *logrus.Entry
|
||||
baseAccessLog *logrus.Entry
|
||||
l net.Listener
|
||||
sshl net.Listener
|
||||
verifier oidc.Verifier
|
||||
baseRouter *mux.Router
|
||||
authenticatedRouter *mux.Router
|
||||
publicRouter *mux.Router
|
||||
env env.Core
|
||||
audit *logrus.Entry
|
||||
log *logrus.Entry
|
||||
baseAccessLog *logrus.Entry
|
||||
l net.Listener
|
||||
sshl net.Listener
|
||||
verifier oidc.Verifier
|
||||
|
||||
hostname string
|
||||
servingKey *rsa.PrivateKey
|
||||
|
@ -132,35 +129,32 @@ func NewPortal(env env.Core,
|
|||
}
|
||||
}
|
||||
|
||||
func (p *portal) setupRouter() error {
|
||||
if p.baseRouter != nil {
|
||||
return fmt.Errorf("can't setup twice")
|
||||
}
|
||||
|
||||
func (p *portal) setupRouter(kconfig *kubeconfig.Kubeconfig, prom *prometheus.Prometheus, sshStruct *ssh.SSH) (*mux.Router, error) {
|
||||
r := mux.NewRouter()
|
||||
r.Use(middleware.Panic(p.log))
|
||||
|
||||
assetv1, err := assets.EmbeddedFiles.ReadFile("v1/build/index.html")
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
assetv2, err := assets.EmbeddedFiles.ReadFile("v2/build/index.html")
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.templateV1, err = template.New("index.html").Parse(string(assetv1))
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.templateV2, err = template.New("index.html").Parse(string(assetv2))
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
unauthenticatedRouter := r.NewRoute().Subrouter()
|
||||
bearerRoutes(unauthenticatedRouter, kconfig)
|
||||
p.unauthenticatedRoutes(unauthenticatedRouter)
|
||||
|
||||
allGroups := append([]string{}, p.groupIDs...)
|
||||
|
@ -168,7 +162,7 @@ func (p *portal) setupRouter() error {
|
|||
|
||||
p.aad, err = middleware.NewAAD(p.log, p.audit, p.env, p.baseAccessLog, p.hostname, p.sessionKey, p.clientID, p.clientKey, p.clientCerts, allGroups, unauthenticatedRouter, p.verifier)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
aadAuthenticatedRouter := r.NewRoute().Subrouter()
|
||||
|
@ -177,31 +171,27 @@ func (p *portal) setupRouter() error {
|
|||
aadAuthenticatedRouter.Use(p.aad.CheckAuthentication)
|
||||
aadAuthenticatedRouter.Use(csrf.Protect(p.sessionKey, csrf.SameSite(csrf.SameSiteStrictMode), csrf.MaxAge(0), csrf.Path("/")))
|
||||
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter)
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter, prom, kconfig, sshStruct)
|
||||
|
||||
p.baseRouter = r
|
||||
p.publicRouter = unauthenticatedRouter
|
||||
p.authenticatedRouter = aadAuthenticatedRouter
|
||||
|
||||
return nil
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (p *portal) setupServices() error {
|
||||
ssh, err := ssh.New(p.env, p.log, p.baseAccessLog, p.sshl, p.sshKey, p.elevatedGroupIDs, p.dbOpenShiftClusters, p.dbPortal, p.dialer, p.authenticatedRouter)
|
||||
func (p *portal) setupServices() (*kubeconfig.Kubeconfig, *prometheus.Prometheus, *ssh.SSH, error) {
|
||||
ssh, err := ssh.New(p.env, p.log, p.baseAccessLog, p.sshl, p.sshKey, p.elevatedGroupIDs, p.dbOpenShiftClusters, p.dbPortal, p.dialer)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
err = ssh.Run()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
kubeconfig.New(p.log, p.audit, p.env, p.baseAccessLog, p.servingCerts[0], p.elevatedGroupIDs, p.dbOpenShiftClusters, p.dbPortal, p.dialer, p.authenticatedRouter, p.publicRouter)
|
||||
k := kubeconfig.New(p.log, p.audit, p.env, p.baseAccessLog, p.servingCerts[0], p.elevatedGroupIDs, p.dbOpenShiftClusters, p.dbPortal, p.dialer)
|
||||
|
||||
prometheus.New(p.log, p.dbOpenShiftClusters, p.dialer, p.authenticatedRouter)
|
||||
prom := prometheus.New(p.log, p.dbOpenShiftClusters, p.dialer)
|
||||
|
||||
return nil
|
||||
return k, prom, ssh, nil
|
||||
}
|
||||
|
||||
func (p *portal) Run(ctx context.Context) error {
|
||||
|
@ -232,19 +222,17 @@ func (p *portal) Run(ctx context.Context) error {
|
|||
config.Certificates[0].Certificate = append(config.Certificates[0].Certificate, cert.Raw)
|
||||
}
|
||||
|
||||
if p.baseRouter == nil {
|
||||
err := p.setupRouter()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.setupServices()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k, prom, sshStruct, err := p.setupServices()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
router, err := p.setupRouter(k, prom, sshStruct)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s := &http.Server{
|
||||
Handler: frontendmiddleware.Lowercase(p.baseRouter),
|
||||
Handler: frontendmiddleware.Lowercase(router),
|
||||
ReadTimeout: 10 * time.Second,
|
||||
IdleTimeout: 2 * time.Minute,
|
||||
ErrorLog: log.New(p.log.Writer(), "", 0),
|
||||
|
@ -256,13 +244,23 @@ func (p *portal) Run(ctx context.Context) error {
|
|||
return s.Serve(tls.NewListener(p.l, config))
|
||||
}
|
||||
|
||||
func bearerRoutes(r *mux.Router, k *kubeconfig.Kubeconfig) {
|
||||
if k != nil {
|
||||
bearerAuthenticatedRouter := r.NewRoute().Subrouter()
|
||||
bearerAuthenticatedRouter.Use(middleware.Bearer(k.DbPortal))
|
||||
bearerAuthenticatedRouter.Use(middleware.Log(k.Env, k.Audit, k.BaseAccessLog))
|
||||
|
||||
bearerAuthenticatedRouter.PathPrefix("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/kubeconfig/proxy/").Handler(k.ReverseProxy)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *portal) unauthenticatedRoutes(r *mux.Router) {
|
||||
logger := middleware.Log(p.env, p.audit, p.baseAccessLog)
|
||||
|
||||
r.Methods(http.MethodGet).Path("/healthz/ready").Handler(logger(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})))
|
||||
}
|
||||
|
||||
func (p *portal) aadAuthenticatedRoutes(r *mux.Router) {
|
||||
func (p *portal) aadAuthenticatedRoutes(r *mux.Router, prom *prometheus.Prometheus, kconfig *kubeconfig.Kubeconfig, sshStruct *ssh.SSH) {
|
||||
var names []string
|
||||
|
||||
err := fs.WalkDir(assets.EmbeddedFiles, ".", func(path string, entry fs.DirEntry, err error) error {
|
||||
|
@ -308,6 +306,24 @@ func (p *portal) aadAuthenticatedRoutes(r *mux.Router) {
|
|||
r.Path("/api/{subscription}/{resourceGroup}/{clusterName}/machines").HandlerFunc(p.machines)
|
||||
r.Path("/api/{subscription}/{resourceGroup}/{clusterName}/machine-sets").HandlerFunc(p.machineSets)
|
||||
r.Path("/api/{subscription}/{resourceGroup}/{clusterName}").HandlerFunc(p.clusterInfo)
|
||||
|
||||
// prometheus
|
||||
if prom != nil {
|
||||
r.Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/prometheus").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Path += "/"
|
||||
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
|
||||
})
|
||||
|
||||
r.PathPrefix("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/prometheus/").Handler(prom.ReverseProxy)
|
||||
}
|
||||
|
||||
//kubeconfig
|
||||
if kconfig != nil {
|
||||
r.Methods(http.MethodPost).Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/kubeconfig/new").HandlerFunc(kconfig.New)
|
||||
}
|
||||
|
||||
// ssh
|
||||
r.Methods(http.MethodPost).Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/ssh/new").HandlerFunc(sshStruct.New)
|
||||
}
|
||||
|
||||
func (p *portal) index(w http.ResponseWriter, r *http.Request) {
|
||||
|
|
|
@ -5,11 +5,9 @@ package prometheus
|
|||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/Azure/ARO-RP/pkg/database"
|
||||
|
@ -18,20 +16,22 @@ import (
|
|||
"github.com/Azure/ARO-RP/pkg/util/roundtripper"
|
||||
)
|
||||
|
||||
type prometheus struct {
|
||||
type Prometheus struct {
|
||||
log *logrus.Entry
|
||||
|
||||
dbOpenShiftClusters database.OpenShiftClusters
|
||||
|
||||
dialer proxy.Dialer
|
||||
clientCache clientcache.ClientCache
|
||||
|
||||
ReverseProxy *httputil.ReverseProxy
|
||||
}
|
||||
|
||||
func New(baseLog *logrus.Entry,
|
||||
dbOpenShiftClusters database.OpenShiftClusters,
|
||||
dialer proxy.Dialer,
|
||||
aadAuthenticatedRouter *mux.Router) *prometheus {
|
||||
p := &prometheus{
|
||||
) *Prometheus {
|
||||
p := &Prometheus{
|
||||
log: baseLog,
|
||||
|
||||
dbOpenShiftClusters: dbOpenShiftClusters,
|
||||
|
@ -39,20 +39,12 @@ func New(baseLog *logrus.Entry,
|
|||
dialer: dialer,
|
||||
clientCache: clientcache.New(time.Hour),
|
||||
}
|
||||
|
||||
rp := &httputil.ReverseProxy{
|
||||
Director: p.director,
|
||||
Transport: roundtripper.RoundTripperFunc(p.roundTripper),
|
||||
ModifyResponse: p.modifyResponse,
|
||||
p.ReverseProxy = &httputil.ReverseProxy{
|
||||
Director: p.Director,
|
||||
Transport: roundtripper.RoundTripperFunc(p.RoundTripper),
|
||||
ModifyResponse: p.ModifyResponse,
|
||||
ErrorLog: log.New(p.log.Writer(), "", 0),
|
||||
}
|
||||
|
||||
aadAuthenticatedRouter.NewRoute().Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/prometheus").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Path += "/"
|
||||
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
|
||||
})
|
||||
|
||||
aadAuthenticatedRouter.NewRoute().PathPrefix("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/prometheus/").Handler(rp)
|
||||
|
||||
return p
|
||||
}
|
||||
|
|
|
@ -33,7 +33,8 @@ const (
|
|||
contextKeyResponse
|
||||
)
|
||||
|
||||
func (p *prometheus) director(r *http.Request) {
|
||||
// Director modifies the request to point to the clusters prometheus instance
|
||||
func (p *Prometheus) Director(r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
resourceID := strings.Join(strings.Split(r.URL.Path, "/")[:9], "/")
|
||||
|
@ -68,7 +69,7 @@ func (p *prometheus) director(r *http.Request) {
|
|||
*r = *r.WithContext(context.WithValue(ctx, contextKeyClient, cli))
|
||||
}
|
||||
|
||||
func (p *prometheus) cli(ctx context.Context, resourceID string) (*http.Client, error) {
|
||||
func (p *Prometheus) cli(ctx context.Context, resourceID string) (*http.Client, error) {
|
||||
openShiftDoc, err := p.dbOpenShiftClusters.Get(ctx, resourceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -88,7 +89,7 @@ func (p *prometheus) cli(ctx context.Context, resourceID string) (*http.Client,
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (p *prometheus) roundTripper(r *http.Request) (*http.Response, error) {
|
||||
func (p *Prometheus) RoundTripper(r *http.Request) (*http.Response, error) {
|
||||
if resp, ok := r.Context().Value(contextKeyResponse).(*http.Response); ok {
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -97,12 +98,12 @@ func (p *prometheus) roundTripper(r *http.Request) (*http.Response, error) {
|
|||
return cli.Do(r)
|
||||
}
|
||||
|
||||
// modifyResponse: unfortunately Prometheus serves HTML files containing just a
|
||||
// ModifyResponse: unfortunately Prometheus serves HTML files containing just a
|
||||
// couple of absolute links. Given that we're serving Prometheus under
|
||||
// /subscriptions/.../clusterName/prometheus, we need to dig these out and
|
||||
// rewrite them. This is a hack which hopefully goes away once we forward all
|
||||
// metrics to Kusto.
|
||||
func (p *prometheus) modifyResponse(r *http.Response) error {
|
||||
func (p *Prometheus) ModifyResponse(r *http.Response) error {
|
||||
mediaType, _, _ := mime.ParseMediaType(r.Header.Get("Content-Type"))
|
||||
if mediaType != "text/html" {
|
||||
return nil
|
||||
|
@ -169,7 +170,7 @@ func walk(n *html.Node, f func(*html.Node)) {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *prometheus) error(r *http.Request, statusCode int, err error) {
|
||||
func (p *Prometheus) error(r *http.Request, statusCode int, err error) {
|
||||
if err != nil {
|
||||
p.log.Print(err)
|
||||
}
|
||||
|
|
|
@ -277,16 +277,22 @@ func TestProxy(t *testing.T) {
|
|||
tt.mocks(dialer)
|
||||
}
|
||||
|
||||
prom := New(logrus.NewEntry(logrus.StandardLogger()), dbOpenShiftClusters, dialer)
|
||||
aadAuthenticatedRouter := &mux.Router{}
|
||||
|
||||
New(logrus.NewEntry(logrus.StandardLogger()), dbOpenShiftClusters, dialer, aadAuthenticatedRouter)
|
||||
|
||||
if tt.r != nil {
|
||||
tt.r(r)
|
||||
}
|
||||
|
||||
w := responsewriter.New(r)
|
||||
|
||||
aadAuthenticatedRouter.NewRoute().Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/prometheus").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Path += "/"
|
||||
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
|
||||
})
|
||||
|
||||
aadAuthenticatedRouter.NewRoute().PathPrefix("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/prometheus/").Handler(prom.ReverseProxy)
|
||||
|
||||
aadAuthenticatedRouter.ServeHTTP(w, r)
|
||||
|
||||
resp := w.Response()
|
||||
|
@ -346,9 +352,9 @@ func TestModifyResponse(t *testing.T) {
|
|||
Body: io.NopCloser(strings.NewReader(tt.body)),
|
||||
}
|
||||
|
||||
p := &prometheus{}
|
||||
p := &Prometheus{}
|
||||
|
||||
err := p.modifyResponse(r)
|
||||
err := p.ModifyResponse(r)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ func TestRegionListPublic(t *testing.T) {
|
|||
}
|
||||
|
||||
aadAuthenticatedRouter := mux.NewRouter()
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter)
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter, nil, nil, nil)
|
||||
w := httptest.NewRecorder()
|
||||
aadAuthenticatedRouter.ServeHTTP(w, req)
|
||||
|
||||
|
@ -251,7 +251,7 @@ func TestRegionListFF(t *testing.T) {
|
|||
}
|
||||
|
||||
aadAuthenticatedRouter := mux.NewRouter()
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter)
|
||||
p.aadAuthenticatedRoutes(aadAuthenticatedRouter, nil, nil, nil)
|
||||
w := httptest.NewRecorder()
|
||||
aadAuthenticatedRouter.ServeHTTP(w, req)
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ const (
|
|||
sshTimeout = time.Hour // never allow a connection to live longer than an hour.
|
||||
)
|
||||
|
||||
func (s *ssh) Run() error {
|
||||
func (s *SSH) Run() error {
|
||||
go func() {
|
||||
defer recover.Panic(s.log)
|
||||
|
||||
|
@ -79,7 +79,7 @@ func (s *ssh) Run() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *ssh) newConn(ctx context.Context, clientConn net.Conn) error {
|
||||
func (s *SSH) newConn(ctx context.Context, clientConn net.Conn) error {
|
||||
defer clientConn.Close()
|
||||
|
||||
config := &cryptossh.ServerConfig{}
|
||||
|
@ -195,7 +195,7 @@ func (s *ssh) newConn(ctx context.Context, clientConn net.Conn) error {
|
|||
|
||||
// proxyConn handles incoming new channel and administrative requests. It calls
|
||||
// newChannel to handle new channels, each on a new goroutine.
|
||||
func (s *ssh) proxyConn(ctx context.Context, accessLog *logrus.Entry, keyring agent.Agent, upstreamConn, downstreamConn cryptossh.Conn, upstreamNewChannels, downstreamNewChannels <-chan cryptossh.NewChannel, upstreamRequests, downstreamRequests <-chan *cryptossh.Request) error {
|
||||
func (s *SSH) proxyConn(ctx context.Context, accessLog *logrus.Entry, keyring agent.Agent, upstreamConn, downstreamConn cryptossh.Conn, upstreamNewChannels, downstreamNewChannels <-chan cryptossh.NewChannel, upstreamRequests, downstreamRequests <-chan *cryptossh.Request) error {
|
||||
timer := time.NewTimer(sshTimeout)
|
||||
defer timer.Stop()
|
||||
|
||||
|
@ -256,7 +256,7 @@ func (s *ssh) proxyConn(ctx context.Context, accessLog *logrus.Entry, keyring ag
|
|||
}
|
||||
}
|
||||
|
||||
func (s *ssh) handleAgent(accessLog *logrus.Entry, nc cryptossh.NewChannel, keyring agent.Agent) error {
|
||||
func (s *SSH) handleAgent(accessLog *logrus.Entry, nc cryptossh.NewChannel, keyring agent.Agent) error {
|
||||
ch, rs, err := nc.Accept()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -277,7 +277,7 @@ func (s *ssh) handleAgent(accessLog *logrus.Entry, nc cryptossh.NewChannel, keyr
|
|||
// newChannel handles an incoming request to create a new channel. If the
|
||||
// channel creation is successful, it calls proxyChannel to proxy the channel
|
||||
// between SRE and cluster.
|
||||
func (s *ssh) newChannel(ctx context.Context, accessLog *logrus.Entry, nc cryptossh.NewChannel, upstreamConn, downstreamConn cryptossh.Conn, firstSession bool) error {
|
||||
func (s *SSH) newChannel(ctx context.Context, accessLog *logrus.Entry, nc cryptossh.NewChannel, upstreamConn, downstreamConn cryptossh.Conn, firstSession bool) error {
|
||||
defer recover.Panic(s.log)
|
||||
|
||||
ch2, rs2, err := downstreamConn.OpenChannel(nc.ChannelType(), nc.ExtraData())
|
||||
|
@ -310,7 +310,7 @@ func (s *ssh) newChannel(ctx context.Context, accessLog *logrus.Entry, nc crypto
|
|||
return s.proxyChannel(ch1, ch2, rs1, rs2)
|
||||
}
|
||||
|
||||
func (s *ssh) proxyGlobalRequest(r *cryptossh.Request, c cryptossh.Conn) error {
|
||||
func (s *SSH) proxyGlobalRequest(r *cryptossh.Request, c cryptossh.Conn) error {
|
||||
ok, payload, err := c.SendRequest(r.Type, r.WantReply, r.Payload)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -319,7 +319,7 @@ func (s *ssh) proxyGlobalRequest(r *cryptossh.Request, c cryptossh.Conn) error {
|
|||
return r.Reply(ok, payload)
|
||||
}
|
||||
|
||||
func (s *ssh) proxyRequest(r *cryptossh.Request, ch cryptossh.Channel) error {
|
||||
func (s *SSH) proxyRequest(r *cryptossh.Request, ch cryptossh.Channel) error {
|
||||
ok, err := ch.SendRequest(r.Type, r.WantReply, r.Payload)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -328,7 +328,7 @@ func (s *ssh) proxyRequest(r *cryptossh.Request, ch cryptossh.Channel) error {
|
|||
return r.Reply(ok, nil)
|
||||
}
|
||||
|
||||
func (s *ssh) proxyChannel(ch1, ch2 cryptossh.Channel, rs1, rs2 <-chan *cryptossh.Request) error {
|
||||
func (s *SSH) proxyChannel(ch1, ch2 cryptossh.Channel, rs1, rs2 <-chan *cryptossh.Request) error {
|
||||
g := errgroup.Group{}
|
||||
|
||||
g.Go(func() error {
|
||||
|
@ -382,7 +382,7 @@ func (s *ssh) proxyChannel(ch1, ch2 cryptossh.Channel, rs1, rs2 <-chan *cryptoss
|
|||
return g.Wait()
|
||||
}
|
||||
|
||||
func (s *ssh) keepAliveConn(ctx context.Context, channel cryptossh.Channel) {
|
||||
func (s *SSH) keepAliveConn(ctx context.Context, channel cryptossh.Channel) {
|
||||
ticker := time.NewTicker(keepAliveInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"crypto/x509"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
|
@ -412,11 +413,14 @@ func TestProxy(t *testing.T) {
|
|||
|
||||
hook, log := testlog.New()
|
||||
|
||||
s, err := New(nil, nil, log, nil, hostKey, nil, dbOpenShiftClusters, dbPortal, dialer, &mux.Router{})
|
||||
s, err := New(nil, nil, log, nil, hostKey, nil, dbOpenShiftClusters, dbPortal, dialer)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := mux.NewRouter()
|
||||
r.Methods(http.MethodPost).Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/ssh/new").HandlerFunc(s.New)
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/sirupsen/logrus"
|
||||
cryptossh "golang.org/x/crypto/ssh"
|
||||
|
||||
|
@ -29,7 +28,7 @@ const (
|
|||
sshNewTimeout = time.Minute
|
||||
)
|
||||
|
||||
type ssh struct {
|
||||
type SSH struct {
|
||||
env env.Core
|
||||
log *logrus.Entry
|
||||
baseAccessLog *logrus.Entry
|
||||
|
@ -54,8 +53,8 @@ func New(env env.Core,
|
|||
dbOpenShiftClusters database.OpenShiftClusters,
|
||||
dbPortal database.Portal,
|
||||
dialer proxy.Dialer,
|
||||
aadAuthenticatedRouter *mux.Router) (*ssh, error) {
|
||||
s := &ssh{
|
||||
) (*SSH, error) {
|
||||
s := &SSH{
|
||||
env: env,
|
||||
log: log,
|
||||
baseAccessLog: baseAccessLog,
|
||||
|
@ -78,8 +77,6 @@ func New(env env.Core,
|
|||
|
||||
s.baseServerConfig.AddHostKey(signer)
|
||||
|
||||
aadAuthenticatedRouter.NewRoute().Methods(http.MethodPost).Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/ssh/new").HandlerFunc(s.new)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -93,7 +90,9 @@ type response struct {
|
|||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (s *ssh) new(w http.ResponseWriter, r *http.Request) {
|
||||
// New creates a new temporary password from the request params and sends it
|
||||
// through the writer
|
||||
func (s *SSH) New(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
parts := strings.Split(r.URL.Path, "/")
|
||||
|
@ -171,7 +170,7 @@ func (s *ssh) new(w http.ResponseWriter, r *http.Request) {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *ssh) sendResponse(w http.ResponseWriter, resp *response) {
|
||||
func (s *SSH) sendResponse(w http.ResponseWriter, resp *response) {
|
||||
b, err := json.MarshalIndent(resp, "", " ")
|
||||
if err != nil {
|
||||
s.internalServerError(w, err)
|
||||
|
@ -182,7 +181,7 @@ func (s *ssh) sendResponse(w http.ResponseWriter, resp *response) {
|
|||
_, _ = w.Write(b)
|
||||
}
|
||||
|
||||
func (s *ssh) internalServerError(w http.ResponseWriter, err error) {
|
||||
func (s *SSH) internalServerError(w http.ResponseWriter, err error) {
|
||||
s.log.Warn(err)
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
}
|
||||
|
|
|
@ -138,20 +138,21 @@ func TestNew(t *testing.T) {
|
|||
env := mock_env.NewMockCore(ctrl)
|
||||
env.EXPECT().IsLocalDevelopmentMode().AnyTimes().Return(false)
|
||||
|
||||
aadAuthenticatedRouter := &mux.Router{}
|
||||
|
||||
_, err = New(env, logrus.NewEntry(logrus.StandardLogger()), nil, nil, hostKey, elevatedGroupIDs, nil, dbPortal, nil, aadAuthenticatedRouter)
|
||||
s, err := New(env, logrus.NewEntry(logrus.StandardLogger()), nil, nil, hostKey, elevatedGroupIDs, nil, dbPortal, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
router := mux.NewRouter()
|
||||
router.Methods(http.MethodPost).Path("/subscriptions/{subscriptionId}/resourcegroups/{resourceGroupName}/providers/microsoft.redhatopenshift/openshiftclusters/{resourceName}/ssh/new").HandlerFunc(s.New)
|
||||
|
||||
if tt.r != nil {
|
||||
tt.r(r)
|
||||
}
|
||||
|
||||
w := responsewriter.New(r)
|
||||
|
||||
aadAuthenticatedRouter.ServeHTTP(w, r)
|
||||
router.ServeHTTP(w, r)
|
||||
|
||||
portalClient.SetError(nil)
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче