Merge pull request #2 from microsoft/add-cnetstat-source

Initial commit of cnetstat source code
This commit is contained in:
noahl 2020-09-24 10:24:44 -04:00 коммит произвёл GitHub
Родитель 173521d2c6 4042c9cf93
Коммит daecd05d08
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 855 добавлений и 0 удалений

259
cnetstat.go Normal file
Просмотреть файл

@ -0,0 +1,259 @@
package main
// A container-aware netstat.
//
// Dump a list of connections on a node, with their container and pod
// names. This currently assumes the containers run on docker with labels
// matching what my version of Kubelet does.
import (
"bufio"
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
)
// A connection with a Kubernetes pod identifier instead of a PID
type KubeConnection struct {
conn Connection
container ContainerPath
}
// I'm not using the standard json module for JSON output because I
// want to flatten the KubeConnection before printing it
func writeKubeConnectionAsJSON(kc *KubeConnection, w io.Writer) error {
_, err := fmt.Fprintf(w,
"{\"protocol\": %v,"+
"\"local_host\": %v, "+
"\"local_port\": %v, "+
"\"remote_host\": %v, "+
"\"remote_port\": %v, "+
"\"connection_state\": %v, "+
"\"pod_namespace\": %v, "+
"\"pod_name\": %v, "+
"\"container_name\": %v}",
kc.conn.protocol,
kc.conn.localHost,
kc.conn.localPort,
kc.conn.remoteHost,
kc.conn.remotePort,
kc.conn.connectionState,
kc.container.PodNamespace,
kc.container.PodName,
kc.container.ContainerName)
return err
}
const subprocessTimeout = 5 * time.Second
const ppidColon string = "PPid:"
// Either return the parent PID of its argument, or an error
func parentOfPid(pid int) (int, error) {
fp, err := os.Open(fmt.Sprintf("/proc/%d/status", pid))
if err != nil {
return 0, err
}
defer fp.Close()
lines := bufio.NewScanner(fp)
for lines.Scan() {
line := lines.Text()
if strings.HasPrefix(line, ppidColon) {
pid, err := strconv.Atoi(line[len(ppidColon):])
if err != nil {
return 0, err
}
return pid, nil
}
}
return 0, fmt.Errorf("Couldn't find parent PID of PID %d", pid)
}
// Find the container a particular PID runs in, or return an error
func pidToPod(pid int, pidMap map[int]ContainerPath) (ContainerPath, error) {
// Remember the ancestors of this PID in case we have to
// search a process hierarchy
var ancestors []int
for {
kube_path, ok := pidMap[pid]
if ok {
// If we had to search for parents of the
// original pid, update the map so we won't
// have to do that again
for process, _ := range ancestors {
pidMap[process] = kube_path
}
return kube_path, nil
}
ancestors = append(ancestors, pid)
var err error
pid, err = parentOfPid(pid)
if err != nil {
return ContainerPath{}, err
}
}
}
// Map connections with PIDs into KubeConnections with container identifiers
func getKubeConnections(connections []Connection, pidMap map[int]ContainerPath) []KubeConnection {
kubeConnections := make([]KubeConnection, len(connections))
for i, conn := range connections {
pid := conn.pid
path, _ := pidToPod(pid, pidMap)
// If pidToPod returns an error, then path will be
// ContainerPath{}, which is what we want
kubeConnections[i] = KubeConnection{
conn: conn,
container: path,
}
}
return kubeConnections
}
// Convert empty strings to "-". Why? Because that's what netstat does
func emptyToDash(val string) string {
if len(val) > 0 {
return val
} else {
return "-"
}
}
var kubeConnectionHeaders = []string{
"Namespace", "Pod", "Container", "Protocol",
"Local Host", "Local Port", "Remote Host", "Remote Port",
"Connection State",
}
func (kc KubeConnection) Fields() []string {
return []string{
emptyToDash(kc.container.PodNamespace),
emptyToDash(kc.container.PodName),
emptyToDash(kc.container.ContainerName),
kc.conn.protocol,
kc.conn.localHost,
kc.conn.localPort,
kc.conn.remoteHost,
kc.conn.remotePort,
kc.conn.connectionState,
}
}
// Parse our arguments. Return the value of the format argument - either "table" or "json"
func parseArgs() (string, error) {
var format = flag.String("format", "table", "Output format. Either 'table' or 'json'")
flag.Parse()
// If we got any positional arguments, that's a user error
if len(flag.Args()) > 0 {
flag.Usage()
return "", fmt.Errorf("got extra arguments %v", flag.Args())
}
if (*format != "table") && (*format != "json") {
flag.Usage()
return "", fmt.Errorf("unrecognized format %v", format)
}
return *format, nil
}
// This is effectively main, but moving it to a separate function
// makes the error handling simpler
func cnetstat() error {
format, err := parseArgs()
if err != nil {
return err
}
// It would be possible to run as non-root and return less
// information, but that makes the netstat parsing more
// complicated (since netstat will also print a warning
// message), and for our use-case we really want all the data,
// so just run it as root.
if os.Geteuid() != 0 {
return fmt.Errorf("cnetstat must run as root")
}
namespaces, err := listNetNamespaces()
if err != nil {
return err
}
pidMap, err := buildPidMap()
if err != nil {
return err
}
// connections has one slice of Connections for each namespace
var connections = make([][]Connection, len(namespaces))
for i, namespace := range namespaces {
conns, err := getConnectionsFromNamespace(namespace.Pid)
if err != nil {
return err
}
connections[i] = conns
}
// count the total number of connections, so we can ...
var totalConnections int
for _, conns := range connections {
totalConnections += len(conns)
}
// ... flatten them into a single slice of all connections
// with just one allocation
allConnections := make([]Connection, totalConnections)
offset := 0
for _, conns := range connections {
copy(allConnections[offset:], conns)
offset += len(conns)
}
kubeConnections := getKubeConnections(allConnections, pidMap)
switch format {
case "json":
for _, conn := range kubeConnections {
err := writeKubeConnectionAsJSON(&conn, os.Stdout)
if err != nil {
return err
}
os.Stdout.WriteString("\n")
}
case "table":
table := make([]Fielder, len(kubeConnections))
for i, _ := range kubeConnections {
table[i] = &kubeConnections[i]
}
prettyPrintTable(table, kubeConnectionHeaders, os.Stdout)
}
return nil
}
func main() {
err := cnetstat()
if err != nil {
fmt.Printf("Error: %v\n", err)
os.Exit(1)
}
os.Exit(0)
}

