From 2c08713a45457821543b2cd4c990b222ad367c71 Mon Sep 17 00:00:00 2001 From: Di Xu Date: Tue, 11 Dec 2018 12:58:39 +0800 Subject: [PATCH] refactor job-exporter (#1840) * refactor job-exporter * select a network interface (#1849) * do not use irate in task_net* and service_net* metric (#1859) --- .travis.yml | 6 +- .../services-configuration.yaml.template | 6 + docs/alerting/README.md | 16 +- docs/alerting/exporter-metrics.md | 21 +- .../pai-jobview-dashboard.json.template | 18 +- .../pai-serviceview-dashboard.json.template | 4 +- .../pai-taskroleview-dashboard.json.template | 18 +- .../pai-taskview-dashboard.json.template | 22 +- .../build/job-exporter.dockerfile | 10 +- src/job-exporter/config/job-exporter.md | 73 +++ src/job-exporter/config/job-exporter.yaml | 20 + src/job-exporter/config/job_exporter.py | 30 + .../{test/logging.yaml => deploy/delete.sh} | 26 +- .../deploy/job-exporter.yaml.template | 107 +++ src/job-exporter/deploy/refresh.sh | 25 + src/job-exporter/deploy/service.yaml | 33 + src/job-exporter/deploy/start.sh | 29 + src/job-exporter/deploy/stop.sh | 22 + src/job-exporter/src/collector.py | 617 ++++++++++++++++++ src/job-exporter/src/docker_inspect.py | 48 +- src/job-exporter/src/docker_stats.py | 45 +- src/job-exporter/src/job_exporter.py | 391 ----------- src/job-exporter/src/main.py | 156 +++++ src/job-exporter/src/network.py | 127 +++- .../src/{gpu_exporter.py => nvidia.py} | 47 +- src/job-exporter/src/utils.py | 127 +--- src/job-exporter/test/base.py | 28 + src/job-exporter/test/test_collector.py | 180 +++++ src/job-exporter/test/test_docker_inspect.py | 33 +- src/job-exporter/test/test_docker_stats.py | 37 +- src/job-exporter/test/test_gpu_exporter.py | 89 --- src/job-exporter/test/test_job_exporter.py | 133 ---- .../no_older_than.py => test/test_network.py} | 37 +- src/job-exporter/test/test_nvidia.py | 50 ++ src/job-exporter/test/test_utils.py | 140 +--- src/node-exporter/config/node_exporter.py | 2 - .../deploy/node-exporter.yaml.template 
| 87 +-- src/node-exporter/deploy/service.yaml | 1 - .../deploy/alerting/pai-services.rules | 8 + .../deploy/prometheus-configmap.yaml.template | 14 +- src/watchdog/deploy/watchdog.yaml.template | 3 +- src/watchdog/src/watchdog.py | 1 - 42 files changed, 1667 insertions(+), 1220 deletions(-) create mode 100644 src/job-exporter/config/job-exporter.md create mode 100644 src/job-exporter/config/job-exporter.yaml create mode 100644 src/job-exporter/config/job_exporter.py rename src/job-exporter/{test/logging.yaml => deploy/delete.sh} (72%) create mode 100644 src/job-exporter/deploy/job-exporter.yaml.template create mode 100644 src/job-exporter/deploy/refresh.sh create mode 100644 src/job-exporter/deploy/service.yaml create mode 100644 src/job-exporter/deploy/start.sh create mode 100644 src/job-exporter/deploy/stop.sh create mode 100644 src/job-exporter/src/collector.py delete mode 100644 src/job-exporter/src/job_exporter.py create mode 100644 src/job-exporter/src/main.py rename src/job-exporter/src/{gpu_exporter.py => nvidia.py} (61%) create mode 100644 src/job-exporter/test/base.py create mode 100644 src/job-exporter/test/test_collector.py delete mode 100644 src/job-exporter/test/test_gpu_exporter.py delete mode 100644 src/job-exporter/test/test_job_exporter.py rename src/job-exporter/{src/no_older_than.py => test/test_network.py} (57%) create mode 100644 src/job-exporter/test/test_nvidia.py diff --git a/.travis.yml b/.travis.yml index f567ae68c..b4aaabe4d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -35,13 +35,13 @@ matrix: - python -m unittest deployment.clusterObjectModel.test.test_template_generate - language: python - python: 2.7 + python: 3.6 before_install: - cd src/job-exporter/test install: - - pip install pyyaml + - pip install prometheus_client script: - - python -m unittest discover . + - python3 -m unittest discover . 
- language: python python: 2.7 diff --git a/deployment/quick-start/services-configuration.yaml.template b/deployment/quick-start/services-configuration.yaml.template index da053072d..58f153b3f 100644 --- a/deployment/quick-start/services-configuration.yaml.template +++ b/deployment/quick-start/services-configuration.yaml.template @@ -93,6 +93,12 @@ rest-server: # node-exporter: # port: 9100 +# uncomment following if you want to customize job-exporter +# job-exporter: +# port: 9102 +# logging-level: INFO +# interface: eth0,eno2 + # if you want to enable alert manager to send alert email, uncomment following lines and fill # the right values. # alert-manager: diff --git a/docs/alerting/README.md b/docs/alerting/README.md index 26642dbc3..baf6df144 100644 --- a/docs/alerting/README.md +++ b/docs/alerting/README.md @@ -60,14 +60,14 @@ Other pai component also used some metrics for display, they are:
  • node_network_transmit_bytes_total
  • node_disk_read_bytes_total
  • node_disk_written_bytes_total
  • -
  • container_CPUPerc
  • -
  • container_MemUsage
  • -
  • container_NetIn
  • -
  • container_NetOut
  • -
  • container_BlockIn
  • -
  • container_BlockOut
  • -
  • container_GPUPerc
  • -
  • container_GPUMemPerc
  • +
  • task_cpu_percent
  • +
  • task_mem_usage_byte
  • +
  • task_net_in_byte
  • +
  • task_net_out_byte
  • +
  • task_block_in_byte
  • +
  • task_block_out_byte
  • +
  • task_gpu_percent
  • +
  • task_gpu_mem_percent
  • diff --git a/docs/alerting/exporter-metrics.md b/docs/alerting/exporter-metrics.md index f6798ba15..421b67dc0 100644 --- a/docs/alerting/exporter-metrics.md +++ b/docs/alerting/exporter-metrics.md @@ -5,19 +5,18 @@ Some important metrics are listed below. | Metrics Name | By | Description | | --- | --- | --- | | `configured_gpu_count` | `job_exporter` | number of gpu configured by user | -| `nvidiasmi_attached_gpus` | `job_exporter` | number of gpu detectived by nvidiasmi, this metric may change due to nvidia-smi hangs | | `nvidiasmi_utilization_gpu` | `job_exporter` | GPU utilization detectived by nvidiasmi | | `nvidiasmi_utilization_memory` | `job_exporter` | GPU memory utilization detectiving by nvidiasmi | -| `container_GPUPerc` | `job_exporter` | GPU utilization by specified job container | -| `container_GPUMemPerc` | `job_exporter` | GPU memory utilization by specified job container | -| `container_CPUPerc` | `job_exporter` | CPU utilization by job detectived by docker stats | -| `container_MemUsage` | `job_exporter` | Memory usage by job detectived by docker stats (byte) | -| `container_MemLimit` | `job_exporter` | Memory limit by job detectived by docker stats (byte) | -| `container_MemPerc` | `job_exporter` | Memory utilization by job detectived by docker stats | -| `container_NetIn` | `job_exporter` | Network in traffic by job detectived by docker stats (byte) | -| `container_NetOut` | `job_exporter` | Network out traffic by job detectived by docker stats (byte) | -| `container_BlockIn` | `job_exporter` | Block io in traffic by job detectived by docker stats (byte) | -| `container_BlockOut` | `job_exporter` | Block io out traffic by job detectived by docker stats (byte) | +| `task_gpu_percent` | `job_exporter` | GPU utilization by specified job container | +| `task_gpu_mem_percent` | `job_exporter` | GPU memory utilization by specified job container | +| `task_cpu_percent` | `job_exporter` | CPU utilization by job detectived by docker stats | +| 
`task_mem_usage_byte` | `job_exporter` | Memory usage by job detectived by docker stats (byte) | +| `task_mem_limit_byte` | `job_exporter` | Memory limit by job detectived by docker stats (byte) | +| `task_mem_usage_percent` | `job_exporter` | Memory utilization by job detectived by docker stats | +| `task_net_in_byte` | `job_exporter` | Network in traffic by job detectived by docker stats (byte) | +| `task_net_out_byte` | `job_exporter` | Network out traffic by job detectived by docker stats (byte) | +| `task_block_in_byte` | `job_exporter` | Block io in traffic by job detectived by docker stats (byte) | +| `task_block_out_byte` | `job_exporter` | Block io out traffic by job detectived by docker stats (byte) | | `node_filefd_allocated` | `node_exporter` | Number of file descriptor allocated in node | | `node_disk_read_time_ms` | `node_exporter` | Node disk read time (ms) | | `node_disk_write_time_ms` | `node_exporter` | Node disk write time (ms) | diff --git a/src/grafana/deploy/grafana-configuration/pai-jobview-dashboard.json.template b/src/grafana/deploy/grafana-configuration/pai-jobview-dashboard.json.template index de35a4b26..447104b03 100644 --- a/src/grafana/deploy/grafana-configuration/pai-jobview-dashboard.json.template +++ b/src/grafana/deploy/grafana-configuration/pai-jobview-dashboard.json.template @@ -91,7 +91,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME)(container_CPUPerc{container_label_PAI_JOB_NAME=~\"$job\"}) ", + "expr": "avg by (container_label_PAI_JOB_NAME)(task_cpu_percent{container_label_PAI_JOB_NAME=~\"$job\"}) ", "format": "time_series", "instant": false, "intervalFactor": 2, @@ -191,7 +191,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME) (container_MemUsage{ container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME) (task_mem_usage_byte{ container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "hide": false, 
"instant": false, @@ -283,7 +283,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_NetIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])) ", + "expr": "avg by (container_label_PAI_JOB_NAME) (task_net_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -294,7 +294,7 @@ "target": "" }, { - "expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_NetOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])) ", + "expr": "avg by (container_label_PAI_JOB_NAME) (task_net_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])", "format": "time_series", "hide": false, "interval": "", @@ -390,7 +390,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_BlockIn{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME) (irate(task_block_in_byte{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -401,7 +401,7 @@ "target": "" }, { - "expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_BlockOut{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME) (irate(task_block_out_byte{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", "format": "time_series", "intervalFactor": 2, "legendFormat": "Read {{'{{container_label_PAI_JOB_NAME}}'}}", @@ -480,7 +480,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -563,7 +563,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME) 
(container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "instant": false, "intervalFactor": 2, @@ -637,7 +637,7 @@ "multiFormat": "regex values", "name": "job", "options": [], - "query": "label_values(container_CPUPerc, container_label_PAI_JOB_NAME)", + "query": "label_values(task_cpu_percent, container_label_PAI_JOB_NAME)", "refresh": 1, "regex": "^(?!\\s*$).+", "sort": 1, diff --git a/src/grafana/deploy/grafana-configuration/pai-serviceview-dashboard.json.template b/src/grafana/deploy/grafana-configuration/pai-serviceview-dashboard.json.template index d3ae9f846..38021402e 100644 --- a/src/grafana/deploy/grafana-configuration/pai-serviceview-dashboard.json.template +++ b/src/grafana/deploy/grafana-configuration/pai-serviceview-dashboard.json.template @@ -224,7 +224,7 @@ "steppedLine": false, "targets": [ { - "expr": "irate(service_net_in_byte{name=\"$service_name\"}[{{interval}}s])", + "expr": "service_net_in_byte{name=\"$service_name\"}[{{interval}}s]", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -232,7 +232,7 @@ "refId": "A" }, { - "expr": "irate(service_net_out_byte{name=\"$service_name\"}[{{interval}}s])", + "expr": "service_net_out_byte{name=\"$service_name\"}[{{interval}}s]", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{'{{'}}instance{{'}}'}} Outbound", diff --git a/src/grafana/deploy/grafana-configuration/pai-taskroleview-dashboard.json.template b/src/grafana/deploy/grafana-configuration/pai-taskroleview-dashboard.json.template index f6ab7c9a1..e6c3f9f39 100644 --- a/src/grafana/deploy/grafana-configuration/pai-taskroleview-dashboard.json.template +++ b/src/grafana/deploy/grafana-configuration/pai-taskroleview-dashboard.json.template @@ -108,7 +108,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, 
container_label_PAI_CURRENT_TASK_ROLE_NAME) (container_CPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_cpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "instant": false, "intervalFactor": 2, @@ -208,7 +208,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (container_MemUsage{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_mem_usage_byte{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "hide": false, "instant": false, @@ -300,7 +300,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_NetIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_net_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])", "interval": "", "intervalFactor": 2, "legendFormat": "In {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}", @@ -310,7 +310,7 @@ "target": "" }, { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_NetOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_net_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])", "format": "time_series", "hide": false, "interval": "", @@ -406,7 +406,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_BlockIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by 
(container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(task_block_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", "interval": "", "intervalFactor": 2, "legendFormat": "Write {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}", @@ -416,7 +416,7 @@ "target": "" }, { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_BlockOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(task_block_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", "intervalFactor": 2, "legendFormat": "Read {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}", "refId": "B" @@ -494,7 +494,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -592,7 +592,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}", @@ -665,7 +665,7 @@ "multiFormat": "regex values", "name": "job", "options": [], - "query": "label_values(container_CPUPerc, container_label_PAI_JOB_NAME)", + "query": 
"label_values(task_cpu_percent, container_label_PAI_JOB_NAME)", "refresh": 1, "regex": "^(?!\\s*$).+", "sort": 1, diff --git a/src/grafana/deploy/grafana-configuration/pai-taskview-dashboard.json.template b/src/grafana/deploy/grafana-configuration/pai-taskview-dashboard.json.template index 475ab7e60..3a82e302a 100644 --- a/src/grafana/deploy/grafana-configuration/pai-taskview-dashboard.json.template +++ b/src/grafana/deploy/grafana-configuration/pai-taskview-dashboard.json.template @@ -96,7 +96,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (container_CPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_cpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "instant": false, "intervalFactor": 2, @@ -196,7 +196,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (container_MemUsage{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_mem_usage_byte{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "hide": false, "instant": false, @@ -288,7 +288,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (irate(container_NetIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_net_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -299,7 
+299,7 @@ "target": "" }, { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (irate(container_NetOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_net_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])", "format": "time_series", "hide": false, "interval": "", @@ -395,7 +395,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(container_BlockIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(task_block_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -406,7 +406,7 @@ "target": "" }, { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(container_BlockOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(task_block_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))", "format": "time_series", "intervalFactor": 2, "legendFormat": "Read {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_env_PAI_TASK_INDEX}}'}}", @@ -485,7 +485,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, 
container_env_PAI_TASK_INDEX)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -568,7 +568,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -673,7 +673,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX, minor_number)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX, minor_number)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "format": "time_series", "hide": false, "instant": false, @@ -771,7 +771,7 @@ "steppedLine": false, "targets": [ { - "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX, minor_number) (container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})", + "expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX, minor_number) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})", "hide": false, "instant": false, "interval": "", @@ -850,7 +850,7 @@ "multiFormat": "regex values", "name": "job", "options": [], - "query": "label_values(container_CPUPerc, container_label_PAI_JOB_NAME)", + "query": "label_values(task_cpu_percent, container_label_PAI_JOB_NAME)", "refresh": 1, "regex": "^(?!\\s*$).+", "sort": 1, diff --git 
a/src/job-exporter/build/job-exporter.dockerfile b/src/job-exporter/build/job-exporter.dockerfile index e7927ab7d..1b39c25f3 100644 --- a/src/job-exporter/build/job-exporter.dockerfile +++ b/src/job-exporter/build/job-exporter.dockerfile @@ -15,13 +15,13 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -FROM python:2.7 +FROM python:3.7 RUN apt-get update && apt-get install --no-install-recommends -y build-essential git && \ git clone https://github.com/yadutaf/infilter --depth 1 && \ cd infilter && make -FROM python:2.7 +FROM python:3.7 RUN curl -SL https://download.docker.com/linux/static/stable/x86_64/docker-17.06.2-ce.tgz \ | tar -xzvC /usr/local && \ @@ -30,7 +30,7 @@ RUN curl -SL https://download.docker.com/linux/static/stable/x86_64/docker-17.06 mkdir -p /job_exporter && \ rm -rf /var/lib/apt/lists/* -COPY --from=0 infilter/infilter /usr/bin -COPY src/* /job_exporter/ +RUN pip3 install prometheus_client -CMD python /job_exporter/job_exporter.py /datastorage/prometheus 30 +COPY --from=0 infilter/infilter /usr/bin +COPY src/*.py /job_exporter/ diff --git a/src/job-exporter/config/job-exporter.md b/src/job-exporter/config/job-exporter.md new file mode 100644 index 000000000..1da8c2c84 --- /dev/null +++ b/src/job-exporter/config/job-exporter.md @@ -0,0 +1,73 @@ +## Job-exporter section parser + +- [Default Configuration](#D_Config) +- [How to Configure](#HT_Config) +- [Generated Configuration](#G_Config) +- [Data Table](#T_config) + +#### Default configuration + +[job-exporter default configuration](job-exporter.yaml) + +#### How to configure cluster section in service-configuration.yaml + +All configurations in this section are optional. If you want to customize these values, you can configure it in service-configuration.yaml. 
+ +For example, if you want to use different port than the default 9102, add following to your service-configuration.yaml as following: +```yaml +job-exporter: + port: new-value + logging-level: DEBUG +``` + +job-exporter needs to expose network related metrics by listening to one of network interface. +By default, job-exporter will listen to card of eth0 or eno2 whichever it found in the node. +But if your card name is different from these, you should edit job-exporter config in your +service-configuration.yaml like following: +```yaml +job-exporter: + interface: eth1,eno1 +``` + +the interface field is comma separated string, and job-exporter will listen to the one that found in your node. If none of interfaces found, job-exporter will select one that can access the internet. + + +#### Generated Configuration + +Generated configuration means the object model after parsing. The parsed data will be presented by a yaml format. +```yaml +job-exporter: + port: 9100 + logging-level: DEBUG + interface: eth0,eno2 +``` + + +#### Table + + + + + + + + + + + + + + + + + + + + + + + + + + +
    Data in Configuration FileData in Cluster Object ModelData in Jinja2 TemplateData type
    job-exporter.portcom["job-exporter"]["port"]cluster_cfg["job-exporter"]["port"]Int
    job-exporter.logging-levelcom["job-exporter"]["logging-level"]cluster_cfg["job-exporter"]["logging-level"]String
    job-exporter.interfacecom["job-exporter"]["interface"]cluster_cfg["job-exporter"]["interface"]String
    diff --git a/src/job-exporter/config/job-exporter.yaml b/src/job-exporter/config/job-exporter.yaml new file mode 100644 index 000000000..e895fafc0 --- /dev/null +++ b/src/job-exporter/config/job-exporter.yaml @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation +# # All rights reserved. +# # +# # MIT License +# # +# # Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# # documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# # the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# # to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# # The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# # +# # THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# # BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +port: 9102 +logging-level: INFO +interface: eth0,eno2 diff --git a/src/job-exporter/config/job_exporter.py b/src/job-exporter/config/job_exporter.py new file mode 100644 index 000000000..54307dda1 --- /dev/null +++ b/src/job-exporter/config/job_exporter.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python + +import copy + +class JobExporter(object): + def __init__(self, cluster_conf, service_conf, default_service_conf): + self.cluster_conf = cluster_conf + self.service_conf = service_conf + self.default_service_conf = default_service_conf + + def validation_pre(self): + return True, None + + def run(self): + result = copy.deepcopy(self.default_service_conf) + result.update(self.service_conf) + return result + + def validation_post(self, conf): + port = conf["job-exporter"].get("port") + if type(port) != int: + msg = "expect port in job-exporter to be int but get %s with type %s" % \ + (port, type(port)) + return False, msg + level = conf["job-exporter"].get("logging-level") + if level not in {"DEBUG", "INFO", "WARNING"}: + msg = "expect logging-level in job-exporter to be {'DEBUG', 'INFO', 'WARNING'} but got %s" % \ + (level) + return False, msg + return True, None diff --git a/src/job-exporter/test/logging.yaml b/src/job-exporter/deploy/delete.sh similarity index 72% rename from src/job-exporter/test/logging.yaml rename to src/job-exporter/deploy/delete.sh index c2be3fa2d..1b952d11c 100644 --- a/src/job-exporter/test/logging.yaml +++ b/src/job-exporter/deploy/delete.sh @@ -1,3 +1,5 @@ +#!/bin/bash + # Copyright (c) Microsoft Corporation # All rights reserved. # @@ -15,26 +17,10 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
-version: 1 -disable_existing_loggers: False -formatters: - pai_k8sdp: - format: "%(asctime)s [%(levelname)s] - %(name)s : %(message)s" +pushd $(dirname "$0") > /dev/null -handlers: - console: - class: logging.StreamHandler - level: DEBUG - formatter: pai_k8sdp - stream: ext://sys.stdout - -loggers: - unittest_module: - level: INFO - handler: [console] - propagate: no +echo "Call stop script to stop all service first" +/bin/bash stop.sh || exit $? -root: - level: INFO - handlers: [console] \ No newline at end of file +popd > /dev/null \ No newline at end of file diff --git a/src/job-exporter/deploy/job-exporter.yaml.template b/src/job-exporter/deploy/job-exporter.yaml.template new file mode 100644 index 000000000..f562946ba --- /dev/null +++ b/src/job-exporter/deploy/job-exporter.yaml.template @@ -0,0 +1,107 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: job-exporter +spec: + selector: + matchLabels: + app: job-exporter + template: + metadata: + labels: + app: job-exporter + annotations: + prometheus.io/scrape: "true" + prometheus.io/path: "/" + prometheus.io/port: "{{ cluster_cfg["job-exporter"]["port"] }}" + name: job-exporter + spec: + containers: + - image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}job-exporter:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }} + imagePullPolicy: Always + readinessProbe: + tcpSocket: # because http get will trigger job-exporter to abandon old metrics, so we use tcp instead of http + port: {{ cluster_cfg["job-exporter"]["port"] }} + initialDelaySeconds: 3 + periodSeconds: 30 + command: + - "python" + - "/job_exporter/main.py" + - "--port" + - "{{ cluster_cfg["job-exporter"]["port"] }}" + - "--interval" + - "{{ cluster_cfg["prometheus"]["scrape_interval"] }}" + - "--interface" + - "{{ cluster_cfg["job-exporter"]["interface"] }}" + resources: + limits: + memory: "128Mi" + securityContext: + privileged: true # this is required since job-exporter will call setns to other containers + env: + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: LOGGING_LEVEL + value: {{ cluster_cfg["job-exporter"]["logging-level"] }} + - name: NV_DRIVER + value: /var/drivers/nvidia/current + volumeMounts: + - mountPath: /var/run/docker.sock + name: docker-socket + - mountPath: /dev + name: device-mount + - mountPath: /var/drivers/nvidia/current + name: driver-path + - mountPath: /datastorage/prometheus + name: collector-mount + - mountPath: /gpu-config + name: gpu-config + name: job-exporter + ports: + - containerPort: {{ cluster_cfg["job-exporter"]["port"] }} + hostPort: {{ cluster_cfg["job-exporter"]["port"] }} + name: main + volumes: + - name: docker-socket + hostPath: + path: /var/run/docker.sock + - name: device-mount + hostPath: + path: /dev + - name: driver-path + hostPath: + path: 
/var/drivers/nvidia/current + - name: collector-mount + hostPath: + path: {{ cluster_cfg["cluster"]["common"]["data-path"] }}/prometheus + - name: gpu-config + configMap: + name: gpu-configuration + imagePullSecrets: + - name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }} + hostNetwork: true + hostPID: true # This is required since job-exporter should get list of pid in container + tolerations: + - key: node.kubernetes.io/memory-pressure + operator: "Exists" + - key: node.kubernetes.io/disk-pressure + operator: "Exists" diff --git a/src/job-exporter/deploy/refresh.sh b/src/job-exporter/deploy/refresh.sh new file mode 100644 index 000000000..541b71142 --- /dev/null +++ b/src/job-exporter/deploy/refresh.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ + +pushd $(dirname "$0") > /dev/null + + + +popd > /dev/null \ No newline at end of file diff --git a/src/job-exporter/deploy/service.yaml b/src/job-exporter/deploy/service.yaml new file mode 100644 index 000000000..e91eff561 --- /dev/null +++ b/src/job-exporter/deploy/service.yaml @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +prerequisite: + - cluster-configuration + - drivers + +template-list: + - job-exporter.yaml + +start-script: start.sh +stop-script: stop.sh +delete-script: delete.sh +refresh-script: refresh.sh +upgraded-script: upgraded.sh + + +deploy-rules: + - notin: no-jobexporter diff --git a/src/job-exporter/deploy/start.sh b/src/job-exporter/deploy/start.sh new file mode 100644 index 000000000..132a65257 --- /dev/null +++ b/src/job-exporter/deploy/start.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +#!/bin/bash + +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +pushd $(dirname "$0") > /dev/null + +kubectl apply --overwrite=true -f job-exporter.yaml || exit $? + +# Wait until the service is ready. +PYTHONPATH="../../../deployment" python -m k8sPaiLibrary.monitorTool.check_pod_ready_status -w -k app -v job-exporter || exit $? 
+ +popd > /dev/null diff --git a/src/job-exporter/deploy/stop.sh b/src/job-exporter/deploy/stop.sh new file mode 100644 index 000000000..9fd93044a --- /dev/null +++ b/src/job-exporter/deploy/stop.sh @@ -0,0 +1,22 @@ +#!/bin/sh + +#!/bin/bash + +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +kubectl delete --ignore-not-found --now "daemonset/job-exporter" diff --git a/src/job-exporter/src/collector.py b/src/job-exporter/src/collector.py new file mode 100644 index 000000000..330540055 --- /dev/null +++ b/src/job-exporter/src/collector.py @@ -0,0 +1,617 @@ +#!/usr/bin/env python3 +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +import re +import datetime +import logging +import threading +import subprocess +import time +import copy + +from prometheus_client import make_wsgi_app, Counter, Gauge, Histogram +from prometheus_client.core import GaugeMetricFamily + +import network +import utils +import docker_inspect +import docker_stats +import nvidia + +logger = logging.getLogger(__name__) + + +##### collector will generate following metrics +# Document about these metrics is in `` # TODO + +iteration_counter = Counter("collector_iteration_count", "total number of iteration", + ["name"]) + +def gen_docker_daemon_counter(): + return GaugeMetricFamily("docker_daemon_count", + "count of docker daemon", + labels=["error"]) + +def gen_gpu_util_gauge(): + return GaugeMetricFamily("nvidiasmi_utilization_gpu", + "gpu core utilization of card", + labels=["minor_number"]) + +def gen_gpu_mem_util_gauge(): + return GaugeMetricFamily("nvidiasmi_utilization_memory", + "gpu memory utilization of card", + labels=["minor_number"]) + + +class ResourceGauges(object): + def __init__(self): + self.task_labels = [ + "container_env_PAI_TASK_INDEX", + "container_label_PAI_CURRENT_TASK_ROLE_NAME", + "container_label_PAI_HOSTNAME", + "container_label_PAI_JOB_NAME", + "container_label_PAI_USER_NAME" + ] + self.service_labels = ["name"] + + self.task_labels_gpu = copy.deepcopy(self.task_labels) + self.task_labels_gpu.append("minor_number") + + self.gauges = {} + + self.add_task_and_service_gauge("{0}_cpu_percent", + "how much percent of cpu this {0} used") + self.add_task_and_service_gauge("{0}_mem_usage_byte", + "how much memory this {0} used") + self.add_task_and_service_gauge("{0}_mem_usage_percent", + "how much percent of memory this {0} used") + self.add_task_and_service_gauge("{0}_mem_limit_byte", + "how much memory this {0} are constrained to") + self.add_task_and_service_gauge("{0}_net_in_byte", + "how much network inbound this task used") + self.add_task_and_service_gauge("{0}_net_out_byte", + "how much 
network outbound this {0} used") + self.add_task_and_service_gauge("{0}_block_in_byte", + "how much block inbound this {0} used") + self.add_task_and_service_gauge("{0}_block_out_byte", + "how much block outbound this {0} used") + + self.add_gauge("task_gpu_percent", + "how much percent of gpu core this task used", + self.task_labels_gpu) + self.add_gauge("task_gpu_mem_percent", + "how much percent of gpu memory this task used", + self.task_labels_gpu) + + def add_task_and_service_gauge(self, name_tmpl, desc_tmpl): + self.add_gauge( + name_tmpl.format("task"), + desc_tmpl.format("task"), + self.task_labels) + + self.add_gauge( + name_tmpl.format("service"), + desc_tmpl.format("service"), + self.service_labels) + + def add_gauge(self, name, desc, labels): + self.gauges[name] = GaugeMetricFamily(name, desc, labels=labels) + + def add_value(self, metric_name, labels, val): + if metric_name not in self.gauges: + raise RuntimeError( + "{0} not found in gauges, all gauge names is {1}".format( + metric_name, ",".join(self.gauges.keys()))) + + gauge = self.gauges[metric_name] + + # because prometheus library requires label provided as array, we + # preprocess the labels and check any missing labels + label_array = [None] * len(gauge._labelnames) + + for k, v in labels.items(): + try: + index = gauge._labelnames.index(k) + label_array[index] = v + except ValueError: + logger.warning("unknown label %s with value %s for metrics %s", + k, v, metric_name) + continue + + for i, label_val in enumerate(label_array): + if label_val is None: + logger.error( + "not provided %s as label value for metric %s, ignore this metric", + gauge._labelnames[i], metric_name) + return + + gauge.add_metric(label_array, val) + + def as_array(self): + return self.gauges.values() + +##### + +class AtomicRef(object): + """ a thread safe way to store and get object, + should not modify data get from this ref """ + def __init__(self): + self.data = None + self.lock = threading.RLock() + + def 
get_and_set(self, new_data): + data = None + with self.lock: + data, self.data = self.data, new_data + return data + + def get(self): + with self.lock: + return self.data + + +class Collector(object): + """ collector is a model running in thread and responsible for collecting + some metrics, we use thread because we do not want to let hanging in one + collector can not have impact on other collectors. This is base class, + real collector should inhernit this class and implement collect_impl, + metrics are returned as an array.""" + def __init__(self, name, sleep_time, atomic_ref, iteration_counter): + self.name = name + self.sleep_time = sleep_time + self.atomic_ref = atomic_ref + self.iteration_counter = iteration_counter + + histogram_key = "collector_%s_iteration_lantecy_seconds" % self.name + histogram_desc = "latency for execute one interation of %s collector (seconds)" % \ + self.name + self.collector_histogram = Histogram(histogram_key, histogram_desc) + + logger.debug("init %s with sleep_time %d", self.name, self.sleep_time) + + def collect(self): + while True: + logger.debug("collecting metrics from %s", self.name) + + with self.collector_histogram.time(): + self.iteration_counter.labels(name=self.name).inc() + try: + self.atomic_ref.get_and_set(self.collect_impl()) + except Exception as e: + logger.exception("%s collector get an exception", self.name) + + logger.debug("finished collect metrcis from %s, will sleep for %s", + self.name, self.sleep_time) + + time.sleep(self.sleep_time) + + def collect_impl(self): + """ implementations are expected to return an array of + prometheus_client's metrics or None on exception """ + pass + + +def instantiate_collector(name, sleep_time, collector_class, *args): + """ test cases helper fn to instantiate a collector """ + atomic_ref = AtomicRef() + return atomic_ref, collector_class(name, sleep_time, atomic_ref, iteration_counter, *args) + + +def make_collector(name, sleep_time, collector_class, *args): + """ other 
module should use this fn to init a collector, this fn start a thread + to run the collector and return an atomic_ref so outside world can get metrics + collected by this collector """ + atomic_ref, instance = instantiate_collector(name, sleep_time, collector_class, *args) + + t = threading.Thread( + target=instance.collect, + name=name, + args=(), + daemon=True) + + t.start() + + return atomic_ref + + +class DockerCollector(Collector): + cmd_histogram = Histogram("cmd_docker_active_latency_seconds", + "Command call latency for checking docker daemon activeness (seconds)") + + cmd_timeout = 1 # 99th latency is 0.01s + + def collect_impl(self): + cmd = """ + systemctl is-active docker > /dev/null 2>&1 ; + if [ $? -eq 0 ]; then echo "true"; else echo "false" ; fi + """ + error = "ok" + + try: + out = utils.exec_cmd(cmd, shell=True, + histogram=DockerCollector.cmd_histogram, + timeout=DockerCollector.cmd_timeout) + + if "true" not in str(out): + error = "inactive" + except subprocess.CalledProcessError as e: + logger.exception("command '%s' return with error (code %d): %s", + cmd, e.returncode, e.output) + error = e.strerror() + except subprocess.TimeoutExpired as e: + logging.warning("check docker active timeout") + error = "timeout" + except Exception as e: + error = e.strerror() + + counter = gen_docker_daemon_counter() + counter.add_metric([error], 1) + + return [counter] + + +class GpuCollector(Collector): + cmd_histogram = Histogram("cmd_nvidia_smi_latency_seconds", + "Command call latency for nvidia-smi (seconds)") + + cmd_timeout = 3 # 99th latency is 0.97s + + def __init__(self, name, sleep_time, atomic_ref, iteration_counter, gpu_info_ref): + Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter) + self.gpu_info_ref = gpu_info_ref + + def collect_impl(self): + gpu_info = nvidia.nvidia_smi(GpuCollector.cmd_histogram, + GpuCollector.cmd_timeout) + + logging.debug("get gpu_info %s", gpu_info) + + self.gpu_info_ref.get_and_set(gpu_info) + + if 
gpu_info is not None: + core_utils = gen_gpu_util_gauge() + mem_utils = gen_gpu_mem_util_gauge() + + for minor, info in gpu_info.items(): + core_utils.add_metric([minor], info["gpu_util"]) + mem_utils.add_metric([minor], info["gpu_mem_util"]) + + return [core_utils, mem_utils] + + return None + + +class ContainerCollector(Collector): + stats_histogram = Histogram("cmd_docker_stats_latency_seconds", + "Command call latency for docker stats (seconds)") + stats_timeout = 20 + # 99th latency may larger than 10s, + # Because prometheus's largest bucket for recording histogram is 10s, + # we can not get value higher than 10s. + + inspect_histogram = Histogram("cmd_docker_inspect_latency_seconds", + "Command call latency for docker inspect (seconds)") + inspect_timeout = 1 # 99th latency is 0.042s + + iftop_histogram = Histogram("cmd_iftop_latency_seconds", + "Command call latency for iftop (seconds)") + iftop_timeout = 10 # 99th latency is 7.4s + + lsof_histogram = Histogram("cmd_lsof_latency_seconds", + "Command call latency for lsof (seconds)") + lsof_timeout = 2 # 99th latency is 0.5s + + pai_services = list(map(lambda s: "k8s_" + s, [ + "rest-server", + "pylon", + "webportal", + "grafana", + "prometheus", + "alertmanager", + "watchdog", + "end-to-end-test", + "yarn-frameworklauncher", + "hadoop-jobhistory-service", + "hadoop-name-node", + "hadoop-node-manager", + "hadoop-resource-manager", + "hadoop-data-node", + "zookeeper", + "node-exporter", + "job-exporter", + "yarn-exporter", + "nvidia-drivers" + ])) + + def __init__(self, name, sleep_time, atomic_ref, iteration_counter, gpu_info_ref, + stats_info_ref, interface): + Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter) + self.gpu_info_ref = gpu_info_ref + self.stats_info_ref = stats_info_ref + + self.network_interface = network.try_to_get_right_interface(interface) + logger.info("found %s as potential network interface to listen network traffic", + self.network_interface) + + # k8s will 
prepend "k8s_" to pod name. There will also be a container name + # prepend with "k8s_POD_" which is a docker container used to construct + # network & pid namespace for specific container. These container prepend + # with "k8s_POD" consume nothing. + + def collect_impl(self): + all_conns = network.iftop(self.network_interface, + ContainerCollector.iftop_histogram, + ContainerCollector.iftop_timeout) + + # set it to None so if nvidia-smi hangs till next time we get, + # we will get None + gpu_infos = self.gpu_info_ref.get_and_set(None) + + stats_obj = docker_stats.stats(ContainerCollector.stats_histogram, + ContainerCollector.stats_timeout) + self.stats_info_ref.get_and_set(stats_obj) + + logger.debug("all_conns is %s, gpu_info is %s, stats_obj is %s", + all_conns, gpu_infos, stats_obj) + + return self.collect_container_metrics(stats_obj, gpu_infos, all_conns) + + @staticmethod + def parse_from_labels(labels): + gpu_ids = [] + other_labels = {} + + for key, val in labels.items(): + if "container_label_GPU_ID" == key: + s2 = val.replace("\"", "").split(",") + for id in s2: + if id: + gpu_ids.append(id) + else: + other_labels[key] = val + + return gpu_ids, other_labels + + @classmethod + def infer_service_name(cls, container_name): + """ try to infer service name from container_name, if it's container not belongs + to pai service, will return None """ + if container_name.startswith("k8s_POD_"): + # this is empty container created by k8s for pod + return None + + # TODO speed this up, since this is O(n^2) + for service_name in cls.pai_services: + if container_name.startswith(service_name): + return service_name[4:] # remove "k8s_" prefix + + return None + + def process_one_container(self, container_id, stats, gpu_infos, all_conns, gauges): + container_name = utils.walk_json_field_safe(stats, "name") + pai_service_name = ContainerCollector.infer_service_name(container_name) + + inspect_info = docker_inspect.inspect(container_id, + ContainerCollector.inspect_histogram, 
+ ContainerCollector.inspect_timeout) + + pid = utils.walk_json_field_safe(inspect_info, "pid") + inspect_labels = utils.walk_json_field_safe(inspect_info, "labels") + + logger.debug("%s has pid %s, labels %s, service_name %s", + container_name, pid, inspect_labels, pai_service_name) + + if not inspect_labels and pai_service_name is None: + logger.debug("%s is ignored", container_name) + return # other container, maybe kubelet or api-server + + # get network consumption, since all our services/jobs running in host + # network, and network statistic from docker is not specific to that + # container. We have to get network statistic by ourselves. + lsof_result = network.lsof(pid, + ContainerCollector.lsof_histogram, + ContainerCollector.lsof_timeout) + + net_in, net_out = network.get_container_network_metrics(all_conns, + lsof_result) + if logger.isEnabledFor(logging.DEBUG): + debug_info = utils.exec_cmd( + "ps -o cmd fp {0} | tail -n 1".format(pid), + shell=True) + + logger.debug("pid %s with cmd `%s` has lsof result %s, in %d, out %d", + pid, debug_info.strip(), lsof_result, net_in, net_out) + + if pai_service_name is None: + gpu_ids, container_labels = ContainerCollector.parse_from_labels(inspect_info["labels"]) + container_labels.update(inspect_info["env"]) + + if gpu_infos: + for id in gpu_ids: + labels = copy.deepcopy(container_labels) + labels["minor_number"] = id + + gauges.add_value("task_gpu_percent", + labels, gpu_infos[id]["gpu_util"]) + gauges.add_value("task_gpu_mem_percent", + labels, gpu_infos[id]["gpu_mem_util"]) + + gauges.add_value("task_cpu_percent", container_labels, stats["CPUPerc"]) + gauges.add_value("task_mem_usage_byte", container_labels, stats["MemUsage_Limit"]["usage"]) + gauges.add_value("task_mem_limit_byte", container_labels, stats["MemUsage_Limit"]["limit"]) + gauges.add_value("task_net_in_byte", container_labels, net_in) + gauges.add_value("task_net_out_byte", container_labels, net_out) + gauges.add_value("task_block_in_byte", 
container_labels, stats["BlockIO"]["in"]) + gauges.add_value("task_block_out_byte", container_labels, stats["BlockIO"]["out"]) + gauges.add_value("task_mem_usage_percent", container_labels, stats["MemPerc"]) + else: + labels = {"name": pai_service_name} + gauges.add_value("service_cpu_percent", labels, stats["CPUPerc"]) + gauges.add_value("service_mem_usage_byte", labels, stats["MemUsage_Limit"]["usage"]) + gauges.add_value("service_mem_limit_byte", labels, stats["MemUsage_Limit"]["limit"]) + gauges.add_value("service_mem_usage_percent", labels, stats["MemPerc"]) + gauges.add_value("service_net_in_byte", labels, net_in) + gauges.add_value("service_net_out_byte", labels, net_out) + gauges.add_value("service_block_in_byte", labels, stats["BlockIO"]["in"]) + gauges.add_value("service_block_out_byte", labels, stats["BlockIO"]["out"]) + + def collect_container_metrics(self, stats_obj, gpu_infos, all_conns): + if stats_obj is None: + logger.warning("docker stats returns None") + return None + + gauges = ResourceGauges() + + for container_id, stats in stats_obj.items(): + try: + self.process_one_container(container_id, stats, gpu_infos, all_conns, gauges) + except Exception: + logging.exception("error when trying to process container %s with name %s", + container_id, utils.walk_json_field_safe(stats, "name")) + + return gauges.as_array() + + +class ZombieCollector(Collector): + logs_histogram = Histogram("cmd_docker_logs_latency_seconds", + "Command call latency for docker logs (seconds)") + logs_timeout = 1 # 99th latency is 0.04s + + zombie_container_count = Gauge("zombie_container_count", + "number of zombie container found for this node", + ["type"]) + + class ZombieRecorder(object): + def __init__(self, type): + self.type = type + self.zombies = {} # key is container id, value is enter zombie time + + # When we first meet zombie container, we only record time of that meet, + # we wait extra decay_time to report it as zombie. 
Because at the time + # of our recording, zombie just produced, and haven't been recycled, we + # wait 5 minutes to avoid possible cases of normal zombie. + self.decay_time = datetime.timedelta(minutes=5) + + def update(self, zombie_ids, now): + """ feed in new zombie ids and get count of decayed zombie """ + # remove all records not exist anymore + for z_id in list(self.zombies.keys()): + if z_id not in zombie_ids: + logger.debug("pop zombie %s that not exist anymore", z_id) + self.zombies.pop(z_id) + + count = 0 + for current in zombie_ids: + if current in self.zombies: + enter_zombie_time = self.zombies[current] + if now - enter_zombie_time > self.decay_time: + count += 1 + else: + logger.debug("new zombie %s", current) + self.zombies[current] = now + + ZombieCollector.zombie_container_count.labels(self.type).set(count) + return count # for test + + def __len__(self): + return len(self.zombies) + + def __init__(self, name, sleep_time, atomic_ref, iteration_counter, stats_info_ref): + Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter) + self.stats_info_ref = stats_info_ref + + self.type1_zombies = ZombieCollector.ZombieRecorder("job_exit_hangs") + self.type2_zombies = ZombieCollector.ZombieRecorder("residual_job") + + self.yarn_pattern = u"container_\w{3}_[0-9]{13}_[0-9]{4}_[0-9]{2}_[0-9]{6}" + self.yarn_container_reg = re.compile(u"^" + self.yarn_pattern + "$") + self.job_container_reg = re.compile(u"^.+(" + self.yarn_pattern + u")$") + + def update_zombie_count_type1(self, exited_containers, now): + """ this fn will generate zombie container count for the first type, + exited_containers is container id set of which we believe exited """ + return self.type1_zombies.update(exited_containers, now) + + def update_zombie_count_type2(self, stats, now): + """ this fn will generate zombie container count for the second type """ + names = set([info["name"] for info in stats.values()]) + + job_containers = {} # key is original name, value is 
corresponding yarn_container name + yarn_containers = set() + + zombie_ids = set() + + for name in names: + if re.match(self.yarn_container_reg, name) is not None: + yarn_containers.add(name) + elif re.match(self.job_container_reg, name) is not None: + match = re.match(self.job_container_reg, name) + value = match.groups()[0] + job_containers[name] = value + else: + pass # ignore + + for job_name, yarn_name in job_containers.items(): + if yarn_name not in yarn_containers: + zombie_ids.add(job_name) + + return self.type2_zombies.update(zombie_ids, now) + + def docker_logs(self, container_id, tail="all"): + try: + return utils.exec_cmd( + ["docker", "logs", "--tail", str(tail), str(container_id)], + histogram=ZombieCollector.logs_histogram, + stderr=subprocess.STDOUT, # also capture stderr output + timeout=ZombieCollector.logs_timeout) + except subprocess.TimeoutExpired as e: + logger.warning("docker log timeout") + except subprocess.CalledProcessError as e: + logger.warning("docker logs returns %d, output %s", e.returncode, e.output) + except Exception: + logger.exception("exec docker logs error") + + return "" + + def is_container_exited(self, container_id): + logs = self.docker_logs(container_id, tail=50) + if re.search(u"USER COMMAND END", logs): + return True + return False + + def update_zombie_count(self, stats): + """ + There are two types of zombie: + 1. container which outputed "USER COMMAND END" but did not exist for a long period of time + 2. 
yarn container exited but job container didn't + """ + if stats is None: + logging.warning("docker stats is None") + return + + exited_containers = set(filter(self.is_container_exited, stats.keys())) + + now = datetime.datetime.now() + self.update_zombie_count_type1(exited_containers, now) + self.update_zombie_count_type2(stats, now) + + def collect_impl(self): + # set it to None so if docker-stats hangs till next time we get, + # we will get None + stats_info = self.stats_info_ref.get_and_set(None) + self.update_zombie_count(stats_info) diff --git a/src/job-exporter/src/docker_inspect.py b/src/job-exporter/src/docker_inspect.py index c412f4171..3a9c5bc68 100644 --- a/src/job-exporter/src/docker_inspect.py +++ b/src/job-exporter/src/docker_inspect.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation # All rights reserved. # @@ -25,8 +25,8 @@ import utils logger = logging.getLogger(__name__) -targetLabel = {"PAI_HOSTNAME", "PAI_JOB_NAME", "PAI_USER_NAME", "PAI_CURRENT_TASK_ROLE_NAME", "GPU_ID"} -targetEnv = {"PAI_TASK_INDEX"} +target_label = {"PAI_HOSTNAME", "PAI_JOB_NAME", "PAI_USER_NAME", "PAI_CURRENT_TASK_ROLE_NAME", "GPU_ID"} +target_env = {"PAI_TASK_INDEX"} def parse_docker_inspect(inspect_output): @@ -37,39 +37,35 @@ def parse_docker_inspect(inspect_output): obj_labels = utils.walk_json_field_safe(obj, 0, "Config", "Labels") if obj_labels is not None: for key in obj_labels: - if key in targetLabel: - labelKey = "container_label_{0}".format(key.replace(".", "_")) - labelVal = obj_labels[key] - labels[labelKey] = labelVal + if key in target_label: + label_key = "container_label_{0}".format(key.replace(".", "_")) + label_val = obj_labels[key] + labels[label_key] = label_val obj_env = utils.walk_json_field_safe(obj, 0, "Config", "Env") if obj_env: for env in obj_env: - envItem = env.split("=") - if envItem[0] in targetEnv: - envKey = "container_env_{0}".format(envItem[0].replace(".", "_")) - envVal = envItem[1] - 
envs[envKey] = envVal + env_item = env.split("=") + if env_item[0] in target_env: + key = "container_env_{0}".format(env_item[0].replace(".", "_")) + val = env_item[1] + envs[key] = val pid = utils.walk_json_field_safe(obj, 0, "State", "Pid") return {"env": envs, "labels": labels, "pid": pid} -def inspect(containerId): +def inspect(container_id, histogram, timeout): try: - logger.debug("ready to run docker inspect") - dockerDockerInspect = utils.check_output(["docker", "inspect", containerId]) - inspectInfo = parse_docker_inspect(dockerDockerInspect) - - return inspectInfo + result = utils.exec_cmd( + ["docker", "inspect", container_id], + histogram=histogram, + timeout=timeout) + return parse_docker_inspect(result) except subprocess.CalledProcessError as e: logger.exception("command '%s' return with error (code %d): %s", e.cmd, e.returncode, e.output) - -def main(argv): - containerId = argv[0] - print inspect(containerId) - -# execute cmd example: python .\docker_inspect.py 33a22dcd4ba3 -if __name__ == "__main__": - main(sys.argv[1:]) + except subprocess.TimeoutExpired: + logger.warning("docker inspect timeout") + except Exception: + logger.exception("exec docker inspect error") diff --git a/src/job-exporter/src/docker_stats.py b/src/job-exporter/src/docker_stats.py index a8288f40e..05c404244 100644 --- a/src/job-exporter/src/docker_stats.py +++ b/src/job-exporter/src/docker_stats.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation # All rights reserved. # @@ -17,10 +17,8 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
import subprocess -import json import sys import re -import datetime import logging import utils @@ -28,7 +26,7 @@ import utils logger = logging.getLogger(__name__) def parse_percentile(data): - return data.replace("%", "") + return float(data.replace("%", "")) def parse_io(data): inOut = data.split("/") @@ -65,14 +63,13 @@ def convert_to_byte(data): return number def parse_docker_stats(stats): - data = [line.split(',') for line in stats.splitlines()] + data = [line.split(",") for line in stats.splitlines()] # pop the headers data.pop(0) - rowNum = len(data) - colNum = len(data[0]) - containerStats = {} + row_count = len(data) + container_stats = {} - for i in range(rowNum): + for i in range(row_count): id = data[i][0] containerInfo = { "id": data[i][0], @@ -83,26 +80,20 @@ def parse_docker_stats(stats): "BlockIO": parse_io(data[i][5]), "MemPerc": parse_percentile(data[i][6]) } - containerStats[id] = containerInfo - return containerStats + container_stats[id] = containerInfo + return container_stats -def stats(): - start = datetime.datetime.now() +def stats(histogram, timeout): try: - logger.info("ready to run docker stats") - dockerDockerStats = utils.check_output([ + result = utils.exec_cmd([ "docker", "stats", "--no-stream", "--format", - "table {{.Container}},{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}},{{.BlockIO}},{{.MemPerc}}"]) - return parse_docker_stats(dockerDockerStats) + "table {{.Container}},{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}},{{.BlockIO}},{{.MemPerc}}"], + histogram=histogram, + timeout=timeout) + return parse_docker_stats(result) except subprocess.CalledProcessError as e: logger.error("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output)) - finally: - end = datetime.datetime.now() - logger.info("docker state spent %s", end - start) - -def main(argv): - stats() - -# execute cmd example: python .\docker_stats.py True -if __name__ == "__main__": - main(sys.argv[1:]) + except subprocess.TimeoutExpired: + 
logger.warning("docker stats timeout") + except Exception: + logger.exception("exec docker stats error") diff --git a/src/job-exporter/src/job_exporter.py b/src/job-exporter/src/job_exporter.py deleted file mode 100644 index 61eccb7a4..000000000 --- a/src/job-exporter/src/job_exporter.py +++ /dev/null @@ -1,391 +0,0 @@ -#!/usr/bin/python -# Copyright (c) Microsoft Corporation -# All rights reserved. -# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -import copy -import sys -import time -import logging -import re -import datetime -import subprocess -import os -import json - -import docker_stats -import docker_inspect -import gpu_exporter -import network -import utils -from utils import Metric - -logger = logging.getLogger(__name__) - - -# k8s will prepend "k8s_" to pod name. There will also be a container name prepend with "k8s_POD_" -# which is a docker container used to construct network & pid namespace for specific container. 
These -# container prepend with "k8s_POD" consume nothing. -pai_services = map(lambda s: "k8s_" + s, [ - "rest-server", - "pylon", - "webportal", - "grafana", - "prometheus", - "alertmanager", - "watchdog", - "end-to-end-test", - "yarn-frameworklauncher", - "hadoop-jobhistory-service", - "hadoop-name-node", - "hadoop-node-manager", - "hadoop-resource-manager", - "hadoop-data-node", - "zookeeper", - "node-exporter", - "job-exporter", - "yarn-exporter", - "nvidia-drivers" -]) - - -class ZombieRecorder(object): - def __init__(self): - self.zombies = {} # key is container id, value is enter zombie time - - # When we first meet zombie container, we only record time of that meet, - # we wait extra decay_time to report it as zombie. Because at the time - # of our recording, zombie just produced, and haven't been recycled, we - # wait 5 minutes to avoid possible cases of normal zombie. - self.decay_time = datetime.timedelta(minutes=5) - - def update(self, zombie_ids, now): - """ feed in new zombie ids and get count of decayed zombie """ - # remove all records not exist anymore - for z_id in self.zombies.keys(): - if z_id not in zombie_ids: - logger.debug("pop zombie %s that not exist anymore", z_id) - self.zombies.pop(z_id) - - count = 0 - for current in zombie_ids: - if current in self.zombies: - enter_zombie_time = self.zombies[current] - if now - enter_zombie_time > self.decay_time: - count += 1 - else: - logger.debug("new zombie %s", current) - self.zombies[current] = now - - return count - - def __len__(self): - return len(self.zombies) - - -yarn_pattern = u"container_\w{3}_[0-9]{13}_[0-9]{4}_[0-9]{2}_[0-9]{6}" -yarn_container_reg = re.compile(u"^" + yarn_pattern + "$") -job_container_reg = re.compile(u"^.+(" + yarn_pattern + u")$") - - -def parse_from_labels(labels): - gpu_ids = [] - other_labels = {} - - for key, val in labels.items(): - if "container_label_GPU_ID" == key: - s2 = val.replace("\"", "").split(",") - for id in s2: - if id: - gpu_ids.append(id) - else: 
- other_labels[key] = val - - return gpu_ids, other_labels - - -def generate_zombie_count_type1(type1_zombies, exited_containers, now): - """ this fn will generate zombie container count for the first type, - exited_containers is container id set of which we believe exited """ - return type1_zombies.update(exited_containers, now) - - -def generate_zombie_count_type2(type2_zombies, stats, now): - """ this fn will generate zombie container count for the second type """ - names = set([info["name"] for info in stats.values()]) - - job_containers = {} # key is original name, value is corresponding yarn_container name - yarn_containers = set() - - zombie_ids = set() - - for name in names: - if re.match(yarn_container_reg, name) is not None: - yarn_containers.add(name) - elif re.match(job_container_reg, name) is not None: - match = re.match(job_container_reg, name) - value = match.groups()[0] - job_containers[name] = value - else: - pass # ignore - - for job_name, yarn_name in job_containers.items(): - if yarn_name not in yarn_containers: - zombie_ids.add(job_name) - - return type2_zombies.update(zombie_ids, now) - - -def docker_logs(container_id, tail="all"): - try: - return utils.check_output(["docker", "logs", "--tail", str(tail), str(container_id)]) - except subprocess.CalledProcessError as e: - logger.exception("command '%s' return with error (code %d): %s", - e.cmd, e.returncode, e.output) - - -def is_container_exited(container_id): - logs = docker_logs(container_id, tail=50) - if logs is not None and re.search(u"USER COMMAND END", logs): - return True - return False - - -def generate_zombie_count(stats, type1_zombies, type2_zombies): - """ - There are two types of zombie: - 1. container which outputed "USER COMMAND END" but did not exist for a long period of time - 2. 
yarn container exited but job container didn't - """ - exited_containers = set(filter(is_container_exited, stats.keys())) - logger.debug("exited_containers is %s", exited_containers) - - now = datetime.datetime.now() - zombie_count1 = generate_zombie_count_type1(type1_zombies, exited_containers, now) - zombie_count2 = generate_zombie_count_type2(type2_zombies, stats, now) - - return [Metric("zombie_container_count", {}, zombie_count1 + zombie_count2)] - - -def collect_job_metrics(gpu_infos, all_conns, type1_zombies, type2_zombies): - stats_obj = docker_stats.stats() - if stats_obj is None: - logger.warning("docker stats returns None") - return None - - result = [] - for container_id, stats in stats_obj.items(): - pai_service_name = None - - # TODO speed this up, since this is O(n^2) - for service_name in pai_services: - if stats["name"].startswith(service_name): - pai_service_name = service_name[4:] # remove "k8s_" prefix - break - - inspect_info = docker_inspect.inspect(container_id) - pid = inspect_info["pid"] if inspect_info is not None else None - inspect_labels = utils.walk_json_field_safe(inspect_info, "labels") - - if not inspect_labels and pai_service_name is None: - continue # other container, maybe kubelet or api-server - - # get network consumption, since all our services/jobs running in host network, - # network statistic from docker is not specific to that container. We have to - # get network statistic by ourselves. 
- lsof_result = network.lsof(pid) - net_in, net_out = network.get_container_network_metrics(all_conns, lsof_result) - if logger.isEnabledFor(logging.DEBUG): - debug_info = utils.check_output("ps -o cmd fp {0} | tail -n 1".format(pid), shell=True) - - logger.debug("pid %s with cmd `%s` has lsof result %s, in %d, out %d", - pid, debug_info, lsof_result, net_in, net_out) - - if pai_service_name is None: - gpu_ids, container_labels = parse_from_labels(inspect_info["labels"]) - container_labels.update(inspect_info["env"]) - - for id in gpu_ids: - if gpu_infos: - labels = copy.deepcopy(container_labels) - labels["minor_number"] = id - - result.append(Metric("container_GPUPerc", labels, gpu_infos[id]["gpu_util"])) - result.append(Metric("container_GPUMemPerc", labels, gpu_infos[id]["gpu_mem_util"])) - - result.append(Metric("container_CPUPerc", container_labels, stats["CPUPerc"])) - result.append(Metric("container_MemUsage", container_labels, stats["MemUsage_Limit"]["usage"])) - result.append(Metric("container_MemLimit", container_labels, stats["MemUsage_Limit"]["limit"])) - result.append(Metric("container_NetIn", container_labels, net_in)) - result.append(Metric("container_NetOut", container_labels, net_out)) - result.append(Metric("container_BlockIn", container_labels, stats["BlockIO"]["in"])) - result.append(Metric("container_BlockOut", container_labels, stats["BlockIO"]["out"])) - result.append(Metric("container_MemPerc", container_labels, stats["MemPerc"])) - else: - labels = {"name": pai_service_name} - result.append(Metric("service_cpu_percent", labels, stats["CPUPerc"])) - result.append(Metric("service_mem_usage_byte", labels, stats["MemUsage_Limit"]["usage"])) - result.append(Metric("service_mem_limit_byte", labels, stats["MemUsage_Limit"]["limit"])) - result.append(Metric("service_mem_usage_percent", labels, stats["MemPerc"])) - result.append(Metric("service_net_in_byte", labels, net_in)) - result.append(Metric("service_net_out_byte", labels, net_out)) - 
result.append(Metric("service_block_in_byte", labels, stats["BlockIO"]["in"])) - result.append(Metric("service_block_out_byte", labels, stats["BlockIO"]["out"])) - - result.extend(generate_zombie_count(stats_obj, type1_zombies, type2_zombies)) - - return result - - -def collect_docker_daemon_status(): - """ check docker daemon status in current host """ - cmd = "systemctl is-active docker | if [ $? -eq 0 ]; then echo \"active\"; else exit 1 ; fi" - error = "ok" - - try: - logger.info("call systemctl to get docker status") - - out = utils.check_output(cmd, shell=True) - - if "active" not in out: - error = "inactive" - except subprocess.CalledProcessError as e: - logger.exception("command '%s' return with error (code %d): %s", - cmd, e.returncode, e.output) - error = e.strerror() - except OSError as e: - if e.errno == os.errno.ENOENT: - logger.warning("systemctl not found") - error = e.strerror() - - return [Metric("docker_daemon_count", {"error": error}, 1)] - - -def get_gpu_count(path): - hostname = os.environ.get("HOSTNAME") - ip = os.environ.get("HOST_IP") - - logger.info("hostname is %s, ip is %s", hostname, ip) - - with open(path) as f: - gpu_config = json.load(f) - - if hostname is not None and gpu_config["nodes"].get(hostname) is not None: - return gpu_config["nodes"][hostname]["gpuCount"] - elif ip is not None and gpu_config["nodes"].get(ip) is not None: - return gpu_config["nodes"][ip]["gpuCount"] - else: - logger.warning("failed to find gpu count from config %s", gpu_config) - return 0 - - -def config_environ(): - """ since job-exporter needs to call nvidia-smi, we need to change - LD_LIBRARY_PATH to correct value """ - driver_path = os.environ.get("NV_DRIVER") - logger.debug("NV_DRIVER is %s", driver_path) - - ld_path = os.environ.get("LD_LIBRARY_PATH", "") - os.environ["LD_LIBRARY_PATH"] = ld_path + os.pathsep + \ - os.path.join(driver_path, "lib") + os.pathsep + \ - os.path.join(driver_path, "lib64") - - logger.debug("LD_LIBRARY_PATH is %s", 
os.environ["LD_LIBRARY_PATH"]) - -def main(argv): - config_environ() - - log_dir = argv[0] - gpu_metrics_path = log_dir + "/gpu_exporter.prom" - job_metrics_path = log_dir + "/job_exporter.prom" - docker_metrics_path = log_dir + "/docker.prom" - time_metrics_path = log_dir + "/time.prom" - configured_gpu_path = log_dir + "/cofigured_gpu.prom" - time_sleep_s = int(argv[1]) - - iter = 0 - - gpu_singleton = utils.Singleton(gpu_exporter.collect_gpu_info, name="gpu_singleton") - docker_status_singleton = utils.Singleton(collect_docker_daemon_status, name="docker_singleton") - - type1_zombies = ZombieRecorder() - type2_zombies = ZombieRecorder() - - configured_gpu_count = get_gpu_count("/gpu-config/gpu-configuration.json") - logger.debug("configured_gpu_count is %s", configured_gpu_count) - utils.export_metrics_to_file(configured_gpu_path, [Metric("configured_gpu_count", {}, - configured_gpu_count)]) - - while True: - start = datetime.datetime.now() - try: - logger.info("job exporter running {0} iteration".format(str(iter))) - iter += 1 - - docker_status, is_old = docker_status_singleton.try_get() - if not is_old: - utils.export_metrics_to_file(docker_metrics_path, docker_status) - else: - utils.export_metrics_to_file(docker_metrics_path, None) - - all_conns = network.iftop() - logger.debug("iftop result is %s", all_conns) - - gpu_infos, is_old = gpu_singleton.try_get() - - if is_old: - gpu_infos = None # ignore old gpu_infos - - gpu_metrics = gpu_exporter.convert_gpu_info_to_metrics(gpu_infos) - utils.export_metrics_to_file(gpu_metrics_path, gpu_metrics) - - # join with docker stats metrics and docker inspect labels - job_metrics = collect_job_metrics(gpu_infos, all_conns, type1_zombies, type2_zombies) - utils.export_metrics_to_file(job_metrics_path, job_metrics) - except Exception as e: - logger.exception("exception in job exporter loop") - finally: - end = datetime.datetime.now() - - time_metrics = [Metric("job_exporter_iteration_seconds", {}, (end - start).seconds)] 
- utils.export_metrics_to_file(time_metrics_path, time_metrics) - - time.sleep(time_sleep_s) - - -def get_logging_level(): - mapping = { - "DEBUG": logging.DEBUG, - "INFO": logging.INFO, - "WARNING": logging.WARNING - } - - result = logging.INFO - - if os.environ.get("LOGGING_LEVEL") is not None: - level = os.environ["LOGGING_LEVEL"] - result = mapping.get(level.upper()) - if result is None: - sys.stderr.write("unknown logging level " + level + ", default to INFO\n") - result = logging.INFO - - return result - -if __name__ == "__main__": - logging.basicConfig(format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(message)s", - level=get_logging_level()) - - main(sys.argv[1:]) diff --git a/src/job-exporter/src/main.py b/src/job-exporter/src/main.py new file mode 100644 index 000000000..227fded9d --- /dev/null +++ b/src/job-exporter/src/main.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import os +import json +import threading + +from wsgiref.simple_server import make_server +from prometheus_client import make_wsgi_app, Gauge +from prometheus_client.core import REGISTRY + +import collector + +logger = logging.getLogger(__name__) + + +configured_gpu_counter = Gauge("configured_gpu_count", + "total number of gpu configured for this node") + + +class CustomCollector(object): + def __init__(self, atomic_refs): + self.atomic_refs = atomic_refs + + def collect(self): + data = [] + + for ref in self.atomic_refs: + # set None to achieve + # https://github.com/Microsoft/pai/issues/1764#issuecomment-442733098 + d = ref.get_and_set(None) + if d is not None: + data.extend(d) + + if len(data) > 0: + for datum in data: + yield datum + else: + # https://stackoverflow.com/a/6266586 + # yield nothing + return + yield + + +def config_environ(): + """ since job-exporter needs to call nvidia-smi, we need to change + LD_LIBRARY_PATH to correct value """ + driver_path = os.environ.get("NV_DRIVER") + logger.debug("NV_DRIVER is %s", driver_path) 
+ + ld_path = os.environ.get("LD_LIBRARY_PATH", "") + os.environ["LD_LIBRARY_PATH"] = ld_path + os.pathsep + \ + os.path.join(driver_path, "lib") + os.pathsep + \ + os.path.join(driver_path, "lib64") + + logger.debug("LD_LIBRARY_PATH is %s", os.environ["LD_LIBRARY_PATH"]) + + +def try_remove_old_prom_file(path): + """ try to remove old prom file, since old prom file are exposed by node-exporter, + if we do not remove, node-exporter will still expose old metrics """ + if os.path.isfile(path): + try: + os.unlink(path) + except Exception as e: + log.warning("can not remove old prom file %s", path) + + +def get_gpu_count(path): + hostname = os.environ.get("HOSTNAME") + ip = os.environ.get("HOST_IP") + + logger.debug("hostname is %s, ip is %s", hostname, ip) + + with open(path) as f: + gpu_config = json.load(f) + + if hostname is not None and gpu_config["nodes"].get(hostname) is not None: + return gpu_config["nodes"][hostname]["gpuCount"] + elif ip is not None and gpu_config["nodes"].get(ip) is not None: + return gpu_config["nodes"][ip]["gpuCount"] + else: + logger.warning("failed to find gpu count from config %s", gpu_config) + return 0 + + +def main(args): + config_environ() + try_remove_old_prom_file(args.log + "/gpu_exporter.prom") + try_remove_old_prom_file(args.log + "/job_exporter.prom") + try_remove_old_prom_file(args.log + "/docker.prom") + try_remove_old_prom_file(args.log + "/time.prom") + try_remove_old_prom_file(args.log + "/configured_gpu.prom") + + configured_gpu_counter.set(get_gpu_count("/gpu-config/gpu-configuration.json")) + + # used to exchange gpu info between GpuCollector and ContainerCollector + gpu_info_ref = collector.AtomicRef() + + # used to exchange docker stats info between ContainerCollector and ZombieCollector + stats_info_ref = collector.AtomicRef() + + interval = args.interval + # Because all collector except container_collector will spent little time in calling + # external command to get metrics, so they need to sleep 30s to align with 
prometheus + # scrape interval. The 99th latency of container_collector loop is around 20s, so it + # should only sleep 10s to adapt to scrape interval + collector_args = [ + ("docker_daemon_collector", interval, collector.DockerCollector), + ("gpu_collector", interval, collector.GpuCollector, gpu_info_ref), + ("container_collector", interval - 18, collector.ContainerCollector, + gpu_info_ref, stats_info_ref, args.interface), + ("zombie_collector", interval, collector.ZombieCollector, stats_info_ref), + ] + + refs = list(map(lambda x: collector.make_collector(*x), collector_args)) + + REGISTRY.register(CustomCollector(refs)) + + app = make_wsgi_app(REGISTRY) + httpd = make_server("", int(args.port), app) + httpd.serve_forever() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--log", "-l", help="log dir to store log", default="/datastorage/prometheus") + parser.add_argument("--port", "-p", help="port to expose metrics", default="9102") + parser.add_argument("--interval", "-i", help="prometheus scrape interval", type=int, default=30) + parser.add_argument("--interface", "-n", help="network interface for job-exporter to listen on", required=True) + args = parser.parse_args() + + def get_logging_level(): + mapping = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING + } + + result = logging.INFO + + if os.environ.get("LOGGING_LEVEL") is not None: + level = os.environ["LOGGING_LEVEL"] + result = mapping.get(level.upper()) + if result is None: + sys.stderr.write("unknown logging level " + level + \ + ", default to INFO\n") + result = logging.INFO + + return result + + logging.basicConfig(format="%(asctime)s - %(levelname)s - %(threadName)s - %(filename)s:%(lineno)s - %(message)s", + level=get_logging_level()) + + main(args) diff --git a/src/job-exporter/src/network.py b/src/job-exporter/src/network.py index c1a3405e2..3b49aa5a7 100644 --- a/src/job-exporter/src/network.py +++ 
b/src/job-exporter/src/network.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation # All rights reserved. # @@ -16,13 +16,14 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -import subprocess -import json -import sys import re -import time import logging import collections +import subprocess +import socket +import fcntl +import struct +import array import utils @@ -60,12 +61,20 @@ def convert_to_byte(data): return number -def iftop(): +def iftop(interface, histogram, timeout): + cmd = ["iftop", "-t", "-P", "-s", "1", "-L", "10000", "-B", "-n", "-N"] + if interface is not None: + cmd.extend(["-i", interface]) + try: - output = utils.check_output(["iftop", "-t", "-P", "-s", "1", "-L", "10000", - "-B", "-n", "-N"]) + output = utils.exec_cmd( + cmd, + stderr=subprocess.STDOUT, # also capture stderr output + histogram=histogram, timeout=timeout) return parse_iftop(output) - except: + except subprocess.TimeoutExpired: + logger.warning("iftop timeout") + except Exception: logger.exception("exec iftop error") return None @@ -89,7 +98,7 @@ def parse_iftop(iftop_output, duration=40): if part == 1: data.append(line) - for line_no in xrange(0, len(data), 2): + for line_no in range(0, len(data), 2): line1 = data[line_no].split() line2 = data[line_no + 1].split() src = line1[1] @@ -116,15 +125,22 @@ def parse_iftop(iftop_output, duration=40): return result -def lsof(pid): +def lsof(pid, histogram, timeout): """ use infilter to do setns https://github.com/yadutaf/infilter """ if pid is None: return None try: - output = utils.check_output(["infilter", str(pid), "/usr/bin/lsof", "-i", "-n", "-P"]) + output = utils.exec_cmd(["infilter", str(pid), "/usr/bin/lsof", "-i", "-n", "-P"], + histogram=histogram, + stderr=subprocess.STDOUT, # also capture stderr output + timeout=timeout) return 
parse_lsof(output) - except: + except subprocess.TimeoutExpired: + logger.warning("lsof timeout") + except subprocess.CalledProcessError as e: + logger.warning("infilter lsof returns %d, output %s", e.returncode, e.output) + except Exception: logger.exception("exec lsof error") return None @@ -138,9 +154,13 @@ def parse_lsof(lsof_output): for line in data: if "ESTABLISHED" in line: parts = line.split() - pid = parts[1] - src = parts[8].split("->")[0] - conns[pid].add(src) + if len(parts) == 10: + pid = parts[1] + src = parts[8].split("->")[0] + conns[pid].add(src) + else: + logger.warning("unknown format of lsof %s", parts) + continue return conns @@ -165,19 +185,68 @@ def get_container_network_metrics(all_conns, lsof_result): return in_byte, out_byte -def main(pids): - """ test purpose """ - conns = iftop() +def format_ip(addr): + return str(addr[0]) + "." + \ + str(addr[1]) + "." + \ + str(addr[2]) + "." + \ + str(addr[3]) - logger.debug("conns is %s", conns) - logger.debug("lsof_result is %s", lsof_result) - for pid in pids: - lsof_result = lsof(pid) - in_byte, out_byte = get_container_network_metrics(conns, lsof_result) - logger.info("pid %s in %d out %d", pid, in_byte, out_byte) +def get_interfaces(): + """ get all network interfaces we see, return map with interface name as key, and ip as value """ + max_possible = 128 + bytes = max_possible * 32 + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + names = array.array("B", b"\0" * bytes) + outbytes = struct.unpack("iL", fcntl.ioctl( + s.fileno(), + 0x8912, # SIOCGIFCONF + struct.pack("iL", bytes, names.buffer_info()[0]) + ))[0] -if __name__ == "__main__": - logging.basicConfig(format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(message)s", - level=logging.DEBUG) - main(sys.argv[1:]) + namestr = names.tostring() + + result = {} + for i in range(0, outbytes, 40): + name = namestr[i:i+16].split(b"\0", 1)[0].decode("ascii") + ip = namestr[i+20:i+24] + result[name] = format_ip(ip) + return result 
+ + +def get_ip_can_access_internet(target="hub.docker.com"): + """ return None on error """ + s = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect((target, 80)) + except socket.gaierror: + logger.exception("failed to connect to %s", target) + return None + except Exception: + logger.exception("unknown exception when tying to connect %s", target) + + return s.getsockname()[0] + + +def try_to_get_right_interface(configured_ifs): + """ try to return a right interface so that iftop can listen on """ + ifs = get_interfaces() + + logger.debug("found interfaces %s", ifs) + + for interface in configured_ifs.split(","): + interface = interface.strip() + if interface in ifs: + return interface + + logger.info("didn't find correct network interface in this node, configured %s, found %s", + configured_ifs, ifs) + + ip = get_ip_can_access_internet() + if ip is not None: + for if_name, if_ip in ifs.items(): + if ip == if_ip: + return if_name + + return None diff --git a/src/job-exporter/src/gpu_exporter.py b/src/job-exporter/src/nvidia.py similarity index 61% rename from src/job-exporter/src/gpu_exporter.py rename to src/job-exporter/src/nvidia.py index 12d16ac35..c1cdf2a90 100644 --- a/src/job-exporter/src/gpu_exporter.py +++ b/src/job-exporter/src/nvidia.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation # All rights reserved. # @@ -17,13 +17,11 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
import subprocess -import sys from xml.dom import minidom import os import logging import utils -from utils import Metric logger = logging.getLogger(__name__) @@ -43,46 +41,25 @@ def parse_smi_xml_result(smi): if gpu_util == "N/A" or gpu_mem_util == "N/A": continue - result[str(minor)] = {"gpu_util": gpu_util, "gpu_mem_util": gpu_mem_util} + result[str(minor)] = {"gpu_util": float(gpu_util), "gpu_mem_util": float(gpu_mem_util)} return result -def collect_gpu_info(): - """ in some cases, nvidia-smi may block indefinitely, caller should be aware of this """ +def nvidia_smi(histogram, timeout): driver_path = os.environ["NV_DRIVER"] bin_path = os.path.join(driver_path, "bin/nvidia-smi") - try: - logger.info("call %s to get gpu metrics", bin_path) # used to check if nvidia-smi hangs - smi_output = utils.check_output([bin_path, "-q", "-x"]) + try: + smi_output = utils.exec_cmd([bin_path, "-q", "-x"], + histogram=histogram, timeout=timeout) return parse_smi_xml_result(smi_output) except subprocess.CalledProcessError as e: - if e.returncode == 127: - logger.exception("nvidia cmd error. 
command '%s' return with error (code %d): %s", - e.cmd, e.returncode, e.output) - else: - logger.exception("command '%s' return with error (code %d): %s", - e.cmd, e.returncode, e.output) - except OSError as e: - logger.exception("nvidia-smi not found") + logger.exception("command '%s' return with error (code %d): %s", + e.cmd, e.returncode, e.output) + except subprocess.TimeoutExpired: + logger.warning("nvidia-smi timeout") + except Exception: + logger.exception("exec nvidia-smi error") return None - - -def convert_gpu_info_to_metrics(gpu_infos): - if gpu_infos is None: - return None - - result = [Metric("nvidiasmi_attached_gpus", {}, len(gpu_infos))] - - for minor, info in gpu_infos.items(): - label = {"minor_number": minor} - result.append(Metric("nvidiasmi_utilization_gpu", label, info["gpu_util"])) - result.append(Metric("nvidiasmi_utilization_memory", label, info["gpu_mem_util"])) - - return result - - -if __name__ == "__main__": - print collect_gpu_info() diff --git a/src/job-exporter/src/utils.py b/src/job-exporter/src/utils.py index 10320ae2c..cee9310fe 100644 --- a/src/job-exporter/src/utils.py +++ b/src/job-exporter/src/utils.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation # All rights reserved. # @@ -16,128 +16,27 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
-import re -import codecs import subprocess import logging -import threading -import datetime -from Queue import Queue -from Queue import Empty logger = logging.getLogger(__name__) -class Metric(object): - """ represents one prometheus metric record - https://prometheus.io/docs/concepts/data_model/ """ - def __init__(self, name, labels, value): - self.name = name - self.labels = labels - self.value = value +def exec_cmd(*args, **kwargs): + """ exec a cmd with timeout, also record time used using prometheus higtogram """ + if kwargs.get("histogram") is not None: + histogram = kwargs.pop("histogram") + else: + histogram = None - def __eq__(self, o): - return self.name == o.name and self.labels == o.labels and self.value == o.value + logger.debug("about to exec %s", args[0]) - def __repr__(self): - if len(self.labels) > 0: - labels = ", ".join(map(lambda x: "{}=\"{}\"".format(x[0], x[1]), - self.labels.items())) - labels = "{" + labels + "}" - else: - labels = "" - - return "{}{} {}".format(self.name, labels, self.value) - - -def export_metrics_to_file(path, metrics): - """ if metrics not None, should still open the path, to modify time stamp of file, - readiness probe needs this""" - with codecs.open(path, "w", encoding="utf-8") as f: - if metrics is not None: - for metric in metrics: - f.write(str(metric)) - f.write("\n") - - -def check_output(*args, **kwargs): - """ subprocess.check_output may hanging if cmd output too much stdout or stderr """ - kwargs["stdout"] = subprocess.PIPE - kwargs["stderr"] = subprocess.PIPE - process = subprocess.Popen(*args, **kwargs) - outs = [] - errs = [] - - while process.poll() is None: - out, err = process.communicate() - outs.append(out) - errs.append(err) - if process.returncode != 0: - logger.warn("process `%s` failed with return code %d, stdout %s, stderr %s", - args, process.returncode, "".join(outs), "".join(errs)) - return "".join(outs) - - -class Singleton(object): - """ wrapper around metrics getter, because getter may 
block - indefinitely, so we wrap call in thread. - Also, to avoid having too much threads, use semaphore to ensure only 1 - thread is running """ - def __init__(self, getter, get_timeout_s=3, name="singleton"): - self.getter = getter - self.get_timeout_s = get_timeout_s - self.name = name - - self.semaphore = threading.Semaphore(1) - self.queue = Queue(1) - self.old_metrics = None - - def wrapper(self, semaphore, queue): - """ wrapper assume semaphore already acquired, will release semaphore on exit """ - result = None - - try: - try: - # remove result put by previous thread but didn't get by main thread - queue.get(block=False) - except Empty: - pass - - start = datetime.datetime.now() - result = self.getter() - except Exception as e: - logger.warn("%s get metrics failed", self.name) - logger.exception(e) - finally: - logger.info("%s get metrics spent %s", self.name, datetime.datetime.now() - start) - semaphore.release() - queue.put(result) - - def try_get(self): - """ return value, is_old tuple, if value is not newly getted, return value we get last time, - and set is_old to True, otherwise return newly getted value and False """ - if self.semaphore.acquire(False): - t = threading.Thread(target=Singleton.wrapper, name=self.name + "-getter", - args=(self, self.semaphore, self.queue)) - t.start() - else: - logger.warn("%s is still running", self.name + "-getter") - - try: - self.old_metrics = self.queue.get(block=True, timeout=self.get_timeout_s) - return self.old_metrics, False - except Empty: - pass - - return self.old_metrics, True - - -def camel_to_underscore(label): - """ convert camel case into underscore - https://stackoverflow.com/a/1176023 """ - tmp = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', label) - return re.sub('([a-z0-9])([A-Z])', r'\1_\2', tmp).lower() + if histogram is not None: + with histogram.time(): + return subprocess.check_output(*args, **kwargs).decode("utf-8") + else: + return subprocess.check_output(*args, **kwargs).decode("utf-8") def 
walk_json_field_safe(obj, *fields): diff --git a/src/job-exporter/test/base.py b/src/job-exporter/test/base.py new file mode 100644 index 000000000..9f9b8299b --- /dev/null +++ b/src/job-exporter/test/base.py @@ -0,0 +1,28 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +import logging +import unittest + +class TestBase(unittest.TestCase): + """ + Test Base class for job-exporter + """ + @classmethod + def setUpClass(cls): + logging.basicConfig(format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(message)s", + level=logging.DEBUG) diff --git a/src/job-exporter/test/test_collector.py b/src/job-exporter/test/test_collector.py new file mode 100644 index 000000000..af34c9d7d --- /dev/null +++ b/src/job-exporter/test/test_collector.py @@ -0,0 +1,180 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +import os +import sys +import copy +import unittest +import datetime +import time +import logging + +import base + +sys.path.append(os.path.abspath("../src/")) + +import collector +from collector import ContainerCollector + +logger = logging.getLogger(__name__) + +class TestContainerCollector(base.TestBase): + """ + Test ContainerCollector in collecotr.py + """ + + def test_parse_from_labels(self): + labels = {"container_label_PAI_USER_NAME": "openmindstudio", "container_label_GPU_ID": "0,1,", "container_label_PAI_HOSTNAME": "paigcr-a-gpu-1058", "container_label_PAI_JOB_NAME": "trialslot_nnimain_d65bc5ac", "container_label_PAI_CURRENT_TASK_ROLE_NAME": "tuner"} + gpuIds, otherLabels = ContainerCollector.parse_from_labels(labels) + self.assertEqual(["0", "1"], gpuIds,) + copied = copy.deepcopy(labels) + copied.pop("container_label_GPU_ID") + self.assertEqual(copied, otherLabels) + + def test_infer_service_name(self): + self.assertIsNone(ContainerCollector.infer_service_name( + "k8s_POD_alertmanager-7884c59f78-66r86_default_0a32e30a-f6ae-11e8")) + + self.assertEqual( + "alertmanager", + ContainerCollector.infer_service_name( + "k8s_alertmanager_alertmanager-7884c59f78-66r86_default_0a32e30a-f6ae-11e8-a62d-000d3ab25bb6_2")) + + self.assertIsNone(ContainerCollector.infer_service_name( + "k8s_kube-scheduler_kube-scheduler-10.151.40.4_kube-system_f1164d931979939cf0601155df9c748a_6")) + + +class TestDockerCollector(base.TestBase): + """ + Test DockerCollector in collector.py + """ + + def assert_metrics(self, metrics): + self.assertEqual(1, len(metrics)) + self.assertEqual(1, len(metrics[0].samples)) + sample = metrics[0].samples[0] + self.assertEqual(1, len(sample[1])) # label keys + self.assertEqual(1, sample[2]) # sample value + + def test_impl(self): + _, c = collector.instantiate_collector( + "test_docker_collector1", + 0.5, + collector.DockerCollector) + + self.assert_metrics(c.collect_impl()) + + def test_base_collector(self): + """ actually setup DockerCollector 
thread, and test, since this is multi-thread + test case, maybe sensitive to the system load """ + ref = collector.make_collector( + "test_docker_collector2", + 0.5, + collector.DockerCollector) + + metrics = None + for i in range(10): + metrics = ref.get() + if metrics is not None: + break + time.sleep(0.1) + + self.assert_metrics(metrics) + + +class TestZombieCollector(base.TestBase): + """ + Test ZombieCollector in collector.py + """ + def setUp(self): + # Because prometheus forbid same metric name, and we generate metric + # in from name, we need to differentiate name using time. + t = str(time.time()).replace(".", "_") + + _, self.collector = collector.instantiate_collector( + "test_zombie_collector" + t, + 0.5, + collector.ZombieCollector, + collector.AtomicRef()) + + def test_update_zombie_count_type1(self): + start = datetime.datetime.now() + + one_sec = datetime.timedelta(seconds=1) + + type1_recorder = self.collector.type1_zombies + + self.assertEqual(0, + self.collector.update_zombie_count_type1({"a", "b"}, start)) + self.assertEqual(2, len(type1_recorder)) + + self.assertEqual(0, + self.collector.update_zombie_count_type1({"a", "b"}, + start + type1_recorder.decay_time - one_sec)) + self.assertEqual(2, len(type1_recorder)) + + self.assertEqual(2, + self.collector.update_zombie_count_type1({"a", "b"}, + start + type1_recorder.decay_time + one_sec)) + self.assertEqual(2, len(type1_recorder)) + + self.assertEqual(1, + self.collector.update_zombie_count_type1({"a"}, + start + type1_recorder.decay_time + 2 *one_sec)) + self.assertEqual(1, len(type1_recorder)) + + self.assertEqual(0, + self.collector.update_zombie_count_type1({}, + start + type1_recorder.decay_time + 3 * one_sec)) + self.assertEqual(0, len(type1_recorder)) + + def test_update_zombie_count_type2(self): + start = datetime.datetime.now() + + one_sec = datetime.timedelta(seconds=1) + + stats = {"43ffe701d883": + {"name": 
"core-caffe2_resnet50_20181012040921.586-container_e03_1539312078880_0780_01_000002"}, + "8de2f53e64cb": + {"name": "container_e03_1539312078880_0780_01_000002"}} + + type2_recorder = self.collector.type2_zombies + + self.assertEqual(0, + self.collector.update_zombie_count_type2(stats, start)) + + stats.pop("8de2f53e64cb") + + self.assertEqual(0, + self.collector.update_zombie_count_type2(stats, start + one_sec)) + + self.assertEqual(0, + self.collector.update_zombie_count_type2(stats, + start + type2_recorder.decay_time)) + + self.assertEqual(1, + self.collector.update_zombie_count_type2(stats, + start + type2_recorder.decay_time + 2 * one_sec)) + + stats.pop("43ffe701d883") + + self.assertEqual(0, + self.collector.update_zombie_count_type2(stats, + start + type2_recorder.decay_time + 3 * one_sec)) + +if __name__ == '__main__': + unittest.main() diff --git a/src/job-exporter/test/test_docker_inspect.py b/src/job-exporter/test/test_docker_inspect.py index 18853fe37..0072efa1d 100644 --- a/src/job-exporter/test/test_docker_inspect.py +++ b/src/job-exporter/test/test_docker_inspect.py @@ -15,46 +15,25 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
-import os import sys +import os import unittest -import yaml -import logging -import logging.config + +import base sys.path.append(os.path.abspath("../src/")) from docker_inspect import parse_docker_inspect -class TestDockerInspect(unittest.TestCase): +class TestDockerInspect(base.TestBase): """ Test docker_inspect.py """ - def setUp(self): - try: - os.chdir(os.path.abspath("test")) - except: - pass - - configuration_path = "logging.yaml" - - if os.path.exists(configuration_path): - with open(configuration_path, 'rt') as f: - logging_configuration = yaml.safe_load(f.read()) - logging.config.dictConfig(logging_configuration) - logging.getLogger() - - - def tearDown(self): - try: - os.chdir(os.path.abspath("..")) - except: - pass def test_parse_docker_inspect(self): sample_path = "data/docker_inspect_sample.json" - file = open(sample_path, "r") - docker_inspect = file.read() + with open(sample_path, "r") as f: + docker_inspect = f.read() inspect_info = parse_docker_inspect(docker_inspect) target_inspect_info = {"labels": {"container_label_PAI_USER_NAME": "openmindstudio", "container_label_GPU_ID": "0,1,", "container_label_PAI_HOSTNAME": "paigcr-a-gpu-1058", "container_label_PAI_JOB_NAME": "trialslot_nnimain_d65bc5ac", "container_label_PAI_CURRENT_TASK_ROLE_NAME": "tuner"}, "env": {"container_env_PAI_TASK_INDEX": "0"}, "pid": 95539} self.assertEqual(target_inspect_info, inspect_info) diff --git a/src/job-exporter/test/test_docker_stats.py b/src/job-exporter/test/test_docker_stats.py index c37c96e61..984548c6d 100644 --- a/src/job-exporter/test/test_docker_stats.py +++ b/src/job-exporter/test/test_docker_stats.py @@ -18,9 +18,8 @@ import os import sys import unittest -import yaml -import logging -import logging.config + +import base sys.path.append(os.path.abspath("../src/")) @@ -30,37 +29,17 @@ from docker_stats import parse_usage_limit from docker_stats import parse_io from docker_stats import parse_percentile -class TestDockerStats(unittest.TestCase): +class 
TestDockerStats(base.TestBase): """ Test docker_stats.py """ - def setUp(self): - try: - os.chdir(os.path.abspath("test")) - except: - pass - - configuration_path = "logging.yaml" - - if os.path.exists(configuration_path): - with open(configuration_path, 'rt') as f: - logging_configuration = yaml.safe_load(f.read()) - logging.config.dictConfig(logging_configuration) - logging.getLogger() - - def tearDown(self): - try: - os.chdir(os.path.abspath("..")) - except: - pass - def test_parse_docker_inspect(self): sample_path = "data/docker_stats_sample.txt" - file = open(sample_path, "r") - docker_stats = file.read() + with open(sample_path, "r") as f: + docker_stats = f.read() + stats_info = parse_docker_stats(docker_stats) - target_stats_info = {'722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636': {'BlockIO': {'out': 156000000.0, 'in': 28600000.0}, 'NetIO': {'out': 425000000000.0, 'in': 1580000000000.0}, 'CPUPerc': '0.00', 'MemPerc': '0.19', 'id': '722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636', 'MemUsage_Limit': {'usage': 111149056.0, 'limit': 59088012574.72}, 'name': 'alert-manager'}, '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419': {'BlockIO': {'out': 0.0, 'in': 28000000.0}, 'NetIO': {'out': 0.0, 'in': 0.0}, 'CPUPerc': '0.00', 'MemPerc': '6.23', 'id': '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419', 'MemUsage_Limit': {'usage': 19587399.68, 'limit': 314572800.0}, 'name': 'prometheus'}} - print stats_info.values() + target_stats_info = {'722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636': {'BlockIO': {'out': 156000000.0, 'in': 28600000.0}, 'NetIO': {'out': 425000000000.0, 'in': 1580000000000.0}, 'CPUPerc': 0.00, 'MemPerc': 0.19, 'id': '722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636', 'MemUsage_Limit': {'usage': 111149056.0, 'limit': 59088012574.72}, 'name': 'alert-manager'}, '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419': {'BlockIO': {'out': 0.0, 
'in': 28000000.0}, 'NetIO': {'out': 0.0, 'in': 0.0}, 'CPUPerc': 0.00, 'MemPerc': 6.23, 'id': '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419', 'MemUsage_Limit': {'usage': 19587399.68, 'limit': 314572800.0}, 'name': 'prometheus'}} self.assertEqual(target_stats_info, stats_info) def test_convert_to_byte(self): @@ -83,7 +62,7 @@ class TestDockerStats(unittest.TestCase): def test_parse_percentile(self): data = "24.45%" result = parse_percentile(data) - target = "24.45" + target = 24.45 self.assertEqual(target, result) if __name__ == '__main__': diff --git a/src/job-exporter/test/test_gpu_exporter.py b/src/job-exporter/test/test_gpu_exporter.py deleted file mode 100644 index 55cf61d67..000000000 --- a/src/job-exporter/test/test_gpu_exporter.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright (c) Microsoft Corporation -# All rights reserved. -# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
- -import os -import sys -import unittest -import yaml -import logging -import logging.config - -sys.path.append(os.path.abspath("../src/")) - -import gpu_exporter -from utils import Metric - -class TestGPUExporter(unittest.TestCase): - """ - Test gpu_exporter.py - """ - def setUp(self): - try: - os.chdir(os.path.abspath("test")) - except: - pass - - configuration_path = "logging.yaml" - - if os.path.exists(configuration_path): - with open(configuration_path, 'rt') as f: - logging_configuration = yaml.safe_load(f.read()) - logging.config.dictConfig(logging_configuration) - logging.getLogger() - - - def tearDown(self): - try: - os.chdir(os.path.abspath("..")) - except: - pass - - def test_parse_smi_xml_result(self): - sample_path = "data/nvidia_smi_sample.xml" - file = open(sample_path, "r") - nvidia_smi_result = file.read() - nvidia_smi_parse_result = gpu_exporter.parse_smi_xml_result(nvidia_smi_result) - target_smi_info = {'1': {'gpu_util': u'98', 'gpu_mem_util': u'97'}, - '0': {'gpu_util': u'100', 'gpu_mem_util': u'99'}} - self.assertEqual(target_smi_info, nvidia_smi_parse_result) - - def test_convert_gpu_info_to_metrics(self): - info = {'1': {'gpu_util': u'98', 'gpu_mem_util': u'97'}, - '0': {'gpu_util': u'100', 'gpu_mem_util': u'99'}} - metrics = gpu_exporter.convert_gpu_info_to_metrics(info) - self.assertEqual(5, len(metrics)) - - self.assertIn(Metric("nvidiasmi_attached_gpus", {}, 2), metrics) - self.assertIn(Metric("nvidiasmi_utilization_gpu", {"minor_number": "0"}, "100"), - metrics) - self.assertIn(Metric("nvidiasmi_utilization_memory", {"minor_number": "0"}, "99"), - metrics) - self.assertIn(Metric("nvidiasmi_utilization_gpu", {"minor_number": "1"}, "98"), - metrics) - self.assertIn(Metric("nvidiasmi_utilization_memory", {"minor_number": "1"}, "97"), - metrics) - - def test_exporter_will_not_report_unsupported_gpu(self): - sample_path = "data/nvidia_smi_outdated_gpu.xml" - file = open(sample_path, "r") - nvidia_smi_result = file.read() - 
nvidia_smi_parse_result = gpu_exporter.parse_smi_xml_result(nvidia_smi_result) - self.assertEqual({}, nvidia_smi_parse_result) - - -if __name__ == '__main__': - unittest.main() diff --git a/src/job-exporter/test/test_job_exporter.py b/src/job-exporter/test/test_job_exporter.py deleted file mode 100644 index 63b67ddb4..000000000 --- a/src/job-exporter/test/test_job_exporter.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright (c) Microsoft Corporation -# All rights reserved. -# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the "Software"), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and -# to permit persons to whom the Software is furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING -# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
- -import os -import sys -import copy -import unittest -import yaml -import logging -import logging.config -import datetime - -sys.path.append(os.path.abspath("../src/")) - -import job_exporter - -log = logging.getLogger(__name__) - -class TestJobExporter(unittest.TestCase): - """ - Test job_exporter.py - """ - def setUp(self): - try: - os.chdir(os.path.abspath("test")) - except: - pass - - configuration_path = "logging.yaml" - - if os.path.exists(configuration_path): - with open(configuration_path, 'rt') as f: - logging_configuration = yaml.safe_load(f.read()) - logging.config.dictConfig(logging_configuration) - logging.getLogger() - - - def tearDown(self): - try: - os.chdir(os.path.abspath("..")) - except: - pass - - def test_parse_from_labels(self): - labels = {"container_label_PAI_USER_NAME": "openmindstudio", "container_label_GPU_ID": "0,1,", "container_label_PAI_HOSTNAME": "paigcr-a-gpu-1058", "container_label_PAI_JOB_NAME": "trialslot_nnimain_d65bc5ac", "container_label_PAI_CURRENT_TASK_ROLE_NAME": "tuner"} - gpuIds, otherLabels = job_exporter.parse_from_labels(labels) - self.assertEqual(["0", "1"], gpuIds,) - copied = copy.deepcopy(labels) - copied.pop("container_label_GPU_ID") - self.assertEqual(copied, otherLabels) - - def test_generate_zombie_count_type1(self): - zombies = job_exporter.ZombieRecorder() - - start = datetime.datetime.now() - - one_sec = datetime.timedelta(seconds=1) - - self.assertEqual(0, - job_exporter.generate_zombie_count_type1(zombies, {"a", "b"}, start)) - self.assertEqual(2, len(zombies)) - - self.assertEqual(0, - job_exporter.generate_zombie_count_type1(zombies, {"a", "b"}, - start + zombies.decay_time - one_sec)) - self.assertEqual(2, len(zombies)) - - self.assertEqual(2, - job_exporter.generate_zombie_count_type1(zombies, {"a", "b"}, - start + zombies.decay_time + one_sec)) - self.assertEqual(2, len(zombies)) - - self.assertEqual(1, - job_exporter.generate_zombie_count_type1(zombies, {"a"}, - start + zombies.decay_time + 2 
*one_sec)) - self.assertEqual(1, len(zombies)) - - self.assertEqual(0, - job_exporter.generate_zombie_count_type1(zombies, {}, - start + zombies.decay_time + 3 * one_sec)) - self.assertEqual(0, len(zombies)) - - - def test_generate_zombie_count_type2(self): - zombies = job_exporter.ZombieRecorder() - - start = datetime.datetime.now() - - one_sec = datetime.timedelta(seconds=1) - - stats = {"43ffe701d883": - {"name": "core-caffe2_resnet50_20181012040921.586-container_e03_1539312078880_0780_01_000002"}, - "8de2f53e64cb": - {"name": "container_e03_1539312078880_0780_01_000002"}} - - self.assertEqual(0, - job_exporter.generate_zombie_count_type2(zombies, stats, start)) - - stats.pop("8de2f53e64cb") - - self.assertEqual(0, - job_exporter.generate_zombie_count_type2(zombies, stats, start + one_sec)) - - self.assertEqual(0, - job_exporter.generate_zombie_count_type2(zombies, stats, - start + zombies.decay_time)) - - self.assertEqual(1, - job_exporter.generate_zombie_count_type2(zombies, stats, - start + zombies.decay_time + 2 * one_sec)) - - stats.pop("43ffe701d883") - - self.assertEqual(0, - job_exporter.generate_zombie_count_type2(zombies, stats, - start + zombies.decay_time + 3 * one_sec)) - -if __name__ == '__main__': - unittest.main() diff --git a/src/job-exporter/src/no_older_than.py b/src/job-exporter/test/test_network.py similarity index 57% rename from src/job-exporter/src/no_older_than.py rename to src/job-exporter/test/test_network.py index 03ccbda35..d0910aeaa 100644 --- a/src/job-exporter/src/no_older_than.py +++ b/src/job-exporter/test/test_network.py @@ -1,4 +1,3 @@ -#!/usr/bin/python # Copyright (c) Microsoft Corporation # All rights reserved. # @@ -16,27 +15,27 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
-import argparse -import datetime import os +import sys +import unittest -def check_no_older_than(paths, delta): - """ raise RuntimeError exception if any path in paths is older than `now - delta` """ - now = datetime.datetime.now() - delta = datetime.timedelta(seconds=delta) - oldest = now - delta +import base - for path in paths: - mtime = os.path.getmtime(path) - mtime = datetime.datetime.fromtimestamp(mtime) - if oldest > mtime: - raise RuntimeError("{} was updated more than {} seconds ago".format(path, delta)) +sys.path.append(os.path.abspath("../src/")) +import network -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("paths", nargs="+", help="file to be checked") - parser.add_argument("-d", "--delta", type=int, default=60, help="check file is no older than -d seconds") - args = parser.parse_args() +class TestNetwork(base.TestBase): + """ + Test network.py + """ - check_no_older_than(args.paths, args.delta) + def test_parse_lsof(self): + output = """COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME +python3 52485 dixu 5u IPv4 420398429 0t0 TCP 10.150.148.166:43682->198.100.183.212:443 (ESTABLISHED) +0t0 TCP 10.151.40.4:36090->10.151.40.4:8031 (ESTABLISHED) + """ + result = network.parse_lsof(output) + +if __name__ == '__main__': + unittest.main() diff --git a/src/job-exporter/test/test_nvidia.py b/src/job-exporter/test/test_nvidia.py new file mode 100644 index 000000000..c02b4feb5 --- /dev/null +++ b/src/job-exporter/test/test_nvidia.py @@ -0,0 +1,50 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +import os +import sys +import unittest + +import base + +sys.path.append(os.path.abspath("../src/")) + +import nvidia + +class TestNvidia(base.TestBase): + """ + Test nvidia.py + """ + def test_parse_smi_xml_result(self): + sample_path = "data/nvidia_smi_sample.xml" + with open(sample_path, "r") as f: + nvidia_smi_result = f.read() + nvidia_smi_parse_result = nvidia.parse_smi_xml_result(nvidia_smi_result) + target_smi_info = {'1': {'gpu_util': 98, 'gpu_mem_util': 97}, + '0': {'gpu_util': 100, 'gpu_mem_util': 99}} + self.assertEqual(target_smi_info, nvidia_smi_parse_result) + + def test_exporter_will_not_report_unsupported_gpu(self): + sample_path = "data/nvidia_smi_outdated_gpu.xml" + with open(sample_path, "r") as f: + nvidia_smi_result = f.read() + nvidia_smi_parse_result = nvidia.parse_smi_xml_result(nvidia_smi_result) + self.assertEqual({}, nvidia_smi_parse_result) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/job-exporter/test/test_utils.py b/src/job-exporter/test/test_utils.py index b2920a648..092d502c5 100644 --- a/src/job-exporter/test/test_utils.py +++ b/src/job-exporter/test/test_utils.py @@ -18,139 +18,35 @@ import os import sys import unittest -import yaml -import threading -import logging -import logging.config -import tempfile +import subprocess + +import base sys.path.append(os.path.abspath("../src/")) import utils -from utils import Metric -log = logging.getLogger(__name__) - -class TestUtils(unittest.TestCase): +class TestUtils(base.TestBase): """ Test utils.py """ - def setUp(self): - try: - os.chdir(os.path.abspath("test")) - except: - pass + def test_walk_json_field_safe(self): + self.assertIsNone(utils.walk_json_field_safe(None, 1, "abc")) + self.assertIsNone(utils.walk_json_field_safe([], 1, "abc")) + self.assertIsNone(utils.walk_json_field_safe([{"abc"}], 1, "abc")) + self.assertEqual("345", + utils.walk_json_field_safe([{"name": "123"}, {"name": "345"}], 1, "name")) - configuration_path = "logging.yaml" + def 
test_exec_cmd_with_0_return_value(self): + self.assertEqual("10\n", utils.exec_cmd(["echo", "10"])) - if os.path.exists(configuration_path): - with open(configuration_path, 'rt') as f: - logging_configuration = yaml.safe_load(f.read()) - logging.config.dictConfig(logging_configuration) - logging.getLogger() + def test_exec_cmd_with_timeout(self): + with self.assertRaises(subprocess.TimeoutExpired) as context: + utils.exec_cmd(["sleep", "10"], timeout=1) - - def tearDown(self): - try: - os.chdir(os.path.abspath("..")) - except: - pass - - def test_metrics_eq(self): - self.assertEqual(Metric("foo", {"abc": "1"}, "3"), - Metric("foo", {"abc": "1"}, "3")) - - self.assertNotEqual(Metric("foo", {"abc": "1"}, "3"), - Metric("bar", {"abc": "1"}, "3")) - self.assertNotEqual(Metric("foo", {"abc": "2"}, "3"), - Metric("foo", {"abc": "1"}, "3")) - self.assertNotEqual(Metric("foo", {"abc": "2"}, "3"), - Metric("foo", {"abc": "2"}, "5")) - - def test_export_metrics_to_file(self): - metrics = [] - metrics.append(Metric("foo", {"bar": 2}, "3")) - metrics.append(Metric("bar", {}, "4")) - with tempfile.NamedTemporaryFile() as f: - utils.export_metrics_to_file(f.name, metrics) - lines = f.readlines() - self.assertEqual("foo{bar=\"2\"} 3", lines[0].strip()) - self.assertEqual("bar 4", lines[1].strip()) - - def test_singleton_normal(self): - def getter(): - return 100 - - singleton = utils.Singleton(getter) - - for _ in xrange(10): - val, is_old = singleton.try_get() - self.assertEqual(100, val) - self.assertFalse(is_old) - - def test_singleton_with_blocking_getter_no_old_data(self): - semaphore = threading.Semaphore(1) - - def blocking_getter(): - semaphore.acquire(blocking=True) - semaphore.release() - return 100 - - singleton = utils.Singleton(blocking_getter, get_timeout_s=0.2) - - val, is_old = singleton.try_get() - self.assertIsNotNone(val) - self.assertFalse(is_old) - - for _ in xrange(3): - semaphore.acquire() - - for _ in xrange(3): - val, is_old = singleton.try_get() - 
self.assertEqual(100, val) - self.assertTrue(is_old) - - semaphore.release() - val, is_old = singleton.try_get() - self.assertEqual(100, val) - self.assertFalse(is_old) - - def test_singleton_with_blocking_getter_allow_old_data(self): - semaphore = threading.Semaphore(1) - - def blocking_getter(): - semaphore.acquire(blocking=True) - semaphore.release() - return 100 - - singleton = utils.Singleton(blocking_getter, get_timeout_s=0.2) - - semaphore.acquire() - - for _ in xrange(3): - val, is_old = singleton.try_get() - self.assertIsNone(val) - self.assertTrue(is_old) - - semaphore.release() - # let singleton cache one value - val, is_old = singleton.try_get() - self.assertEqual(100, val) - self.assertFalse(is_old) - - for _ in xrange(3): - semaphore.acquire() - - for _ in xrange(3): - # singleton returns old value - val, is_old = singleton.try_get() - self.assertEqual(100, val) - self.assertTrue(is_old) - - semaphore.release() - val, is_old = singleton.try_get() - self.assertEqual(100, val) - self.assertFalse(is_old) + def test_exec_cmd_with_non_0_return_value(self): + with self.assertRaises(subprocess.CalledProcessError) as context: + utils.exec_cmd(["false"]) if __name__ == '__main__': unittest.main() diff --git a/src/node-exporter/config/node_exporter.py b/src/node-exporter/config/node_exporter.py index c9e5aacc4..c52a15237 100644 --- a/src/node-exporter/config/node_exporter.py +++ b/src/node-exporter/config/node_exporter.py @@ -9,8 +9,6 @@ class NodeExporter(object): self.default_service_conf = default_service_conf def validation_pre(self): - print("cluster_config is %s, service_config is %s, default is %s" % \ - (self.cluster_conf, self.service_conf, self.default_service_conf)) return True, None def run(self): diff --git a/src/node-exporter/deploy/node-exporter.yaml.template b/src/node-exporter/deploy/node-exporter.yaml.template index 68e474772..f20e086b5 100644 --- a/src/node-exporter/deploy/node-exporter.yaml.template +++ 
b/src/node-exporter/deploy/node-exporter.yaml.template @@ -18,8 +18,6 @@ apiVersion: apps/v1 kind: DaemonSet metadata: - annotations: - prometheus.io/scrape: 'true' name: node-exporter spec: selector: @@ -29,6 +27,10 @@ spec: metadata: labels: app: node-exporter + annotations: + prometheus.io/scrape: "true" + prometheus.io/path: "/metrics" + prometheus.io/port: "{{ cluster_cfg["node-exporter"]["port"] }}" name: node-exporter spec: containers: @@ -38,17 +40,13 @@ spec: httpGet: path: /metrics port: {{ cluster_cfg["node-exporter"]["port"] }} - initialDelaySeconds: 30 + initialDelaySeconds: 3 periodSeconds: 30 resources: limits: memory: "128Mi" - volumeMounts: - - mountPath: /datastorage/prometheus - name: collector-mount name: node-exporter args: - - '--collector.textfile.directory=/datastorage/prometheus' - '--no-collector.arp' - '--no-collector.bcache' - '--no-collector.bonding' @@ -82,84 +80,9 @@ spec: - containerPort: {{ cluster_cfg["node-exporter"]["port"] }} hostPort: {{ cluster_cfg["node-exporter"]["port"] }} name: scrape - - image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}job-exporter:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }} - imagePullPolicy: Always - readinessProbe: - exec: - command: - - "python" - - "/job_exporter/no_older_than.py" - - "--delta" - - "120" - - "/datastorage/prometheus/job_exporter.prom" - - "/datastorage/prometheus/gpu_exporter.prom" - initialDelaySeconds: 30 - periodSeconds: 30 - resources: - limits: - memory: "128Mi" - securityContext: - privileged: true # this is required by job-exporter - env: - - name: HOST_IP - valueFrom: - fieldRef: - fieldPath: status.hostIP - - name: LOGGING_LEVEL - value: INFO - - name: NV_DRIVER - value: /var/drivers/nvidia/current - volumeMounts: - - mountPath: /root/.docker - name: docker-cred-volume - - mountPath: /bin/docker - name: docker-bin - - mountPath: /var/run/docker.sock - name: docker-socket - - mountPath: /dev - name: device-mount - - mountPath: 
/var/drivers/nvidia/current - name: driver-path - - mountPath: /datastorage/prometheus - name: collector-mount - - mountPath: /gpu-config - name: gpu-config - name: job-exporter - volumes: - - name: docker-bin - hostPath: - path: /bin/docker - - name: docker-socket - hostPath: - path: /var/run/docker.sock - - name: docker-cred-volume - configMap: - name: docker-credentials - - name: device-mount - hostPath: - path: /dev - - name: driver-path - hostPath: - path: /var/drivers/nvidia/current - - name: collector-mount - hostPath: - path: {{ cluster_cfg["cluster"]["common"]["data-path"] }}/prometheus - - name: rootfs - hostPath: - path: / - - name: var-run - hostPath: - path: /var/run - - name: sys - hostPath: - path: /sys - - name: gpu-config - configMap: - name: gpu-configuration imagePullSecrets: - name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }} hostNetwork: true - hostPID: true # This is required since job-exporter should get list of pid in container tolerations: - key: node.kubernetes.io/memory-pressure operator: "Exists" diff --git a/src/node-exporter/deploy/service.yaml b/src/node-exporter/deploy/service.yaml index 868b5034e..fb6fcbe79 100644 --- a/src/node-exporter/deploy/service.yaml +++ b/src/node-exporter/deploy/service.yaml @@ -17,7 +17,6 @@ prerequisite: - cluster-configuration - - drivers template-list: - node-exporter.yaml diff --git a/src/prometheus/deploy/alerting/pai-services.rules b/src/prometheus/deploy/alerting/pai-services.rules index 04d86c802..87d8b7432 100644 --- a/src/prometheus/deploy/alerting/pai-services.rules +++ b/src/prometheus/deploy/alerting/pai-services.rules @@ -41,3 +41,11 @@ groups: type: pai_service annotations: summary: "{{$labels.job}} in {{$labels.instance}} not up detected" + + - alert: JobExporterHangs + expr: irate(collector_iteration_count_total[5m]) == 0 + for: 5m + labels: + type: pai_service + annotations: + summary: "{{$labels.name}} in {{$labels.instance}} hangs detected" diff --git 
a/src/prometheus/deploy/prometheus-configmap.yaml.template b/src/prometheus/deploy/prometheus-configmap.yaml.template index 193d303a7..903d6365b 100644 --- a/src/prometheus/deploy/prometheus-configmap.yaml.template +++ b/src/prometheus/deploy/prometheus-configmap.yaml.template @@ -29,19 +29,7 @@ data: rule_files: - "/etc/prometheus-alert/*.rules" scrape_configs: - - job_name: "node_exporter" - scrape_interval: {{ prom_info["scrape_interval"] }}s - kubernetes_sd_configs: - - api_server: "{{ cluster_cfg["kubernetes"]["api-servers-url"] }}" - role: node - # Extract label __address__ inner ip address and replace the port to exporter"s port to build target address. - # Prometheus will fetch data from target address. - relabel_configs: - - source_labels: [__address__] - regex: '([^:]+)(:\d*)?' - replacement: "${1}:{{ cluster_cfg["node-exporter"]["port"] }}" - target_label: __address__ - - job_name: "pai_serivce_exporter" + - job_name: 'pai_serivce_exporter' scrape_interval: {{ prom_info["scrape_interval"] }}s kubernetes_sd_configs: - api_server: "{{ cluster_cfg["kubernetes"]["api-servers-url"] }}" diff --git a/src/watchdog/deploy/watchdog.yaml.template b/src/watchdog/deploy/watchdog.yaml.template index 2ec6881fc..060cb2ef1 100644 --- a/src/watchdog/deploy/watchdog.yaml.template +++ b/src/watchdog/deploy/watchdog.yaml.template @@ -39,8 +39,7 @@ spec: image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}watchdog:{{cluster_cfg["cluster"]["docker-registry"]["tag"]}} imagePullPolicy: Always readinessProbe: - httpGet: - path: / + tcpSocket: # because an http get would trigger job-exporter to abandon old metrics, we use tcp instead of http port: 9101 initialDelaySeconds: 30 periodSeconds: 30 diff --git a/src/watchdog/src/watchdog.py b/src/watchdog/src/watchdog.py index 40bc13e6d..4787d1983 100644 --- a/src/watchdog/src/watchdog.py +++ b/src/watchdog/src/watchdog.py @@ -23,7 +23,6 @@ import json import sys import requests import logging -from logging.handlers import 
RotatingFileHandler import time import threading