111
dockerPidMap.go Normal file
Просмотреть файл

@ -0,0 +1,111 @@
package main
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"strings"
)
// A ContainerPath identifies a container in Kubernetes
type ContainerPath struct {
PodNamespace string
PodName string
ContainerName string
}
// A DockerContainer connects a container's docker ID and its
// Kubernetes ContainerPath
type DockerContainer struct {
kubePath ContainerPath
dockerId string
}
// parseDockerContainerList parses the output of
// `docker ps --format "{{.ID}} {{.Labels}}"`
// and returns a list of DockerContainers
//
// There will be one Docker container per pod with the special
// container_name 'POD'. This container holds the cgroups for the pod,
// but doesn't correspond to any Kubernetes container.
func parseDockerContainerList(docker_out io.Reader) ([]DockerContainer, error) {
var result []DockerContainer
var scanner = bufio.NewScanner(docker_out)
for scanner.Scan() {
line := scanner.Text()
columns := strings.SplitN(line, " ", 2)
if len(columns) != 2 {
return nil, fmt.Errorf("Couldn't parse Docker output line %v", line)
}
container_id := columns[0]
labels := strings.Split(columns[1], ",")
container_path := ContainerPath{}
for _, label := range labels {
if strings.Contains(label, "=") {
parts := strings.SplitN(label, "=", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("Couldn't parse container label %v", label)
}
key := parts[0]
value := parts[1]
switch key {
case "io.kubernetes.pod.name":
container_path.PodName = value
case "io.kubernetes.pod.namespace":
container_path.PodNamespace = value
case "io.kubernetes.container.name":
container_path.ContainerName = value
}
}
}
result = append(result, DockerContainer{kubePath: container_path,
dockerId: container_id})
}
return result, nil
}
// Build a map from host PIDs to ContainerPaths.
func buildPidMap() (map[int]ContainerPath, error) {
ctx, _ := context.WithTimeout(context.Background(), subprocessTimeout)
dockerPsOut, err := exec.CommandContext(ctx, "docker", "ps", "--format", "{{.ID}} {{.Labels}}").Output()
if err != nil {
return nil, err
}
dockerContainers, err := parseDockerContainerList(strings.NewReader(string(dockerPsOut)))
if err != nil {
return nil, err
}
pidMap := make(map[int]ContainerPath)
for _, container := range dockerContainers {
ctx, _ := context.WithTimeout(context.Background(), subprocessTimeout)
dockerInspectOut, err := exec.CommandContext(
ctx, "docker", "inspect", "--format", "{{.State.Pid}}", container.dockerId).Output()
if err != nil {
// We expect errors here if a container was
// deleted between `docker ps` and here.
continue
}
var rootPid int
_, err = fmt.Fscan(strings.NewReader(string(dockerInspectOut)), &rootPid)
if err != nil {
return nil, err
}
pidMap[rootPid] = container.kubePath
}
return pidMap, nil
}

55
dockerPidMap_test.go Normal file
Просмотреть файл

@ -0,0 +1,55 @@
package main
import (
"strings"
"testing"
)
// This should match the output format of 'docker ps --format "{{.ID}} {{.Labels}}"'
const dockerPsOutput = `56443455 component=foo,io.kubernetes.pod.name=frontend,io.kubernetes.pod.namespace=my-app,a=b,c=d,io.kubernetes.container.name=fe-server
fab8905c component=bar,io.kubernetes.pod.name=frontend,io.kubernetes.pod.namespace=my-app,x=y,z=w,io.kubernetes.container.name=log-shipper
a01098fd component=baz,io.kubernetes.pod.name=frontend,io.kubernetes.pod.namespace=my-app,g=x,h=j,io.kubernetes.container.name=POD
65323bda component=bot,io.kubernetes.pod.name=backend,io.kubernetes.pod.namespace=my-app,h=k,j=r,io.kubernetes.container.name=be-server`
var parsedOutput = [4]DockerContainer{
DockerContainer{dockerId: "56443455",
kubePath: ContainerPath{
PodName: "frontend",
PodNamespace: "my-app",
ContainerName: "fe-server"}},
DockerContainer{dockerId: "fab8905c",
kubePath: ContainerPath{
PodName: "frontend",
PodNamespace: "my-app",
ContainerName: "log-shipper"}},
DockerContainer{dockerId: "a01098fd",
kubePath: ContainerPath{
PodName: "frontend",
PodNamespace: "my-app",
ContainerName: "POD"}},
DockerContainer{dockerId: "65323bda",
kubePath: ContainerPath{
PodName: "backend",
PodNamespace: "my-app",
ContainerName: "be-server"}},
}
func TestParseDockerContainerList(t *testing.T) {
got, err := parseDockerContainerList(strings.NewReader(dockerPsOutput))
if err != nil {
t.Logf("Got error %v from GetDockerContainers", err)
t.FailNow()
}
if len(got) != len(parsedOutput) {
t.Logf("Got %v containers, expected %v", len(got), len(parsedOutput))
t.FailNow()
}
for i, expected := range parsedOutput {
if expected != got[i] {
t.Errorf("Mismatched docker output parse")
}
}
}

42
lsns.go Normal file
Просмотреть файл

@ -0,0 +1,42 @@
package main
import (
"context"
"encoding/json"
"os/exec"
)
type NamespaceData struct {
Ns string
Pid string
Command string
}
// Parse output from 'lsns --json --output ns,pid,command'
func parseLsnsOutput(blob []byte) ([]NamespaceData, error) {
var dummy map[string][]NamespaceData
err := json.Unmarshal(blob, &dummy)
if err != nil {
return nil, err
}
// The map should always contain a single element, with key
// "namespaces"
return dummy["namespaces"], nil
}
// Run lsns and parse the output.
// NOTE: if not run as root, lsns will succeed, but not necessarily
// return all namespaces
func listNetNamespaces() ([]NamespaceData, error) {
ctx, _ := context.WithTimeout(context.Background(), subprocessTimeout)
output, err := exec.CommandContext(ctx, "lsns", "--json", "--type", "net", "--output", "ns,pid,command").Output()
if err != nil {
return nil, err
}
return parseLsnsOutput(output)
}

44
lsns_test.go Normal file
Просмотреть файл

@ -0,0 +1,44 @@
package main
import (
"testing"
)
// This matches the format of 'sudo lsns --json --type net --output ns,pid,command'
var lsnsOutput = []byte(`
{
"namespaces": [
{"ns": "23", "pid": "1", "command": "/sbin/init"},
{"ns": "24", "pid": "96", "command": "/pause"},
{"ns": "25", "pid": "284", "command": "/pause"}
]
}
`)
var lsnsCorrectParse = []NamespaceData{
NamespaceData{Ns: "23", Pid: "1", Command: "/sbin/init"},
NamespaceData{Ns: "24", Pid: "96", Command: "/pause"},
NamespaceData{Ns: "25", Pid: "284", Command: "/pause"},
}
func TestParseLsnsOutput(t *testing.T) {
namespaces, err := parseLsnsOutput(lsnsOutput)
if err != nil {
t.Logf("Got error '%v' from parseLsnsOutput", err)
t.FailNow()
}
if len(namespaces) != len(lsnsCorrectParse) {
t.Logf("Got %v namespaces from lsnsCorrectParse, expected %v",
len(namespaces), len(lsnsCorrectParse))
t.FailNow()
}
for i, expected := range lsnsCorrectParse {
if namespaces[i] != expected {
t.Errorf("Bad parse of namespace %v: expected %v, got %v",
i, expected, namespaces[i])
}
}
}

132
netstat.go Normal file
Просмотреть файл

@ -0,0 +1,132 @@
package main
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"strconv"
"strings"
)
// A connection as returned by netstat (also as seen by the kernel)
type Connection struct {
protocol string // Either "tcp" or "tcp6"
localHost string // Either an IP address or a hostname
localPort string // Either a number or a well-known protocol like "http"
remoteHost string // Like localHost
remotePort string // Like localPort
connectionState string // "ESTABLISHED", "TIME_WAIT", etc.
pid int // 0 if unknown. Connections in TIME_WAIT will have a zero pid
}
// Split a netstat address into a host and a port. An address can be
// hostname:port
// IPv4addr:port
// [IPv6addr]:port
// and port can be a number or a string describing a well-known service
// (i.e. 'http' instead of 80)
func hostAndPort(address string) (string, string, error) {
split := strings.LastIndexByte(address, byte(':'))
if split == -1 {
return "", "", fmt.Errorf("No : in address %v", address)
}
return address[:split], address[split+1:], nil
}
func stringSlicesEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, a_elt := range a {
b_elt := b[i]
if a_elt != b_elt {
return false
}
}
return true
}
// Parse the output of 'sudo netstat --tcp --program'
var expectedHeaderFields = []string{
"Proto", "Recv-Q", "Send-Q", "Local", "Address", "Foreign",
"Address", "State", "PID/Program", "name"}
func parseNetstatOutput(output io.Reader) ([]Connection, error) {
lines := bufio.NewScanner(output)
lines.Scan()
if lines.Text() != "Active Internet connections (w/o servers)" {
return nil, fmt.Errorf("Unexpected line 1 of netstat output: %s", lines.Text())
}
lines.Scan()
header := lines.Text()
if !stringSlicesEqual(strings.Fields(header), expectedHeaderFields) {
return nil, fmt.Errorf("Unexpected line 2 of netstat output: %s", header)
}
var result []Connection
for lines.Scan() {
var proto, recv_q, send_q, local_address, remote_address, state, pid_name string
// There may be extra space-separated groups at the
// end of the line. Deliberately ignore them.
n, err := fmt.Sscan(lines.Text(), &proto, &recv_q, &send_q, &local_address,
&remote_address, &state, &pid_name)
if n < 7 {
return nil, fmt.Errorf("Couldn't scan netstat output line: %s", lines.Text())
}
if err != nil {
return nil, err
}
localHost, localPort, err := hostAndPort(local_address)
if err != nil {
return nil, err
}
remoteHost, remotePort, err := hostAndPort(remote_address)
if err != nil {
return nil, err
}
parts := strings.Split(pid_name, "/")
// parts[0] will be the pid
var pid int
if parts[0] == "-" {
pid = 0
} else {
pid, err = strconv.Atoi(parts[0])
if err != nil {
return nil, err
}
}
result = append(result, Connection{
protocol: proto,
localHost: localHost,
localPort: localPort,
remoteHost: remoteHost,
remotePort: remotePort,
connectionState: state,
pid: pid,
})
}
return result, nil
}
// Get open TCP connections from the namespace of pid, in the format of parseNetstatOutput
func getConnectionsFromNamespace(pid string) ([]Connection, error) {
ctx, _ := context.WithTimeout(context.Background(), subprocessTimeout)
netstatOutput, err := exec.CommandContext(ctx, "nsenter", "-t", pid, "-n", "netstat", "--tcp", "--program").Output()
if err != nil {
return nil, err
}
return parseNetstatOutput(strings.NewReader(string(netstatOutput)))
}

119
netstat_test.go Normal file
Просмотреть файл

@ -0,0 +1,119 @@
package main
import (
"strings"
"testing"
)
// Call t.Errorf(message) if a != b
func expectEqual(t *testing.T, a, b interface{}, message string) {
if a != b {
t.Errorf(message)
}
}
func TestHostAndPort(t *testing.T) {
host, port, _ := hostAndPort("127.0.0.1:234")
expectEqual(t, host, "127.0.0.1", "Unexpected host from 127.0.0.1:234")
expectEqual(t, port, "234", "Unexpected port from 127.0.0.1:234")
host, port, _ = hostAndPort("foo.com:https")
expectEqual(t, host, "foo.com", "Unexpected host from foo.com:https")
expectEqual(t, port, "https", "Unexpected port from foo.com:https")
host, port, _ = hostAndPort("[::16:5]:578")
expectEqual(t, host, "[::16:5]", "Unexpected host from [::16:5]:578")
expectEqual(t, port, "578", "Unexpected port from [::16:5]:578")
}
// This should match the output format of 'sudo netstat --tcp --program'
const netstatOutput = `Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 kube-node-1:2960 10.0.1.2:https TIME_WAIT -
tcp 0 0 kube-node-1:9502 10.0.3.4:https ESTABLISHED 36/abcd
tcp 0 0 kube-node-1:4587 10.0.5.6:8685 TIME_WAIT -
tcp 0 0 kube-node-1:0178 10.0.7.8:http-alt TIME_WAIT -
tcp 0 0 kube-node-1:ssh 10.0.9.10:3920 ESTABLISHED 9486/sshd: user
tcp 0 0 kube-node-1:5639 kube-node-12:http-alt TIME_WAIT -
tcp6 0 0 kube-node-1:1234 kube-node-15:9294 TIME_WAIT -
tcp6 0 0 kube-node-1:9168 [::16:5:3]:298 TIME_WAIT -`
var netstatExpectedParse = [8]Connection{
Connection{protocol: "tcp",
localHost: "kube-node-1",
localPort: "2960",
remoteHost: "10.0.1.2",
remotePort: "https",
connectionState: "TIME_WAIT",
pid: 0},
Connection{protocol: "tcp",
localHost: "kube-node-1",
localPort: "9502",
remoteHost: "10.0.3.4",
remotePort: "https",
connectionState: "ESTABLISHED",
pid: 36},
Connection{protocol: "tcp",
localHost: "kube-node-1",
localPort: "4587",
remoteHost: "10.0.5.6",
remotePort: "8685",
connectionState: "TIME_WAIT",
pid: 0},
Connection{protocol: "tcp",
localHost: "kube-node-1",
localPort: "0178",
remoteHost: "10.0.7.8",
remotePort: "http-alt",
connectionState: "TIME_WAIT",
pid: 0},
Connection{protocol: "tcp",
localHost: "kube-node-1",
localPort: "ssh",
remoteHost: "10.0.9.10",
remotePort: "3920",
connectionState: "ESTABLISHED",
pid: 9486},
Connection{protocol: "tcp",
localHost: "kube-node-1",
localPort: "5639",
remoteHost: "kube-node-12",
remotePort: "http-alt",
connectionState: "TIME_WAIT",
pid: 0},
Connection{protocol: "tcp6",
localHost: "kube-node-1",
localPort: "1234",
remoteHost: "kube-node-15",
remotePort: "9294",
connectionState: "TIME_WAIT",
pid: 0},
Connection{protocol: "tcp6",
localHost: "kube-node-1",
localPort: "9168",
remoteHost: "[::16:5:3]",
remotePort: "298",
connectionState: "TIME_WAIT",
pid: 0},
}
func TestParseNetstatOutput(t *testing.T) {
connections, err := parseNetstatOutput(strings.NewReader(netstatOutput))
if err != nil {
t.Logf("Got error %v from parse_netstat_output", err)
t.FailNow()
}
if len(connections) != len(netstatExpectedParse) {
t.Logf("Got %v connections, expected %v", len(connections), len(netstatExpectedParse))
t.FailNow()
}
for i, expected := range netstatExpectedParse {
if expected != connections[i] {
t.Errorf("Got connection %v, expected %v", connections[i], expected)
}
}
}

57
print_table.go Normal file
Просмотреть файл

@ -0,0 +1,57 @@
package main
import (
"bufio"
"fmt"
"io"
)
type Fielder interface {
// Return the fields of this object, as strings
Fields() []string
}
func max(a, b int) int {
if a < b {
return b
}
return a
}
// Print a table. The fields will print in the order returned by
// Fielder.Fields(). header is the first row of fields to print, which
// can be used for column titles.
func prettyPrintTable(rows []Fielder, header []string, f io.Writer) {
w := bufio.NewWriter(f)
fieldWidths := make([]int, len(header))
for i, field := range header {
fieldWidths[i] = len(field)
}
// Get the widths of all fields
for _, row := range rows {
for i, field := range row.Fields() {
fieldWidths[i] = max(fieldWidths[i], len(field))
}
}
// Add 2 spaces in between columns
for i := 0; i < len(fieldWidths)-1; i++ {
fieldWidths[i] += 2
}
// Print the table, with appropriate spacing
for i, field := range header {
// Write field, left-padded to width fieldWidths[i]
fmt.Fprintf(w, "%-*s", fieldWidths[i], field)
}
w.WriteString("\n")
for _, row := range rows {
for i, field := range row.Fields() {
fmt.Fprintf(w, "%-*s", fieldWidths[i], field)
}
w.WriteString("\n")
}
w.Flush()
}

36
print_table_test.go Normal file
Просмотреть файл

@ -0,0 +1,36 @@
package main
import (
"bytes"
"testing"
)
type TestTable struct {
a string
b string
c string
}
func (t TestTable) Fields() []string {
return []string{t.a, t.b, t.c}
}
const expectedTable = `AAA B C
a b cc
aaa b c
`
func TestPrettyPrintTable(t *testing.T) {
var buf bytes.Buffer
table := []Fielder{
&TestTable{a: "a", b: "b", c: "cc"},
&TestTable{a: "aaa", b: "b", c: "c"},
}
prettyPrintTable(table, []string{"AAA", "B", "C"}, &buf)
written := buf.String()
if written != expectedTable {
t.Errorf("prettyPrintTable wrote %#v, expected %#v", written, expectedTable)
}
}