* refactor job-exporter

* select a network interface (#1849)

* do not use irate in task_net* and service_net* metric (#1859)
This commit is contained in:
Di Xu 2018-12-11 12:58:39 +08:00 коммит произвёл GitHub
Родитель 274fba0da5
Коммит 2c08713a45
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
42 изменённых файлов: 1667 добавлений и 1220 удалений

Просмотреть файл

@ -35,13 +35,13 @@ matrix:
- python -m unittest deployment.clusterObjectModel.test.test_template_generate
- language: python
python: 2.7
python: 3.6
before_install:
- cd src/job-exporter/test
install:
- pip install pyyaml
- pip install prometheus_client
script:
- python -m unittest discover .
- python3 -m unittest discover .
- language: python
python: 2.7

Просмотреть файл

@ -93,6 +93,12 @@ rest-server:
# node-exporter:
# port: 9100
# uncomment the following if you want to customize job-exporter
# job-exporter:
# port: 9102
# logging-level: INFO
# interface: eth0,eno2
# if you want to enable alert manager to send alert email, uncomment following lines and fill
# the right values.
# alert-manager:

Просмотреть файл

@ -60,14 +60,14 @@ Other pai component also used some metrics for display, they are:
<li>node_network_transmit_bytes_total</li>
<li>node_disk_read_bytes_total</li>
<li>node_disk_written_bytes_total</li>
<li>container_CPUPerc</li>
<li>container_MemUsage</li>
<li>container_NetIn</li>
<li>container_NetOut</li>
<li>container_BlockIn</li>
<li>container_BlockOut</li>
<li>container_GPUPerc</li>
<li>container_GPUMemPerc</li>
<li>task_cpu_percent</li>
<li>task_mem_usage_byte</li>
<li>task_net_in_byte</li>
<li>task_net_out_byte</li>
<li>task_block_in_byte</li>
<li>task_block_out_byte</li>
<li>task_gpu_percent</li>
<li>task_gpu_mem_percent</li>
</ul>
</td>
</tr>

Просмотреть файл

@ -5,19 +5,18 @@ Some important metrics are listed below.
| Metrics Name | By | Description |
| --- | --- | --- |
| `configured_gpu_count` | `job_exporter` | number of gpu configured by user |
| `nvidiasmi_attached_gpus` | `job_exporter` | number of gpus detected by nvidia-smi; this metric may change when nvidia-smi hangs |
| `nvidiasmi_utilization_gpu` | `job_exporter` | GPU utilization detected by nvidia-smi |
| `nvidiasmi_utilization_memory` | `job_exporter` | GPU memory utilization detected by nvidia-smi |
| `container_GPUPerc` | `job_exporter` | GPU utilization by specified job container |
| `container_GPUMemPerc` | `job_exporter` | GPU memory utilization by specified job container |
| `container_CPUPerc` | `job_exporter` | CPU utilization by job detectived by docker stats |
| `container_MemUsage` | `job_exporter` | Memory usage by job detectived by docker stats (byte) |
| `container_MemLimit` | `job_exporter` | Memory limit by job detectived by docker stats (byte) |
| `container_MemPerc` | `job_exporter` | Memory utilization by job detectived by docker stats |
| `container_NetIn` | `job_exporter` | Network in traffic by job detectived by docker stats (byte) |
| `container_NetOut` | `job_exporter` | Network out traffic by job detectived by docker stats (byte) |
| `container_BlockIn` | `job_exporter` | Block io in traffic by job detectived by docker stats (byte) |
| `container_BlockOut` | `job_exporter` | Block io out traffic by job detectived by docker stats (byte) |
| `task_gpu_percent` | `job_exporter` | GPU utilization of the specified job container |
| `task_gpu_mem_percent` | `job_exporter` | GPU memory utilization of the specified job container |
| `task_cpu_percent` | `job_exporter` | CPU utilization of the job, detected by docker stats |
| `task_mem_usage_byte` | `job_exporter` | Memory usage of the job, detected by docker stats (byte) |
| `task_mem_limit_byte` | `job_exporter` | Memory limit of the job, detected by docker stats (byte) |
| `task_mem_usage_percent` | `job_exporter` | Memory utilization of the job, detected by docker stats |
| `task_net_in_byte` | `job_exporter` | Network in traffic of the job, detected by docker stats (byte) |
| `task_net_out_byte` | `job_exporter` | Network out traffic of the job, detected by docker stats (byte) |
| `task_block_in_byte` | `job_exporter` | Block io in traffic of the job, detected by docker stats (byte) |
| `task_block_out_byte` | `job_exporter` | Block io out traffic of the job, detected by docker stats (byte) |
| `node_filefd_allocated` | `node_exporter` | Number of file descriptor allocated in node |
| `node_disk_read_time_ms` | `node_exporter` | Node disk read time (ms) |
| `node_disk_write_time_ms` | `node_exporter` | Node disk write time (ms) |

Просмотреть файл

@ -91,7 +91,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME)(container_CPUPerc{container_label_PAI_JOB_NAME=~\"$job\"}) ",
"expr": "avg by (container_label_PAI_JOB_NAME)(task_cpu_percent{container_label_PAI_JOB_NAME=~\"$job\"}) ",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
@ -191,7 +191,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME) (container_MemUsage{ container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME) (task_mem_usage_byte{ container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"hide": false,
"instant": false,
@ -283,7 +283,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_NetIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])) ",
"expr": "avg by (container_label_PAI_JOB_NAME) (task_net_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -294,7 +294,7 @@
"target": ""
},
{
"expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_NetOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])) ",
"expr": "avg by (container_label_PAI_JOB_NAME) (task_net_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])",
"format": "time_series",
"hide": false,
"interval": "",
@ -390,7 +390,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_BlockIn{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME) (irate(task_block_in_byte{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -401,7 +401,7 @@
"target": ""
},
{
"expr": "avg by (container_label_PAI_JOB_NAME) (irate(container_BlockOut{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME) (irate(task_block_out_byte{ container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "Read {{'{{container_label_PAI_JOB_NAME}}'}}",
@ -480,7 +480,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -563,7 +563,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME) (container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
@ -637,7 +637,7 @@
"multiFormat": "regex values",
"name": "job",
"options": [],
"query": "label_values(container_CPUPerc, container_label_PAI_JOB_NAME)",
"query": "label_values(task_cpu_percent, container_label_PAI_JOB_NAME)",
"refresh": 1,
"regex": "^(?!\\s*$).+",
"sort": 1,

Просмотреть файл

@ -224,7 +224,7 @@
"steppedLine": false,
"targets": [
{
"expr": "irate(service_net_in_byte{name=\"$service_name\"}[{{interval}}s])",
"expr": "service_net_in_byte{name=\"$service_name\"}[{{interval}}s]",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -232,7 +232,7 @@
"refId": "A"
},
{
"expr": "irate(service_net_out_byte{name=\"$service_name\"}[{{interval}}s])",
"expr": "service_net_out_byte{name=\"$service_name\"}[{{interval}}s]",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{'{{'}}instance{{'}}'}} Outbound",

Просмотреть файл

@ -108,7 +108,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (container_CPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_cpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
@ -208,7 +208,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (container_MemUsage{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_mem_usage_byte{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"hide": false,
"instant": false,
@ -300,7 +300,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_NetIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_net_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])",
"interval": "",
"intervalFactor": 2,
"legendFormat": "In {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}",
@ -310,7 +310,7 @@
"target": ""
},
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_NetOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_net_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])",
"format": "time_series",
"hide": false,
"interval": "",
@ -406,7 +406,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_BlockIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(task_block_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"interval": "",
"intervalFactor": 2,
"legendFormat": "Write {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}",
@ -416,7 +416,7 @@
"target": ""
},
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(container_BlockOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (irate(task_block_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"intervalFactor": 2,
"legendFormat": "Read {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}",
"refId": "B"
@ -494,7 +494,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -592,7 +592,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_label_PAI_CURRENT_TASK_ROLE_NAME}}'}}",
@ -665,7 +665,7 @@
"multiFormat": "regex values",
"name": "job",
"options": [],
"query": "label_values(container_CPUPerc, container_label_PAI_JOB_NAME)",
"query": "label_values(task_cpu_percent, container_label_PAI_JOB_NAME)",
"refresh": 1,
"regex": "^(?!\\s*$).+",
"sort": 1,

Просмотреть файл

@ -96,7 +96,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (container_CPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_cpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
@ -196,7 +196,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (container_MemUsage{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_mem_usage_byte{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"hide": false,
"instant": false,
@ -288,7 +288,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (irate(container_NetIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_net_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -299,7 +299,7 @@
"target": ""
},
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (irate(container_NetOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX) (task_net_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s])",
"format": "time_series",
"hide": false,
"interval": "",
@ -395,7 +395,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(container_BlockIn{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(task_block_in_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -406,7 +406,7 @@
"target": ""
},
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(container_BlockOut{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (irate(task_block_out_byte{container_label_PAI_JOB_NAME=~\"$job\"}[{{interval}}s]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "Read {{'{{container_label_PAI_JOB_NAME}}'}}-{{'{{container_env_PAI_TASK_INDEX}}'}}",
@ -485,7 +485,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -568,7 +568,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
@ -673,7 +673,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX, minor_number)(container_GPUPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME,container_env_PAI_TASK_INDEX, minor_number)(task_gpu_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"format": "time_series",
"hide": false,
"instant": false,
@ -771,7 +771,7 @@
"steppedLine": false,
"targets": [
{
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX, minor_number) (container_GPUMemPerc{container_label_PAI_JOB_NAME=~\"$job\"})",
"expr": "avg by (container_label_PAI_JOB_NAME, container_label_PAI_CURRENT_TASK_ROLE_NAME, container_env_PAI_TASK_INDEX, minor_number) (task_gpu_mem_percent{container_label_PAI_JOB_NAME=~\"$job\"})",
"hide": false,
"instant": false,
"interval": "",
@ -850,7 +850,7 @@
"multiFormat": "regex values",
"name": "job",
"options": [],
"query": "label_values(container_CPUPerc, container_label_PAI_JOB_NAME)",
"query": "label_values(task_cpu_percent, container_label_PAI_JOB_NAME)",
"refresh": 1,
"regex": "^(?!\\s*$).+",
"sort": 1,

Просмотреть файл

@ -15,13 +15,13 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
FROM python:2.7
FROM python:3.7
RUN apt-get update && apt-get install --no-install-recommends -y build-essential git && \
git clone https://github.com/yadutaf/infilter --depth 1 && \
cd infilter && make
FROM python:2.7
FROM python:3.7
RUN curl -SL https://download.docker.com/linux/static/stable/x86_64/docker-17.06.2-ce.tgz \
| tar -xzvC /usr/local && \
@ -30,7 +30,7 @@ RUN curl -SL https://download.docker.com/linux/static/stable/x86_64/docker-17.06
mkdir -p /job_exporter && \
rm -rf /var/lib/apt/lists/*
COPY --from=0 infilter/infilter /usr/bin
COPY src/* /job_exporter/
RUN pip3 install prometheus_client
CMD python /job_exporter/job_exporter.py /datastorage/prometheus 30
COPY --from=0 infilter/infilter /usr/bin
COPY src/*.py /job_exporter/

Просмотреть файл

@ -0,0 +1,73 @@
## Job-exporter section parser
- [Default Configuration](#D_Config)
- [How to Configure](#HT_Config)
- [Generated Configuration](#G_Config)
- [Data Table](#T_Config)
#### Default configuration <a name="D_Config"></a>
[job-exporter default configuration](job-exporter.yaml)
#### How to configure cluster section in service-configuration.yaml <a name="HT_Config"></a>
All configurations in this section are optional. If you want to customize these values, you can configure them in service-configuration.yaml.
For example, if you want to use a different port than the default 9102, add the following to your service-configuration.yaml:
```yaml
job-exporter:
port: new-value
logging-level: DEBUG
```
job-exporter needs to expose network related metrics by listening to one of the network interfaces.
By default, job-exporter will listen to eth0 or eno2, whichever it finds in the node.
But if your interface name is different from these, you should edit the job-exporter config in your
service-configuration.yaml like the following:
```yaml
job-exporter:
interface: eth1,eno1
```
The interface field is a comma separated string, and job-exporter will listen to the first one found in your node. If none of the interfaces is found, job-exporter will select one that can access the internet.
#### Generated Configuration <a name="G_Config"></a>
Generated configuration means the object model after parsing. The parsed data will be presented by a yaml format.
```yaml
job-exporter:
port: 9100
logging-level: DEBUG
interface: eth0,eno2
```
#### Table <a name="T_Config"></a>
<table>
<tr>
<td>Data in Configuration File</td>
<td>Data in Cluster Object Model</td>
<td>Data in Jinja2 Template</td>
<td>Data type</td>
</tr>
<tr>
<td>job-exporter.port</td>
<td>com["job-exporter"]["port"]</td>
<td>cluster_cfg["job-exporter"]["port"]</td>
<td>Int</td>
</tr>
<tr>
<td>job-exporter.logging-level</td>
<td>com["job-exporter"]["logging-level"]</td>
<td>cluster_cfg["job-exporter"]["logging-level"]</td>
<td>String</td>
</tr>
<tr>
<td>job-exporter.interface</td>
<td>com["job-exporter"]["interface"]</td>
<td>cluster_cfg["job-exporter"]["interface"]</td>
<td>String</td>
</tr>
</table>

Просмотреть файл

@ -0,0 +1,20 @@
# Copyright (c) Microsoft Corporation
# # All rights reserved.
# #
# # MIT License
# #
# # Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# # documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# # the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# # to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# # The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
# #
# # THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# # BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
port: 9102
logging-level: INFO
interface: eth0,eno2

Просмотреть файл

@ -0,0 +1,30 @@
#!/usr/bin/env python

import copy


class JobExporter(object):
    """Object-model parser for the job-exporter service section.

    Merges the user-provided service configuration over the shipped
    defaults and validates the merged values. Follows the common parser
    interface shared by all service sections (validation_pre / run /
    validation_post).
    """

    def __init__(self, cluster_conf, service_conf, default_service_conf):
        # Cluster-wide configuration; unused here but kept for the common
        # parser interface.
        self.cluster_conf = cluster_conf
        # User overrides from service-configuration.yaml (may be empty).
        self.service_conf = service_conf
        # Defaults shipped with the service (job-exporter.yaml).
        self.default_service_conf = default_service_conf

    def validation_pre(self):
        """Pre-merge validation hook; job-exporter has nothing to check.

        Returns:
            (True, None) always.
        """
        return True, None

    def run(self):
        """Return the defaults overlaid with the user configuration.

        deepcopy prevents user overrides from mutating the shared
        default configuration object.
        """
        result = copy.deepcopy(self.default_service_conf)
        result.update(self.service_conf)
        return result

    def validation_post(self, conf):
        """Validate the merged job-exporter configuration.

        Args:
            conf: full configuration mapping containing a "job-exporter" key.

        Returns:
            (True, None) on success, otherwise (False, error message).
        """
        port = conf["job-exporter"].get("port")
        # `type(...) is not int` (rather than isinstance) deliberately
        # rejects bool values, since bool is a subclass of int.
        if type(port) is not int:
            msg = "expect port in job-exporter to be int but get %s with type %s" % \
                    (port, type(port))
            return False, msg

        level = conf["job-exporter"].get("logging-level")
        if level not in {"DEBUG", "INFO", "WARNING"}:
            msg = "expect logging-level in job-exporter to be {'DEBUG', 'INFO', 'WARNING'} but got %s" % \
                    (level)
            return False, msg

        return True, None

Просмотреть файл

@ -1,3 +1,5 @@
#!/bin/bash
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
@ -15,26 +17,10 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
version: 1
disable_existing_loggers: False
formatters:
pai_k8sdp:
format: "%(asctime)s [%(levelname)s] - %(name)s : %(message)s"
pushd $(dirname "$0") > /dev/null
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: pai_k8sdp
stream: ext://sys.stdout
loggers:
unittest_module:
level: INFO
handler: [console]
propagate: no
echo "Call stop script to stop all service first"
/bin/bash stop.sh || exit $?
root:
level: INFO
handlers: [console]
popd > /dev/null

Просмотреть файл

@ -0,0 +1,107 @@
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: job-exporter
spec:
selector:
matchLabels:
app: job-exporter
template:
metadata:
labels:
app: job-exporter
annotations:
prometheus.io/scrape: "true"
prometheus.io/path: "/"
prometheus.io/port: "{{ cluster_cfg["job-exporter"]["port"] }}"
name: job-exporter
spec:
containers:
- image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}job-exporter:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }}
imagePullPolicy: Always
readinessProbe:
tcpSocket: # an http GET would trigger job-exporter to abandon old metrics, so we probe with tcp instead of http
port: {{ cluster_cfg["job-exporter"]["port"] }}
initialDelaySeconds: 3
periodSeconds: 30
command:
- "python"
- "/job_exporter/main.py"
- "--port"
- "{{ cluster_cfg["job-exporter"]["port"] }}"
- "--interval"
- "{{ cluster_cfg["prometheus"]["scrape_interval"] }}"
- "--interface"
- "{{ cluster_cfg["job-exporter"]["interface"] }}"
resources:
limits:
memory: "128Mi"
securityContext:
privileged: true # this is required since job-exporter will call setns to other containers
env:
- name: HOST_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: LOGGING_LEVEL
value: {{ cluster_cfg["job-exporter"]["logging-level"] }}
- name: NV_DRIVER
value: /var/drivers/nvidia/current
volumeMounts:
- mountPath: /var/run/docker.sock
name: docker-socket
- mountPath: /dev
name: device-mount
- mountPath: /var/drivers/nvidia/current
name: driver-path
- mountPath: /datastorage/prometheus
name: collector-mount
- mountPath: /gpu-config
name: gpu-config
name: job-exporter
ports:
- containerPort: {{ cluster_cfg["job-exporter"]["port"] }}
hostPort: {{ cluster_cfg["job-exporter"]["port"] }}
name: main
volumes:
- name: docker-socket
hostPath:
path: /var/run/docker.sock
- name: device-mount
hostPath:
path: /dev
- name: driver-path
hostPath:
path: /var/drivers/nvidia/current
- name: collector-mount
hostPath:
path: {{ cluster_cfg["cluster"]["common"]["data-path"] }}/prometheus
- name: gpu-config
configMap:
name: gpu-configuration
imagePullSecrets:
- name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }}
hostNetwork: true
hostPID: true # This is required since job-exporter needs the list of pids in containers
tolerations:
- key: node.kubernetes.io/memory-pressure
operator: "Exists"
- key: node.kubernetes.io/disk-pressure
operator: "Exists"

Просмотреть файл

@ -0,0 +1,25 @@
#!/bin/bash
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
pushd $(dirname "$0") > /dev/null
popd > /dev/null

Просмотреть файл

@ -0,0 +1,33 @@
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
prerequisite:
- cluster-configuration
- drivers
template-list:
- job-exporter.yaml
start-script: start.sh
stop-script: stop.sh
delete-script: delete.sh
refresh-script: refresh.sh
upgraded-script: upgraded.sh
deploy-rules:
- notin: no-jobexporter

Просмотреть файл

@ -0,0 +1,29 @@
#!/bin/bash
#!/bin/bash
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
pushd $(dirname "$0") > /dev/null
kubectl apply --overwrite=true -f job-exporter.yaml || exit $?
# Wait until the service is ready.
PYTHONPATH="../../../deployment" python -m k8sPaiLibrary.monitorTool.check_pod_ready_status -w -k app -v job-exporter || exit $?
popd > /dev/null

Просмотреть файл

@ -0,0 +1,22 @@
#!/bin/sh
#!/bin/bash
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
kubectl delete --ignore-not-found --now "daemonset/job-exporter"

Просмотреть файл

@ -0,0 +1,617 @@
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import re
import datetime
import logging
import threading
import subprocess
import time
import copy
from prometheus_client import make_wsgi_app, Counter, Gauge, Histogram
from prometheus_client.core import GaugeMetricFamily
import network
import utils
import docker_inspect
import docker_stats
import nvidia
logger = logging.getLogger(__name__)
##### collector will generate following metrics
# Document about these metrics is in `` # TODO
iteration_counter = Counter("collector_iteration_count", "total number of iteration",
["name"])
def gen_docker_daemon_counter():
    """ Build a fresh gauge family reporting docker daemon liveness,
    labeled by error state. """
    gauge = GaugeMetricFamily(
            "docker_daemon_count",
            "count of docker daemon",
            labels=["error"])
    return gauge
def gen_gpu_util_gauge():
    """ Build a fresh gauge family for per-card gpu core utilization. """
    gauge = GaugeMetricFamily(
            "nvidiasmi_utilization_gpu",
            "gpu core utilization of card",
            labels=["minor_number"])
    return gauge
def gen_gpu_mem_util_gauge():
    """ Build a fresh gauge family for per-card gpu memory utilization. """
    gauge = GaugeMetricFamily(
            "nvidiasmi_utilization_memory",
            "gpu memory utilization of card",
            labels=["minor_number"])
    return gauge
class ResourceGauges(object):
    """ Holds one iteration's worth of task_* and service_* gauge families.

    Task metrics are labeled with container labels/envs (job name, user name,
    task role, etc.); service metrics carry only the service name. GPU task
    metrics additionally carry the gpu minor_number label. A new instance is
    created per collection iteration, filled via add_value(), and exported
    through as_array().
    """
    def __init__(self):
        self.task_labels = [
                "container_env_PAI_TASK_INDEX",
                "container_label_PAI_CURRENT_TASK_ROLE_NAME",
                "container_label_PAI_HOSTNAME",
                "container_label_PAI_JOB_NAME",
                "container_label_PAI_USER_NAME"
                ]
        self.service_labels = ["name"]

        self.task_labels_gpu = copy.deepcopy(self.task_labels)
        self.task_labels_gpu.append("minor_number")

        # gauge name -> GaugeMetricFamily
        self.gauges = {}

        self.add_task_and_service_gauge("{0}_cpu_percent",
                "how much percent of cpu this {0} used")
        self.add_task_and_service_gauge("{0}_mem_usage_byte",
                "how much memory this {0} used")
        self.add_task_and_service_gauge("{0}_mem_usage_percent",
                "how much percent of memory this {0} used")
        self.add_task_and_service_gauge("{0}_mem_limit_byte",
                "how much memory this {0} are constrained to")
        # BUGFIX: description previously hard-coded "task" instead of the
        # {0} placeholder, giving service_net_in_byte the wrong help text.
        self.add_task_and_service_gauge("{0}_net_in_byte",
                "how much network inbound this {0} used")
        self.add_task_and_service_gauge("{0}_net_out_byte",
                "how much network outbound this {0} used")
        self.add_task_and_service_gauge("{0}_block_in_byte",
                "how much block inbound this {0} used")
        self.add_task_and_service_gauge("{0}_block_out_byte",
                "how much block outbound this {0} used")

        # gpu metrics exist for tasks only (services do not use gpu)
        self.add_gauge("task_gpu_percent",
                "how much percent of gpu core this task used",
                self.task_labels_gpu)
        self.add_gauge("task_gpu_mem_percent",
                "how much percent of gpu memory this task used",
                self.task_labels_gpu)

    def add_task_and_service_gauge(self, name_tmpl, desc_tmpl):
        """ Register the same gauge twice: once with task labels, once with
        service labels. {0} in the templates is replaced by task/service. """
        self.add_gauge(
                name_tmpl.format("task"),
                desc_tmpl.format("task"),
                self.task_labels)

        self.add_gauge(
                name_tmpl.format("service"),
                desc_tmpl.format("service"),
                self.service_labels)

    def add_gauge(self, name, desc, labels):
        """ Register a single GaugeMetricFamily under `name`. """
        self.gauges[name] = GaugeMetricFamily(name, desc, labels=labels)

    def add_value(self, metric_name, labels, val):
        """ Add one sample to a registered gauge.

        `labels` is a dict; it is converted into the positional label array
        prometheus_client expects. Unknown label keys are logged and skipped;
        if any required label is missing the sample is dropped entirely (a
        partially-labeled sample would be misleading).

        Raises RuntimeError if metric_name was never registered.
        """
        if metric_name not in self.gauges:
            raise RuntimeError(
                    "{0} not found in gauges, all gauge names is {1}".format(
                        metric_name, ",".join(self.gauges.keys())))

        gauge = self.gauges[metric_name]

        # because prometheus library requires label provided as array, we
        # preprocess the labels and check any missing labels
        # NOTE(review): relies on the private _labelnames attribute of
        # GaugeMetricFamily — confirm on prometheus_client upgrades.
        label_array = [None] * len(gauge._labelnames)

        for k, v in labels.items():
            try:
                index = gauge._labelnames.index(k)
                label_array[index] = v
            except ValueError:
                logger.warning("unknown label %s with value %s for metrics %s",
                        k, v, metric_name)
                continue

        for i, label_val in enumerate(label_array):
            if label_val is None:
                logger.error(
                        "not provided %s as label value for metric %s, ignore this metric",
                        gauge._labelnames[i], metric_name)
                return

        gauge.add_metric(label_array, val)

    def as_array(self):
        """ Return all registered gauge families for export. """
        return self.gauges.values()
#####
class AtomicRef(object):
    """ Thread-safe holder for a single object. Values obtained via get()
    or get_and_set() must be treated as read-only by callers. """

    def __init__(self):
        self.data = None
        self.lock = threading.RLock()

    def get_and_set(self, new_data):
        """ Atomically replace the held value with new_data and return
        the previous value. """
        with self.lock:
            old, self.data = self.data, new_data
        return old

    def get(self):
        """ Return the currently held value (may be None). """
        with self.lock:
            return self.data
class Collector(object):
    """ Base class for metric collectors. Each collector runs in its own
    thread so that one hanging collector cannot stall the others.
    Subclasses implement collect_impl() and return metrics as an array
    (or None on failure); results are published through an AtomicRef. """

    def __init__(self, name, sleep_time, atomic_ref, iteration_counter):
        # name: collector name, used in log lines and metric label values
        # sleep_time: seconds to sleep between iterations
        # atomic_ref: AtomicRef the latest collect_impl() result is published to
        # iteration_counter: shared prometheus Counter labeled by collector name
        self.name = name
        self.sleep_time = sleep_time
        self.atomic_ref = atomic_ref
        self.iteration_counter = iteration_counter

        # Per-collector iteration-latency histogram. NOTE(review): "lantecy"
        # is a typo but it is part of the exported metric name — do not fix
        # without migrating dashboards/alerts that query it.
        histogram_key = "collector_%s_iteration_lantecy_seconds" % self.name
        histogram_desc = "latency for execute one interation of %s collector (seconds)" % \
                self.name
        self.collector_histogram = Histogram(histogram_key, histogram_desc)

        logger.debug("init %s with sleep_time %d", self.name, self.sleep_time)

    def collect(self):
        """ Run forever: execute one collect_impl() iteration, publish the
        result via atomic_ref, then sleep sleep_time seconds. Exceptions
        from collect_impl() are logged and never kill the thread. """
        while True:
            logger.debug("collecting metrics from %s", self.name)

            with self.collector_histogram.time():
                self.iteration_counter.labels(name=self.name).inc()
                try:
                    self.atomic_ref.get_and_set(self.collect_impl())
                except Exception as e:
                    logger.exception("%s collector get an exception", self.name)

                logger.debug("finished collect metrcis from %s, will sleep for %s",
                        self.name, self.sleep_time)

            # sleep outside the timed block so the histogram measures only
            # the collection work, not the idle wait
            # NOTE(review): diff render stripped indentation — confirm against
            # upstream that sleep is outside the `with` block.
            time.sleep(self.sleep_time)

    def collect_impl(self):
        """ implementations are expected to return an array of
        prometheus_client's metrics or None on exception """
        pass
def instantiate_collector(name, sleep_time, collector_class, *args):
    """ test cases helper: build a collector instance together with the
    AtomicRef it publishes results to (without starting any thread). """
    ref = AtomicRef()
    collector = collector_class(name, sleep_time, ref, iteration_counter, *args)
    return ref, collector
def make_collector(name, sleep_time, collector_class, *args):
    """ other module should use this fn to init a collector: it starts the
    collector loop in a daemon thread and returns the AtomicRef through which
    the outside world reads the collected metrics. """
    atomic_ref, instance = instantiate_collector(name, sleep_time, collector_class, *args)

    worker = threading.Thread(
            target=instance.collect,
            name=name,
            args=(),
            daemon=True)
    worker.start()

    return atomic_ref
class DockerCollector(Collector):
    """ Checks docker daemon liveness via `systemctl is-active docker` and
    emits a docker_daemon_count gauge labeled with the error state
    ("ok" / "inactive" / "timeout" / exception text). """

    cmd_histogram = Histogram("cmd_docker_active_latency_seconds",
            "Command call latency for checking docker daemon activeness (seconds)")

    cmd_timeout = 1 # 99th latency is 0.01s

    def collect_impl(self):
        cmd = """
        systemctl is-active docker > /dev/null 2>&1 ;
        if [ $? -eq 0 ]; then echo "true"; else echo "false" ; fi
        """

        error = "ok"

        try:
            out = utils.exec_cmd(cmd, shell=True,
                    histogram=DockerCollector.cmd_histogram,
                    timeout=DockerCollector.cmd_timeout)

            if "true" not in str(out):
                error = "inactive"
        except subprocess.CalledProcessError as e:
            logger.exception("command '%s' return with error (code %d): %s",
                    cmd, e.returncode, e.output)
            # BUGFIX: e.strerror() does not exist on CalledProcessError and
            # raised AttributeError; use the exception text as the label value.
            error = str(e)
        except subprocess.TimeoutExpired as e:
            # use the module logger, not the root logger
            logger.warning("check docker active timeout")
            error = "timeout"
        except Exception as e:
            # BUGFIX: same strerror() issue for arbitrary exceptions
            error = str(e)

        counter = gen_docker_daemon_counter()
        counter.add_metric([error], 1)

        return [counter]
class GpuCollector(Collector):
    """ Polls nvidia-smi for per-card gpu core/memory utilization and emits
    nvidiasmi_utilization_* gauges. The raw per-minor-number gpu info is also
    shared with ContainerCollector through gpu_info_ref. """

    cmd_histogram = Histogram("cmd_nvidia_smi_latency_seconds",
            "Command call latency for nvidia-smi (seconds)")

    cmd_timeout = 3 # 99th latency is 0.97s

    def __init__(self, name, sleep_time, atomic_ref, iteration_counter, gpu_info_ref):
        Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter)
        # AtomicRef shared with ContainerCollector for per-container gpu metrics
        self.gpu_info_ref = gpu_info_ref

    def collect_impl(self):
        gpu_info = nvidia.nvidia_smi(GpuCollector.cmd_histogram,
                GpuCollector.cmd_timeout)

        # BUGFIX: was logging.debug (root logger); use the module logger
        logger.debug("get gpu_info %s", gpu_info)

        # publish even when None so consumers see that nvidia-smi failed
        self.gpu_info_ref.get_and_set(gpu_info)

        if gpu_info is not None:
            core_utils = gen_gpu_util_gauge()
            mem_utils = gen_gpu_mem_util_gauge()

            for minor, info in gpu_info.items():
                core_utils.add_metric([minor], info["gpu_util"])
                mem_utils.add_metric([minor], info["gpu_mem_util"])

            return [core_utils, mem_utils]

        return None
class ContainerCollector(Collector):
    """ Collects per-container resource metrics via `docker stats`,
    `docker inspect`, iftop and lsof, and publishes them as task_* gauges
    for job containers and service_* gauges for known pai services.
    Shares gpu info with GpuCollector and docker stats with ZombieCollector
    through AtomicRefs. """

    stats_histogram = Histogram("cmd_docker_stats_latency_seconds",
            "Command call latency for docker stats (seconds)")
    stats_timeout = 20
    # 99th latency may larger than 10s,
    # Because prometheus's largest bucket for recording histogram is 10s,
    # we can not get value higher than 10s.

    inspect_histogram = Histogram("cmd_docker_inspect_latency_seconds",
            "Command call latency for docker inspect (seconds)")
    inspect_timeout = 1 # 99th latency is 0.042s

    iftop_histogram = Histogram("cmd_iftop_latency_seconds",
            "Command call latency for iftop (seconds)")
    iftop_timeout = 10 # 99th latency is 7.4s

    lsof_histogram = Histogram("cmd_lsof_latency_seconds",
            "Command call latency for lsof (seconds)")
    lsof_timeout = 2 # 99th latency is 0.5s

    # k8s will prepend "k8s_" to pod name. There will also be a container name
    # prepend with "k8s_POD_" which is a docker container used to construct
    # network & pid namespace for specific container. These container prepend
    # with "k8s_POD" consume nothing.
    pai_services = list(map(lambda s: "k8s_" + s, [
        "rest-server",
        "pylon",
        "webportal",
        "grafana",
        "prometheus",
        "alertmanager",
        "watchdog",
        "end-to-end-test",
        "yarn-frameworklauncher",
        "hadoop-jobhistory-service",
        "hadoop-name-node",
        "hadoop-node-manager",
        "hadoop-resource-manager",
        "hadoop-data-node",
        "zookeeper",
        "node-exporter",
        "job-exporter",
        "yarn-exporter",
        "nvidia-drivers"
        ]))

    def __init__(self, name, sleep_time, atomic_ref, iteration_counter, gpu_info_ref,
            stats_info_ref, interface):
        Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter)
        self.gpu_info_ref = gpu_info_ref    # written by GpuCollector
        self.stats_info_ref = stats_info_ref # consumed by ZombieCollector

        self.network_interface = network.try_to_get_right_interface(interface)
        logger.info("found %s as potential network interface to listen network traffic",
                self.network_interface)

    def collect_impl(self):
        all_conns = network.iftop(self.network_interface,
                ContainerCollector.iftop_histogram,
                ContainerCollector.iftop_timeout)

        # set it to None so if nvidia-smi hangs till next time we get,
        # we will get None
        gpu_infos = self.gpu_info_ref.get_and_set(None)

        stats_obj = docker_stats.stats(ContainerCollector.stats_histogram,
                ContainerCollector.stats_timeout)

        self.stats_info_ref.get_and_set(stats_obj)

        logger.debug("all_conns is %s, gpu_info is %s, stats_obj is %s",
                all_conns, gpu_infos, stats_obj)

        return self.collect_container_metrics(stats_obj, gpu_infos, all_conns)

    @staticmethod
    def parse_from_labels(labels):
        """ Split docker inspect labels into the list of gpu minor numbers
        (from container_label_GPU_ID, a quoted comma-separated string) and
        the remaining labels. """
        gpu_ids = []
        other_labels = {}

        for key, val in labels.items():
            if "container_label_GPU_ID" == key:
                s2 = val.replace("\"", "").split(",")
                for id in s2:
                    if id:
                        gpu_ids.append(id)
            else:
                other_labels[key] = val

        return gpu_ids, other_labels

    @classmethod
    def infer_service_name(cls, container_name):
        """ try to infer service name from container_name, if it's container not belongs
        to pai service, will return None """
        if container_name.startswith("k8s_POD_"):
            # this is empty container created by k8s for pod
            return None

        # TODO speed this up, since this is O(n^2)
        for service_name in cls.pai_services:
            if container_name.startswith(service_name):
                return service_name[4:] # remove "k8s_" prefix

        return None

    def process_one_container(self, container_id, stats, gpu_infos, all_conns, gauges):
        """ Turn one docker-stats entry into task_* or service_* gauge samples
        added to `gauges`. Containers that are neither pai services nor labeled
        job containers are ignored. """
        container_name = utils.walk_json_field_safe(stats, "name")
        pai_service_name = ContainerCollector.infer_service_name(container_name)

        inspect_info = docker_inspect.inspect(container_id,
                ContainerCollector.inspect_histogram,
                ContainerCollector.inspect_timeout)

        pid = utils.walk_json_field_safe(inspect_info, "pid")
        inspect_labels = utils.walk_json_field_safe(inspect_info, "labels")

        logger.debug("%s has pid %s, labels %s, service_name %s",
                container_name, pid, inspect_labels, pai_service_name)

        if not inspect_labels and pai_service_name is None:
            logger.debug("%s is ignored", container_name)
            return # other container, maybe kubelet or api-server

        # get network consumption, since all our services/jobs running in host
        # network, and network statistic from docker is not specific to that
        # container. We have to get network statistic by ourselves.
        lsof_result = network.lsof(pid,
                ContainerCollector.lsof_histogram,
                ContainerCollector.lsof_timeout)

        net_in, net_out = network.get_container_network_metrics(all_conns,
                lsof_result)

        if logger.isEnabledFor(logging.DEBUG):
            debug_info = utils.exec_cmd(
                    "ps -o cmd fp {0} | tail -n 1".format(pid),
                    shell=True)

            logger.debug("pid %s with cmd `%s` has lsof result %s, in %d, out %d",
                    pid, debug_info.strip(), lsof_result, net_in, net_out)

        if pai_service_name is None:
            # job container: label metrics with the job's container labels/envs
            gpu_ids, container_labels = ContainerCollector.parse_from_labels(inspect_info["labels"])
            container_labels.update(inspect_info["env"])

            if gpu_infos:
                for id in gpu_ids:
                    labels = copy.deepcopy(container_labels)
                    labels["minor_number"] = id

                    gauges.add_value("task_gpu_percent",
                            labels, gpu_infos[id]["gpu_util"])
                    gauges.add_value("task_gpu_mem_percent",
                            labels, gpu_infos[id]["gpu_mem_util"])

            gauges.add_value("task_cpu_percent", container_labels, stats["CPUPerc"])
            gauges.add_value("task_mem_usage_byte", container_labels, stats["MemUsage_Limit"]["usage"])
            gauges.add_value("task_mem_limit_byte", container_labels, stats["MemUsage_Limit"]["limit"])
            gauges.add_value("task_net_in_byte", container_labels, net_in)
            gauges.add_value("task_net_out_byte", container_labels, net_out)
            gauges.add_value("task_block_in_byte", container_labels, stats["BlockIO"]["in"])
            gauges.add_value("task_block_out_byte", container_labels, stats["BlockIO"]["out"])
            gauges.add_value("task_mem_usage_percent", container_labels, stats["MemPerc"])
        else:
            # pai service container: label metrics with the service name only
            labels = {"name": pai_service_name}
            gauges.add_value("service_cpu_percent", labels, stats["CPUPerc"])
            gauges.add_value("service_mem_usage_byte", labels, stats["MemUsage_Limit"]["usage"])
            gauges.add_value("service_mem_limit_byte", labels, stats["MemUsage_Limit"]["limit"])
            gauges.add_value("service_mem_usage_percent", labels, stats["MemPerc"])
            gauges.add_value("service_net_in_byte", labels, net_in)
            gauges.add_value("service_net_out_byte", labels, net_out)
            gauges.add_value("service_block_in_byte", labels, stats["BlockIO"]["in"])
            gauges.add_value("service_block_out_byte", labels, stats["BlockIO"]["out"])

    def collect_container_metrics(self, stats_obj, gpu_infos, all_conns):
        """ Build a ResourceGauges from all container stats; per-container
        failures are logged and skipped so one bad container cannot drop
        the whole iteration. Returns None if docker stats itself failed. """
        if stats_obj is None:
            logger.warning("docker stats returns None")
            return None

        gauges = ResourceGauges()

        for container_id, stats in stats_obj.items():
            try:
                self.process_one_container(container_id, stats, gpu_infos, all_conns, gauges)
            except Exception:
                # BUGFIX: was logging.exception (root logger); use module logger
                logger.exception("error when trying to process container %s with name %s",
                        container_id, utils.walk_json_field_safe(stats, "name"))

        return gauges.as_array()
class ZombieCollector(Collector):
    """ Detects zombie containers from docker stats (shared by
    ContainerCollector) and exports zombie_container_count gauges.

    Two zombie types are tracked:
      1. "job_exit_hangs": container printed "USER COMMAND END" in its logs
         but did not exit for a long period of time
      2. "residual_job": the yarn container exited but the job container
         with the matching yarn container suffix is still present
    """
    logs_histogram = Histogram("cmd_docker_logs_latency_seconds",
            "Command call latency for docker logs (seconds)")
    logs_timeout = 1 # 99th latency is 0.04s

    zombie_container_count = Gauge("zombie_container_count",
            "number of zombie container found for this node",
            ["type"])

    class ZombieRecorder(object):
        """ Remembers when each container was first seen as a zombie and
        only counts it after decay_time has elapsed. """
        def __init__(self, type):
            self.type = type
            self.zombies = {} # key is container id, value is enter zombie time

            # When we first meet zombie container, we only record time of that meet,
            # we wait extra decay_time to report it as zombie. Because at the time
            # of our recording, zombie just produced, and haven't been recycled, we
            # wait 5 minutes to avoid possible cases of normal zombie.
            self.decay_time = datetime.timedelta(minutes=5)

        def update(self, zombie_ids, now):
            """ feed in new zombie ids and get count of decayed zombie """
            # remove all records not exist anymore; list() guards against
            # mutating the dict while iterating
            for z_id in list(self.zombies.keys()):
                if z_id not in zombie_ids:
                    logger.debug("pop zombie %s that not exist anymore", z_id)
                    self.zombies.pop(z_id)

            count = 0
            for current in zombie_ids:
                if current in self.zombies:
                    enter_zombie_time = self.zombies[current]
                    if now - enter_zombie_time > self.decay_time:
                        count += 1
                else:
                    logger.debug("new zombie %s", current)
                    self.zombies[current] = now

            ZombieCollector.zombie_container_count.labels(self.type).set(count)
            return count # for test

        def __len__(self):
            return len(self.zombies)

    def __init__(self, name, sleep_time, atomic_ref, iteration_counter, stats_info_ref):
        Collector.__init__(self, name, sleep_time, atomic_ref, iteration_counter)
        self.stats_info_ref = stats_info_ref # written by ContainerCollector

        self.type1_zombies = ZombieCollector.ZombieRecorder("job_exit_hangs")
        self.type2_zombies = ZombieCollector.ZombieRecorder("residual_job")

        # FIX: raw strings so "\w" is not an invalid escape sequence
        # (DeprecationWarning on python >= 3.6); pattern text is unchanged
        self.yarn_pattern = r"container_\w{3}_[0-9]{13}_[0-9]{4}_[0-9]{2}_[0-9]{6}"
        self.yarn_container_reg = re.compile(r"^" + self.yarn_pattern + r"$")
        self.job_container_reg = re.compile(r"^.+(" + self.yarn_pattern + r")$")

    def update_zombie_count_type1(self, exited_containers, now):
        """ this fn will generate zombie container count for the first type,
        exited_containers is container id set of which we believe exited """
        return self.type1_zombies.update(exited_containers, now)

    def update_zombie_count_type2(self, stats, now):
        """ this fn will generate zombie container count for the second type """
        names = set([info["name"] for info in stats.values()])

        job_containers = {} # key is original name, value is corresponding yarn_container name
        yarn_containers = set()

        zombie_ids = set()

        for name in names:
            if re.match(self.yarn_container_reg, name) is not None:
                yarn_containers.add(name)
            elif re.match(self.job_container_reg, name) is not None:
                match = re.match(self.job_container_reg, name)
                value = match.groups()[0]
                job_containers[name] = value
            else:
                pass # ignore

        # a job container whose yarn container is gone is a residual zombie
        for job_name, yarn_name in job_containers.items():
            if yarn_name not in yarn_containers:
                zombie_ids.add(job_name)

        return self.type2_zombies.update(zombie_ids, now)

    def docker_logs(self, container_id, tail="all"):
        """ Return the tail of the container's logs (stdout+stderr),
        or "" on any failure. """
        try:
            return utils.exec_cmd(
                    ["docker", "logs", "--tail", str(tail), str(container_id)],
                    histogram=ZombieCollector.logs_histogram,
                    stderr=subprocess.STDOUT, # also capture stderr output
                    timeout=ZombieCollector.logs_timeout)
        except subprocess.TimeoutExpired:
            logger.warning("docker log timeout")
        except subprocess.CalledProcessError as e:
            logger.warning("docker logs returns %d, output %s", e.returncode, e.output)
        except Exception:
            logger.exception("exec docker logs error")

        return ""

    def is_container_exited(self, container_id):
        """ A container whose recent logs contain "USER COMMAND END" is
        considered to have finished its user command. """
        logs = self.docker_logs(container_id, tail=50)
        if re.search(u"USER COMMAND END", logs):
            return True
        return False

    def update_zombie_count(self, stats):
        """
        There are two types of zombie:
            1. container which outputed "USER COMMAND END" but did not exit for a long period of time
            2. yarn container exited but job container didn't
        """
        if stats is None:
            # BUGFIX: was logging.warning (root logger); use module logger
            logger.warning("docker stats is None")
            return

        exited_containers = set(filter(self.is_container_exited, stats.keys()))

        now = datetime.datetime.now()
        self.update_zombie_count_type1(exited_containers, now)
        self.update_zombie_count_type2(stats, now)

    def collect_impl(self):
        # set it to None so if docker-stats hangs till next time we get,
        # we will get None
        stats_info = self.stats_info_ref.get_and_set(None)
        self.update_zombie_count(stats_info)

Просмотреть файл

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
@ -25,8 +25,8 @@ import utils
logger = logging.getLogger(__name__)
targetLabel = {"PAI_HOSTNAME", "PAI_JOB_NAME", "PAI_USER_NAME", "PAI_CURRENT_TASK_ROLE_NAME", "GPU_ID"}
targetEnv = {"PAI_TASK_INDEX"}
target_label = {"PAI_HOSTNAME", "PAI_JOB_NAME", "PAI_USER_NAME", "PAI_CURRENT_TASK_ROLE_NAME", "GPU_ID"}
target_env = {"PAI_TASK_INDEX"}
def parse_docker_inspect(inspect_output):
@ -37,39 +37,35 @@ def parse_docker_inspect(inspect_output):
obj_labels = utils.walk_json_field_safe(obj, 0, "Config", "Labels")
if obj_labels is not None:
for key in obj_labels:
if key in targetLabel:
labelKey = "container_label_{0}".format(key.replace(".", "_"))
labelVal = obj_labels[key]
labels[labelKey] = labelVal
if key in target_label:
label_key = "container_label_{0}".format(key.replace(".", "_"))
label_val = obj_labels[key]
labels[label_key] = label_val
obj_env = utils.walk_json_field_safe(obj, 0, "Config", "Env")
if obj_env:
for env in obj_env:
envItem = env.split("=")
if envItem[0] in targetEnv:
envKey = "container_env_{0}".format(envItem[0].replace(".", "_"))
envVal = envItem[1]
envs[envKey] = envVal
env_item = env.split("=")
if env_item[0] in target_env:
key = "container_env_{0}".format(env_item[0].replace(".", "_"))
val = env_item[1]
envs[key] = val
pid = utils.walk_json_field_safe(obj, 0, "State", "Pid")
return {"env": envs, "labels": labels, "pid": pid}
def inspect(containerId):
def inspect(container_id, histogram, timeout):
try:
logger.debug("ready to run docker inspect")
dockerDockerInspect = utils.check_output(["docker", "inspect", containerId])
inspectInfo = parse_docker_inspect(dockerDockerInspect)
return inspectInfo
result = utils.exec_cmd(
["docker", "inspect", container_id],
histogram=histogram,
timeout=timeout)
return parse_docker_inspect(result)
except subprocess.CalledProcessError as e:
logger.exception("command '%s' return with error (code %d): %s",
e.cmd, e.returncode, e.output)
def main(argv):
containerId = argv[0]
print inspect(containerId)
# execute cmd example: python .\docker_inspect.py 33a22dcd4ba3
if __name__ == "__main__":
main(sys.argv[1:])
except subprocess.TimeoutExpired:
logger.warning("docker inspect timeout")
except Exception:
logger.exception("exec docker inspect error")

Просмотреть файл

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
@ -17,10 +17,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import subprocess
import json
import sys
import re
import datetime
import logging
import utils
@ -28,7 +26,7 @@ import utils
logger = logging.getLogger(__name__)
def parse_percentile(data):
return data.replace("%", "")
return float(data.replace("%", ""))
def parse_io(data):
inOut = data.split("/")
@ -65,14 +63,13 @@ def convert_to_byte(data):
return number
def parse_docker_stats(stats):
data = [line.split(',') for line in stats.splitlines()]
data = [line.split(",") for line in stats.splitlines()]
# pop the headers
data.pop(0)
rowNum = len(data)
colNum = len(data[0])
containerStats = {}
row_count = len(data)
container_stats = {}
for i in range(rowNum):
for i in range(row_count):
id = data[i][0]
containerInfo = {
"id": data[i][0],
@ -83,26 +80,20 @@ def parse_docker_stats(stats):
"BlockIO": parse_io(data[i][5]),
"MemPerc": parse_percentile(data[i][6])
}
containerStats[id] = containerInfo
return containerStats
container_stats[id] = containerInfo
return container_stats
def stats():
start = datetime.datetime.now()
def stats(histogram, timeout):
try:
logger.info("ready to run docker stats")
dockerDockerStats = utils.check_output([
result = utils.exec_cmd([
"docker", "stats", "--no-stream", "--format",
"table {{.Container}},{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}},{{.BlockIO}},{{.MemPerc}}"])
return parse_docker_stats(dockerDockerStats)
"table {{.Container}},{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}},{{.BlockIO}},{{.MemPerc}}"],
histogram=histogram,
timeout=timeout)
return parse_docker_stats(result)
except subprocess.CalledProcessError as e:
logger.error("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))
finally:
end = datetime.datetime.now()
logger.info("docker state spent %s", end - start)
def main(argv):
stats()
# execute cmd example: python .\docker_stats.py True
if __name__ == "__main__":
main(sys.argv[1:])
except subprocess.TimeoutExpired:
logger.warning("docker stats timeout")
except Exception:
logger.exception("exec docker stats error")

Просмотреть файл

@ -1,391 +0,0 @@
#!/usr/bin/python
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import copy
import sys
import time
import logging
import re
import datetime
import subprocess
import os
import json
import docker_stats
import docker_inspect
import gpu_exporter
import network
import utils
from utils import Metric
logger = logging.getLogger(__name__)
# k8s will prepend "k8s_" to pod name. There will also be a container name prepend with "k8s_POD_"
# which is a docker container used to construct network & pid namespace for specific container. These
# container prepend with "k8s_POD" consume nothing.
pai_services = map(lambda s: "k8s_" + s, [
"rest-server",
"pylon",
"webportal",
"grafana",
"prometheus",
"alertmanager",
"watchdog",
"end-to-end-test",
"yarn-frameworklauncher",
"hadoop-jobhistory-service",
"hadoop-name-node",
"hadoop-node-manager",
"hadoop-resource-manager",
"hadoop-data-node",
"zookeeper",
"node-exporter",
"job-exporter",
"yarn-exporter",
"nvidia-drivers"
])
class ZombieRecorder(object):
def __init__(self):
self.zombies = {} # key is container id, value is enter zombie time
# When we first meet zombie container, we only record time of that meet,
# we wait extra decay_time to report it as zombie. Because at the time
# of our recording, zombie just produced, and haven't been recycled, we
# wait 5 minutes to avoid possible cases of normal zombie.
self.decay_time = datetime.timedelta(minutes=5)
def update(self, zombie_ids, now):
""" feed in new zombie ids and get count of decayed zombie """
# remove all records not exist anymore
for z_id in self.zombies.keys():
if z_id not in zombie_ids:
logger.debug("pop zombie %s that not exist anymore", z_id)
self.zombies.pop(z_id)
count = 0
for current in zombie_ids:
if current in self.zombies:
enter_zombie_time = self.zombies[current]
if now - enter_zombie_time > self.decay_time:
count += 1
else:
logger.debug("new zombie %s", current)
self.zombies[current] = now
return count
def __len__(self):
return len(self.zombies)
yarn_pattern = u"container_\w{3}_[0-9]{13}_[0-9]{4}_[0-9]{2}_[0-9]{6}"
yarn_container_reg = re.compile(u"^" + yarn_pattern + "$")
job_container_reg = re.compile(u"^.+(" + yarn_pattern + u")$")
def parse_from_labels(labels):
gpu_ids = []
other_labels = {}
for key, val in labels.items():
if "container_label_GPU_ID" == key:
s2 = val.replace("\"", "").split(",")
for id in s2:
if id:
gpu_ids.append(id)
else:
other_labels[key] = val
return gpu_ids, other_labels
def generate_zombie_count_type1(type1_zombies, exited_containers, now):
""" this fn will generate zombie container count for the first type,
exited_containers is container id set of which we believe exited """
return type1_zombies.update(exited_containers, now)
def generate_zombie_count_type2(type2_zombies, stats, now):
""" this fn will generate zombie container count for the second type """
names = set([info["name"] for info in stats.values()])
job_containers = {} # key is original name, value is corresponding yarn_container name
yarn_containers = set()
zombie_ids = set()
for name in names:
if re.match(yarn_container_reg, name) is not None:
yarn_containers.add(name)
elif re.match(job_container_reg, name) is not None:
match = re.match(job_container_reg, name)
value = match.groups()[0]
job_containers[name] = value
else:
pass # ignore
for job_name, yarn_name in job_containers.items():
if yarn_name not in yarn_containers:
zombie_ids.add(job_name)
return type2_zombies.update(zombie_ids, now)
def docker_logs(container_id, tail="all"):
try:
return utils.check_output(["docker", "logs", "--tail", str(tail), str(container_id)])
except subprocess.CalledProcessError as e:
logger.exception("command '%s' return with error (code %d): %s",
e.cmd, e.returncode, e.output)
def is_container_exited(container_id):
logs = docker_logs(container_id, tail=50)
if logs is not None and re.search(u"USER COMMAND END", logs):
return True
return False
def generate_zombie_count(stats, type1_zombies, type2_zombies):
"""
There are two types of zombie:
1. container which outputed "USER COMMAND END" but did not exist for a long period of time
2. yarn container exited but job container didn't
"""
exited_containers = set(filter(is_container_exited, stats.keys()))
logger.debug("exited_containers is %s", exited_containers)
now = datetime.datetime.now()
zombie_count1 = generate_zombie_count_type1(type1_zombies, exited_containers, now)
zombie_count2 = generate_zombie_count_type2(type2_zombies, stats, now)
return [Metric("zombie_container_count", {}, zombie_count1 + zombie_count2)]
def collect_job_metrics(gpu_infos, all_conns, type1_zombies, type2_zombies):
stats_obj = docker_stats.stats()
if stats_obj is None:
logger.warning("docker stats returns None")
return None
result = []
for container_id, stats in stats_obj.items():
pai_service_name = None
# TODO speed this up, since this is O(n^2)
for service_name in pai_services:
if stats["name"].startswith(service_name):
pai_service_name = service_name[4:] # remove "k8s_" prefix
break
inspect_info = docker_inspect.inspect(container_id)
pid = inspect_info["pid"] if inspect_info is not None else None
inspect_labels = utils.walk_json_field_safe(inspect_info, "labels")
if not inspect_labels and pai_service_name is None:
continue # other container, maybe kubelet or api-server
# get network consumption, since all our services/jobs running in host network,
# network statistic from docker is not specific to that container. We have to
# get network statistic by ourselves.
lsof_result = network.lsof(pid)
net_in, net_out = network.get_container_network_metrics(all_conns, lsof_result)
if logger.isEnabledFor(logging.DEBUG):
debug_info = utils.check_output("ps -o cmd fp {0} | tail -n 1".format(pid), shell=True)
logger.debug("pid %s with cmd `%s` has lsof result %s, in %d, out %d",
pid, debug_info, lsof_result, net_in, net_out)
if pai_service_name is None:
gpu_ids, container_labels = parse_from_labels(inspect_info["labels"])
container_labels.update(inspect_info["env"])
for id in gpu_ids:
if gpu_infos:
labels = copy.deepcopy(container_labels)
labels["minor_number"] = id
result.append(Metric("container_GPUPerc", labels, gpu_infos[id]["gpu_util"]))
result.append(Metric("container_GPUMemPerc", labels, gpu_infos[id]["gpu_mem_util"]))
result.append(Metric("container_CPUPerc", container_labels, stats["CPUPerc"]))
result.append(Metric("container_MemUsage", container_labels, stats["MemUsage_Limit"]["usage"]))
result.append(Metric("container_MemLimit", container_labels, stats["MemUsage_Limit"]["limit"]))
result.append(Metric("container_NetIn", container_labels, net_in))
result.append(Metric("container_NetOut", container_labels, net_out))
result.append(Metric("container_BlockIn", container_labels, stats["BlockIO"]["in"]))
result.append(Metric("container_BlockOut", container_labels, stats["BlockIO"]["out"]))
result.append(Metric("container_MemPerc", container_labels, stats["MemPerc"]))
else:
labels = {"name": pai_service_name}
result.append(Metric("service_cpu_percent", labels, stats["CPUPerc"]))
result.append(Metric("service_mem_usage_byte", labels, stats["MemUsage_Limit"]["usage"]))
result.append(Metric("service_mem_limit_byte", labels, stats["MemUsage_Limit"]["limit"]))
result.append(Metric("service_mem_usage_percent", labels, stats["MemPerc"]))
result.append(Metric("service_net_in_byte", labels, net_in))
result.append(Metric("service_net_out_byte", labels, net_out))
result.append(Metric("service_block_in_byte", labels, stats["BlockIO"]["in"]))
result.append(Metric("service_block_out_byte", labels, stats["BlockIO"]["out"]))
result.extend(generate_zombie_count(stats_obj, type1_zombies, type2_zombies))
return result
def collect_docker_daemon_status():
    """ Check docker daemon status on the current host.

    Returns a single docker_daemon_count metric whose `error` label is "ok"
    when the daemon is active, or an error description otherwise. """
    import errno  # stdlib; os.errno is deprecated/removed in newer Pythons

    cmd = "systemctl is-active docker | if [ $? -eq 0 ]; then echo \"active\"; else exit 1 ; fi"

    error = "ok"

    try:
        logger.info("call systemctl to get docker status")
        out = utils.check_output(cmd, shell=True)

        if "active" not in out:
            error = "inactive"
    except subprocess.CalledProcessError as e:
        logger.exception("command '%s' return with error (code %d): %s",
                cmd, e.returncode, e.output)
        # bug fix: CalledProcessError has no strerror attribute, the old
        # `e.strerror()` raised AttributeError inside this handler
        error = str(e)
    except OSError as e:
        if e.errno == errno.ENOENT:
            logger.warning("systemctl not found")
            # bug fix: OSError.strerror is a plain attribute, calling it
            # (`e.strerror()`) raised TypeError before
            error = e.strerror

    return [Metric("docker_daemon_count", {"error": error}, 1)]
def get_gpu_count(path):
    """ Look up this node's configured gpu count in the gpu config json at `path`.

    The node is matched first by HOSTNAME, then by HOST_IP; returns 0 when
    the node is absent from the config. """
    hostname = os.environ.get("HOSTNAME")
    ip = os.environ.get("HOST_IP")
    logger.info("hostname is %s, ip is %s", hostname, ip)

    with open(path) as f:
        gpu_config = json.load(f)

    nodes = gpu_config["nodes"]

    # hostname takes precedence over ip, matching the original lookup order
    for key in (hostname, ip):
        if key is not None and nodes.get(key) is not None:
            return nodes[key]["gpuCount"]

    logger.warning("failed to find gpu count from config %s", gpu_config)
    return 0
def config_environ():
    """ since job-exporter needs to call nvidia-smi, we need to change
    LD_LIBRARY_PATH to correct value """
    driver_path = os.environ.get("NV_DRIVER")
    logger.debug("NV_DRIVER is %s", driver_path)

    # NOTE(review): if NV_DRIVER is unset, driver_path is None and
    # os.path.join below raises TypeError — assumes the deployment always
    # sets NV_DRIVER; confirm against the container spec
    ld_path = os.environ.get("LD_LIBRARY_PATH", "")
    # append the driver's lib and lib64 dirs after any existing search path
    os.environ["LD_LIBRARY_PATH"] = ld_path + os.pathsep + \
            os.path.join(driver_path, "lib") + os.pathsep + \
            os.path.join(driver_path, "lib64")

    logger.debug("LD_LIBRARY_PATH is %s", os.environ["LD_LIBRARY_PATH"])
def main(argv):
    """ Pre-refactor entry point: loop forever, collecting gpu/job/docker
    metrics and writing them as .prom files under argv[0] every argv[1] seconds. """
    config_environ()

    log_dir = argv[0]
    gpu_metrics_path = log_dir + "/gpu_exporter.prom"
    job_metrics_path = log_dir + "/job_exporter.prom"
    docker_metrics_path = log_dir + "/docker.prom"
    time_metrics_path = log_dir + "/time.prom"
    # NOTE(review): "cofigured" looks like a typo for "configured"; renaming
    # would orphan the already-exported file on upgrade, so flagging only
    configured_gpu_path = log_dir + "/cofigured_gpu.prom"
    time_sleep_s = int(argv[1])

    iter = 0

    # singletons guard against nvidia-smi/systemctl hanging: each getter runs
    # in its own thread and the stale value is reused while it is still running
    gpu_singleton = utils.Singleton(gpu_exporter.collect_gpu_info, name="gpu_singleton")
    docker_status_singleton = utils.Singleton(collect_docker_daemon_status, name="docker_singleton")

    type1_zombies = ZombieRecorder()
    type2_zombies = ZombieRecorder()

    # configured gpu count is static while running, so export it once up front
    configured_gpu_count = get_gpu_count("/gpu-config/gpu-configuration.json")
    logger.debug("configured_gpu_count is %s", configured_gpu_count)
    utils.export_metrics_to_file(configured_gpu_path, [Metric("configured_gpu_count", {},
        configured_gpu_count)])

    while True:
        start = datetime.datetime.now()
        try:
            logger.info("job exporter running {0} iteration".format(str(iter)))
            iter += 1

            docker_status, is_old = docker_status_singleton.try_get()
            if not is_old:
                utils.export_metrics_to_file(docker_metrics_path, docker_status)
            else:
                # still touch the file so the readiness probe sees a fresh mtime
                utils.export_metrics_to_file(docker_metrics_path, None)

            all_conns = network.iftop()
            logger.debug("iftop result is %s", all_conns)

            gpu_infos, is_old = gpu_singleton.try_get()
            if is_old:
                gpu_infos = None # ignore old gpu_infos

            gpu_metrics = gpu_exporter.convert_gpu_info_to_metrics(gpu_infos)
            utils.export_metrics_to_file(gpu_metrics_path, gpu_metrics)

            # join with docker stats metrics and docker inspect labels
            job_metrics = collect_job_metrics(gpu_infos, all_conns, type1_zombies, type2_zombies)
            utils.export_metrics_to_file(job_metrics_path, job_metrics)
        except Exception as e:
            logger.exception("exception in job exporter loop")
        finally:
            # always record iteration latency, even when the body failed
            end = datetime.datetime.now()
            time_metrics = [Metric("job_exporter_iteration_seconds", {}, (end - start).seconds)]
            utils.export_metrics_to_file(time_metrics_path, time_metrics)

            time.sleep(time_sleep_s)
def get_logging_level():
    """ Map the LOGGING_LEVEL environment variable (case-insensitive) onto a
    logging level constant; unknown or unset values default to INFO. """
    level_by_name = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING
    }

    configured = os.environ.get("LOGGING_LEVEL")
    if configured is None:
        return logging.INFO

    level = level_by_name.get(configured.upper())
    if level is None:
        # warn on stderr rather than via logging, which is not configured yet
        sys.stderr.write("unknown logging level " + configured + ", default to INFO\n")
        return logging.INFO

    return level
if __name__ == "__main__":
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(message)s",
level=get_logging_level())
main(sys.argv[1:])

Просмотреть файл

@ -0,0 +1,156 @@
#!/usr/bin/env python3
import argparse
import logging
import os
import json
import threading
from wsgiref.simple_server import make_server
from prometheus_client import make_wsgi_app, Gauge
from prometheus_client.core import REGISTRY
import collector
# module-level logger for the new job-exporter entry point
logger = logging.getLogger(__name__)

# prometheus gauge exposing how many gpus the cluster config assigns this node
configured_gpu_counter = Gauge("configured_gpu_count",
        "total number of gpu configured for this node")
class CustomCollector(object):
    """ Prometheus collector that drains metrics deposited by the background
    collector threads via their atomic refs. """

    def __init__(self, atomic_refs):
        # each ref holds the latest metrics from one collector thread, or None
        self.atomic_refs = atomic_refs

    def collect(self):
        collected = []
        for ref in self.atomic_refs:
            # swap in None so a stale snapshot is reported at most once, see
            # https://github.com/Microsoft/pai/issues/1764#issuecomment-442733098
            metrics = ref.get_and_set(None)
            if metrics is not None:
                collected.extend(metrics)

        # https://stackoverflow.com/a/6266586
        # an empty list here makes this generator yield nothing
        yield from collected
def config_environ():
    """ since job-exporter needs to call nvidia-smi, we need to change
    LD_LIBRARY_PATH to correct value """
    driver_path = os.environ.get("NV_DRIVER")
    logger.debug("NV_DRIVER is %s", driver_path)

    # NOTE(review): if NV_DRIVER is unset, driver_path is None and
    # os.path.join below raises TypeError — assumes the deployment always
    # sets NV_DRIVER; confirm against the container spec
    ld_path = os.environ.get("LD_LIBRARY_PATH", "")
    # append the driver's lib and lib64 dirs after any existing search path
    os.environ["LD_LIBRARY_PATH"] = ld_path + os.pathsep + \
            os.path.join(driver_path, "lib") + os.pathsep + \
            os.path.join(driver_path, "lib64")

    logger.debug("LD_LIBRARY_PATH is %s", os.environ["LD_LIBRARY_PATH"])
def try_remove_old_prom_file(path):
""" try to remove old prom file, since old prom file are exposed by node-exporter,
if we do not remove, node-exporter will still expose old metrics """
if os.path.isfile(path):
try:
os.unlink(path)
except Exception as e:
log.warning("can not remove old prom file %s", path)
def get_gpu_count(path):
    """ Read this node's gpu count from the cluster gpu config json at `path`.

    The node is looked up first by the HOSTNAME env var, then by HOST_IP;
    returns 0 when neither key is present in the config. """
    hostname = os.environ.get("HOSTNAME")
    ip = os.environ.get("HOST_IP")
    logger.debug("hostname is %s, ip is %s", hostname, ip)

    with open(path) as f:
        gpu_config = json.load(f)

        if hostname is not None and gpu_config["nodes"].get(hostname) is not None:
            return gpu_config["nodes"][hostname]["gpuCount"]
        elif ip is not None and gpu_config["nodes"].get(ip) is not None:
            return gpu_config["nodes"][ip]["gpuCount"]
        else:
            logger.warning("failed to find gpu count from config %s", gpu_config)
            return 0
def main(args):
    """ New entry point: start all collector threads and serve /metrics over HTTP
    on args.port (blocks forever). """
    config_environ()

    # remove prom files written by the pre-refactor job-exporter, otherwise
    # node-exporter keeps exposing the stale metrics they contain
    try_remove_old_prom_file(args.log + "/gpu_exporter.prom")
    try_remove_old_prom_file(args.log + "/job_exporter.prom")
    try_remove_old_prom_file(args.log + "/docker.prom")
    try_remove_old_prom_file(args.log + "/time.prom")
    try_remove_old_prom_file(args.log + "/configured_gpu.prom")

    configured_gpu_counter.set(get_gpu_count("/gpu-config/gpu-configuration.json"))

    # used to exchange gpu info between GpuCollector and ContainerCollector
    gpu_info_ref = collector.AtomicRef()

    # used to exchange docker stats info between ContainerCollector and ZombieCollector
    stats_info_ref = collector.AtomicRef()

    interval = args.interval
    # Because all collector except container_collector will spent little time in calling
    # external command to get metrics, so they need to sleep 30s to align with prometheus
    # scrape interval. The 99th latency of container_collector loop is around 20s, so it
    # should only sleep 10s to adapt to scrape interval
    collector_args = [
            ("docker_daemon_collector", interval, collector.DockerCollector),
            ("gpu_collector", interval, collector.GpuCollector, gpu_info_ref),
            ("container_collector", interval - 18, collector.ContainerCollector,
                gpu_info_ref, stats_info_ref, args.interface),
            ("zombie_collector", interval, collector.ZombieCollector, stats_info_ref),
            ]

    # make_collector starts each thread and returns the AtomicRef it publishes to
    refs = list(map(lambda x: collector.make_collector(*x), collector_args))

    REGISTRY.register(CustomCollector(refs))

    # bind all interfaces and serve prometheus scrapes forever
    app = make_wsgi_app(REGISTRY)
    httpd = make_server("", int(args.port), app)
    httpd.serve_forever()
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--log", "-l", help="log dir to store log", default="/datastorage/prometheus")
    parser.add_argument("--port", "-p", help="port to expose metrics", default="9102")
    parser.add_argument("--interval", "-i", help="prometheus scrape interval", type=int, default=30)
    parser.add_argument("--interface", "-n", help="network interface for job-exporter to listen on", required=True)
    args = parser.parse_args()

    def get_logging_level():
        """ Map the LOGGING_LEVEL env var onto a logging constant; default INFO. """
        # bug fix: this module never imports `sys`, so the stderr warning below
        # raised NameError for any unknown LOGGING_LEVEL value
        import sys

        mapping = {
            "DEBUG": logging.DEBUG,
            "INFO": logging.INFO,
            "WARNING": logging.WARNING
        }

        result = logging.INFO

        if os.environ.get("LOGGING_LEVEL") is not None:
            level = os.environ["LOGGING_LEVEL"]
            result = mapping.get(level.upper())
            if result is None:
                sys.stderr.write("unknown logging level " + level + \
                        ", default to INFO\n")
                result = logging.INFO

        return result

    logging.basicConfig(format="%(asctime)s - %(levelname)s - %(threadName)s - %(filename)s:%(lineno)s - %(message)s",
            level=get_logging_level())

    main(args)

Просмотреть файл

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
@ -16,13 +16,14 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import subprocess
import json
import sys
import re
import time
import logging
import collections
import subprocess
import socket
import fcntl
import struct
import array
import utils
@ -60,12 +61,20 @@ def convert_to_byte(data):
return number
def iftop():
def iftop(interface, histogram, timeout):
cmd = ["iftop", "-t", "-P", "-s", "1", "-L", "10000", "-B", "-n", "-N"]
if interface is not None:
cmd.extend(["-i", interface])
try:
output = utils.check_output(["iftop", "-t", "-P", "-s", "1", "-L", "10000",
"-B", "-n", "-N"])
output = utils.exec_cmd(
cmd,
stderr=subprocess.STDOUT, # also capture stderr output
histogram=histogram, timeout=timeout)
return parse_iftop(output)
except:
except subprocess.TimeoutExpired:
logger.warning("iftop timeout")
except Exception:
logger.exception("exec iftop error")
return None
@ -89,7 +98,7 @@ def parse_iftop(iftop_output, duration=40):
if part == 1:
data.append(line)
for line_no in xrange(0, len(data), 2):
for line_no in range(0, len(data), 2):
line1 = data[line_no].split()
line2 = data[line_no + 1].split()
src = line1[1]
@ -116,15 +125,22 @@ def parse_iftop(iftop_output, duration=40):
return result
def lsof(pid):
def lsof(pid, histogram, timeout):
""" use infilter to do setns https://github.com/yadutaf/infilter """
if pid is None:
return None
try:
output = utils.check_output(["infilter", str(pid), "/usr/bin/lsof", "-i", "-n", "-P"])
output = utils.exec_cmd(["infilter", str(pid), "/usr/bin/lsof", "-i", "-n", "-P"],
histogram=histogram,
stderr=subprocess.STDOUT, # also capture stderr output
timeout=timeout)
return parse_lsof(output)
except:
except subprocess.TimeoutExpired:
logger.warning("lsof timeout")
except subprocess.CalledProcessError as e:
logger.warning("infilter lsof returns %d, output %s", e.returncode, e.output)
except Exception:
logger.exception("exec lsof error")
return None
@ -138,9 +154,13 @@ def parse_lsof(lsof_output):
for line in data:
if "ESTABLISHED" in line:
parts = line.split()
if len(parts) == 10:
pid = parts[1]
src = parts[8].split("->")[0]
conns[pid].add(src)
else:
logger.warning("unknown format of lsof %s", parts)
continue
return conns
@ -165,19 +185,68 @@ def get_container_network_metrics(all_conns, lsof_result):
return in_byte, out_byte
def main(pids):
""" test purpose """
conns = iftop()
def format_ip(addr):
    """ Render the first four octets of `addr` (any indexable of ints,
    e.g. 4 raw bytes) as a dotted-quad string. """
    octets = (addr[0], addr[1], addr[2], addr[3])
    return ".".join(str(octet) for octet in octets)
logger.debug("conns is %s", conns)
logger.debug("lsof_result is %s", lsof_result)
for pid in pids:
lsof_result = lsof(pid)
in_byte, out_byte = get_container_network_metrics(conns, lsof_result)
logger.info("pid %s in %d out %d", pid, in_byte, out_byte)
def get_interfaces():
""" get all network interfaces we see, return map with interface name as key, and ip as value """
max_possible = 128
bytes = max_possible * 32
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
names = array.array("B", b"\0" * bytes)
outbytes = struct.unpack("iL", fcntl.ioctl(
s.fileno(),
0x8912, # SIOCGIFCONF
struct.pack("iL", bytes, names.buffer_info()[0])
))[0]
if __name__ == "__main__":
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(message)s",
level=logging.DEBUG)
main(sys.argv[1:])
namestr = names.tostring()
result = {}
for i in range(0, outbytes, 40):
name = namestr[i:i+16].split(b"\0", 1)[0].decode("ascii")
ip = namestr[i+20:i+24]
result[name] = format_ip(ip)
return result
def get_ip_can_access_internet(target="hub.docker.com"):
    """ Return the local ip used to reach `target` on port 80, or None on error.

    Connecting first forces the kernel to pick a route, so getsockname()
    yields the ip of the outbound interface. """
    s = socket.socket(
            socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((target, 80))
        return s.getsockname()[0]
    except socket.gaierror:
        logger.exception("failed to connect to %s", target)
    except Exception:
        # bug fix: previously this branch fell through and returned
        # getsockname() of the unconnected socket instead of the documented None
        logger.exception("unknown exception when tying to connect %s", target)
    finally:
        # bug fix: the socket was leaked before
        s.close()
    return None
def try_to_get_right_interface(configured_ifs):
    """ try to return a right interface so that iftop can listen on.

    `configured_ifs` is a comma separated list of interface names; the first
    one that exists on this node wins. Otherwise fall back to the interface
    whose ip routes to the internet. Returns None when nothing suitable is
    found. """
    ifs = get_interfaces()
    logger.debug("found interfaces %s", ifs)

    for interface in configured_ifs.split(","):
        interface = interface.strip()
        if interface in ifs:
            return interface

    logger.info("didn't find correct network interface in this node, configured %s, found %s",
            configured_ifs, ifs)

    # fallback: match the outbound ip against each local interface's ip
    ip = get_ip_can_access_internet()
    if ip is not None:
        for if_name, if_ip in ifs.items():
            if ip == if_ip:
                return if_name

    return None

Просмотреть файл

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
@ -17,13 +17,11 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import subprocess
import sys
from xml.dom import minidom
import os
import logging
import utils
from utils import Metric
logger = logging.getLogger(__name__)
@ -43,46 +41,25 @@ def parse_smi_xml_result(smi):
if gpu_util == "N/A" or gpu_mem_util == "N/A":
continue
result[str(minor)] = {"gpu_util": gpu_util, "gpu_mem_util": gpu_mem_util}
result[str(minor)] = {"gpu_util": float(gpu_util), "gpu_mem_util": float(gpu_mem_util)}
return result
def collect_gpu_info():
""" in some cases, nvidia-smi may block indefinitely, caller should be aware of this """
def nvidia_smi(histogram, timeout):
driver_path = os.environ["NV_DRIVER"]
bin_path = os.path.join(driver_path, "bin/nvidia-smi")
try:
logger.info("call %s to get gpu metrics", bin_path) # used to check if nvidia-smi hangs
smi_output = utils.check_output([bin_path, "-q", "-x"])
try:
smi_output = utils.exec_cmd([bin_path, "-q", "-x"],
histogram=histogram, timeout=timeout)
return parse_smi_xml_result(smi_output)
except subprocess.CalledProcessError as e:
if e.returncode == 127:
logger.exception("nvidia cmd error. command '%s' return with error (code %d): %s",
e.cmd, e.returncode, e.output)
else:
logger.exception("command '%s' return with error (code %d): %s",
e.cmd, e.returncode, e.output)
except OSError as e:
logger.exception("nvidia-smi not found")
except subprocess.TimeoutExpired:
logger.warning("nvidia-smi timeout")
except Exception:
logger.exception("exec nvidia-smi error")
return None
def convert_gpu_info_to_metrics(gpu_infos):
    """ Turn the parsed nvidia-smi dict (minor number -> utilization info)
    into a list of Metric records; returns None when gpu_infos is None. """
    if gpu_infos is None:
        return None

    metrics = [Metric("nvidiasmi_attached_gpus", {}, len(gpu_infos))]

    for minor_number, info in gpu_infos.items():
        labels = {"minor_number": minor_number}
        metrics.extend([
            Metric("nvidiasmi_utilization_gpu", labels, info["gpu_util"]),
            Metric("nvidiasmi_utilization_memory", labels, info["gpu_mem_util"]),
        ])

    return metrics
if __name__ == "__main__":
    # manual debug entry. bug fix: `print x` is Python 2 syntax and would be a
    # SyntaxError under the new python3 shebang, and collect_gpu_info was
    # renamed nvidia_smi in this refactor (no histogram, generous timeout)
    print(nvidia_smi(None, 60))

Просмотреть файл

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
@ -16,128 +16,27 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import re
import codecs
import subprocess
import logging
import threading
import datetime
from Queue import Queue
from Queue import Empty
logger = logging.getLogger(__name__)
class Metric(object):
""" represents one prometheus metric record
https://prometheus.io/docs/concepts/data_model/ """
def __init__(self, name, labels, value):
self.name = name
self.labels = labels
self.value = value
def __eq__(self, o):
return self.name == o.name and self.labels == o.labels and self.value == o.value
def __repr__(self):
if len(self.labels) > 0:
labels = ", ".join(map(lambda x: "{}=\"{}\"".format(x[0], x[1]),
self.labels.items()))
labels = "{" + labels + "}"
def exec_cmd(*args, **kwargs):
""" exec a cmd with timeout, also record time used using prometheus higtogram """
if kwargs.get("histogram") is not None:
histogram = kwargs.pop("histogram")
else:
labels = ""
histogram = None
return "{}{} {}".format(self.name, labels, self.value)
logger.debug("about to exec %s", args[0])
def export_metrics_to_file(path, metrics):
""" if metrics not None, should still open the path, to modify time stamp of file,
readiness probe needs this"""
with codecs.open(path, "w", encoding="utf-8") as f:
if metrics is not None:
for metric in metrics:
f.write(str(metric))
f.write("\n")
def check_output(*args, **kwargs):
""" subprocess.check_output may hanging if cmd output too much stdout or stderr """
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.PIPE
process = subprocess.Popen(*args, **kwargs)
outs = []
errs = []
while process.poll() is None:
out, err = process.communicate()
outs.append(out)
errs.append(err)
if process.returncode != 0:
logger.warn("process `%s` failed with return code %d, stdout %s, stderr %s",
args, process.returncode, "".join(outs), "".join(errs))
return "".join(outs)
class Singleton(object):
""" wrapper around metrics getter, because getter may block
indefinitely, so we wrap call in thread.
Also, to avoid having too much threads, use semaphore to ensure only 1
thread is running """
def __init__(self, getter, get_timeout_s=3, name="singleton"):
self.getter = getter
self.get_timeout_s = get_timeout_s
self.name = name
self.semaphore = threading.Semaphore(1)
self.queue = Queue(1)
self.old_metrics = None
def wrapper(self, semaphore, queue):
""" wrapper assume semaphore already acquired, will release semaphore on exit """
result = None
try:
try:
# remove result put by previous thread but didn't get by main thread
queue.get(block=False)
except Empty:
pass
start = datetime.datetime.now()
result = self.getter()
except Exception as e:
logger.warn("%s get metrics failed", self.name)
logger.exception(e)
finally:
logger.info("%s get metrics spent %s", self.name, datetime.datetime.now() - start)
semaphore.release()
queue.put(result)
def try_get(self):
""" return value, is_old tuple, if value is not newly getted, return value we get last time,
and set is_old to True, otherwise return newly getted value and False """
if self.semaphore.acquire(False):
t = threading.Thread(target=Singleton.wrapper, name=self.name + "-getter",
args=(self, self.semaphore, self.queue))
t.start()
if histogram is not None:
with histogram.time():
return subprocess.check_output(*args, **kwargs).decode("utf-8")
else:
logger.warn("%s is still running", self.name + "-getter")
try:
self.old_metrics = self.queue.get(block=True, timeout=self.get_timeout_s)
return self.old_metrics, False
except Empty:
pass
return self.old_metrics, True
def camel_to_underscore(label):
""" convert camel case into underscore
https://stackoverflow.com/a/1176023 """
tmp = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', label)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', tmp).lower()
return subprocess.check_output(*args, **kwargs).decode("utf-8")
def walk_json_field_safe(obj, *fields):

Просмотреть файл

@ -0,0 +1,28 @@
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import logging
import unittest
class TestBase(unittest.TestCase):
    """
    Test Base class for job-exporter: configures debug logging once per test
    class so individual tests get verbose output without re-configuring.
    """
    @classmethod
    def setUpClass(cls):
        logging.basicConfig(format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)s - %(message)s",
                level=logging.DEBUG)

Просмотреть файл

@ -0,0 +1,180 @@
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import copy
import unittest
import datetime
import time
import logging
import base
sys.path.append(os.path.abspath("../src/"))
import collector
from collector import ContainerCollector
logger = logging.getLogger(__name__)
class TestContainerCollector(base.TestBase):
    """
    Test ContainerCollector in collector.py
    """
    def test_parse_from_labels(self):
        # GPU_ID "0,1," is split into ["0", "1"] (trailing comma dropped);
        # all other labels are returned unchanged
        labels = {"container_label_PAI_USER_NAME": "openmindstudio", "container_label_GPU_ID": "0,1,", "container_label_PAI_HOSTNAME": "paigcr-a-gpu-1058", "container_label_PAI_JOB_NAME": "trialslot_nnimain_d65bc5ac", "container_label_PAI_CURRENT_TASK_ROLE_NAME": "tuner"}
        gpuIds, otherLabels = ContainerCollector.parse_from_labels(labels)
        self.assertEqual(["0", "1"], gpuIds,)

        copied = copy.deepcopy(labels)
        copied.pop("container_label_GPU_ID")
        self.assertEqual(copied, otherLabels)

    def test_infer_service_name(self):
        # k8s pause containers (k8s_POD_* names) yield no service name
        self.assertIsNone(ContainerCollector.infer_service_name(
            "k8s_POD_alertmanager-7884c59f78-66r86_default_0a32e30a-f6ae-11e8"))

        self.assertEqual(
                "alertmanager",
                ContainerCollector.infer_service_name(
                    "k8s_alertmanager_alertmanager-7884c59f78-66r86_default_0a32e30a-f6ae-11e8-a62d-000d3ab25bb6_2"))

        # NOTE(review): presumably excluded because it is a kube-system
        # component rather than a pai service — verify against infer_service_name
        self.assertIsNone(ContainerCollector.infer_service_name(
            "k8s_kube-scheduler_kube-scheduler-10.151.40.4_kube-system_f1164d931979939cf0601155df9c748a_6"))
class TestDockerCollector(base.TestBase):
    """
    Test DockerCollector in collector.py
    """
    def assert_metrics(self, metrics):
        # expect exactly one metric family with one sample: one label key
        # and a sample value of 1
        self.assertEqual(1, len(metrics))
        self.assertEqual(1, len(metrics[0].samples))
        sample = metrics[0].samples[0]
        self.assertEqual(1, len(sample[1])) # label keys
        self.assertEqual(1, sample[2]) # sample value

    def test_impl(self):
        # call collect_impl directly, without starting the background thread
        _, c = collector.instantiate_collector(
                "test_docker_collector1",
                0.5,
                collector.DockerCollector)

        self.assert_metrics(c.collect_impl())

    def test_base_collector(self):
        """ actually setup DockerCollector thread, and test, since this is multi-thread
        test case, maybe sensitive to the system load """
        ref = collector.make_collector(
                "test_docker_collector2",
                0.5,
                collector.DockerCollector)

        metrics = None
        # poll up to ~1s for the background thread to publish its first result
        for i in range(10):
            metrics = ref.get()
            if metrics is not None:
                break
            time.sleep(0.1)

        self.assert_metrics(metrics)
class TestZombieCollector(base.TestBase):
    """
    Test ZombieCollector in collector.py
    """
    def setUp(self):
        # Because prometheus forbid same metric name, and we generate metric
        # in from name, we need to differentiate name using time.
        t = str(time.time()).replace(".", "_")

        _, self.collector = collector.instantiate_collector(
                "test_zombie_collector" + t,
                0.5,
                collector.ZombieCollector,
                collector.AtomicRef())

    def test_update_zombie_count_type1(self):
        # a container must stay zombie for longer than decay_time before it
        # is counted; disappearing containers are dropped from the recorder
        start = datetime.datetime.now()
        one_sec = datetime.timedelta(seconds=1)

        type1_recorder = self.collector.type1_zombies

        # first sighting: recorded but not yet counted
        self.assertEqual(0,
                self.collector.update_zombie_count_type1({"a", "b"}, start))
        self.assertEqual(2, len(type1_recorder))

        # still inside the decay window -> still not counted
        self.assertEqual(0,
                self.collector.update_zombie_count_type1({"a", "b"},
                    start + type1_recorder.decay_time - one_sec))
        self.assertEqual(2, len(type1_recorder))

        # past the decay window -> both counted
        self.assertEqual(2,
                self.collector.update_zombie_count_type1({"a", "b"},
                    start + type1_recorder.decay_time + one_sec))
        self.assertEqual(2, len(type1_recorder))

        # "b" no longer reported -> dropped, only "a" counted
        self.assertEqual(1,
                self.collector.update_zombie_count_type1({"a"},
                    start + type1_recorder.decay_time + 2 *one_sec))
        self.assertEqual(1, len(type1_recorder))

        # no zombies left at all
        self.assertEqual(0,
                self.collector.update_zombie_count_type1({},
                    start + type1_recorder.decay_time + 3 * one_sec))
        self.assertEqual(0, len(type1_recorder))

    def test_update_zombie_count_type2(self):
        start = datetime.datetime.now()
        one_sec = datetime.timedelta(seconds=1)

        # a yarn container together with its wrapping job container
        stats = {"43ffe701d883":
                {"name": "core-caffe2_resnet50_20181012040921.586-container_e03_1539312078880_0780_01_000002"},
                "8de2f53e64cb":
                {"name": "container_e03_1539312078880_0780_01_000002"}}

        type2_recorder = self.collector.type2_zombies

        self.assertEqual(0,
                self.collector.update_zombie_count_type2(stats, start))

        # the inner job container exited; outer one is now a type2 candidate
        stats.pop("8de2f53e64cb")

        self.assertEqual(0,
                self.collector.update_zombie_count_type2(stats, start + one_sec))

        self.assertEqual(0,
                self.collector.update_zombie_count_type2(stats,
                    start + type2_recorder.decay_time))

        # only counted once the decay window has fully elapsed
        self.assertEqual(1,
                self.collector.update_zombie_count_type2(stats,
                    start + type2_recorder.decay_time + 2 * one_sec))

        # candidate disappeared -> count back to zero
        stats.pop("43ffe701d883")
        self.assertEqual(0,
                self.collector.update_zombie_count_type2(stats,
                    start + type2_recorder.decay_time + 3 * one_sec))
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -15,46 +15,25 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import os
import unittest
import yaml
import logging
import logging.config
import base
sys.path.append(os.path.abspath("../src/"))
from docker_inspect import parse_docker_inspect
class TestDockerInspect(unittest.TestCase):
class TestDockerInspect(base.TestBase):
"""
Test docker_inspect.py
"""
def setUp(self):
try:
os.chdir(os.path.abspath("test"))
except:
pass
configuration_path = "logging.yaml"
if os.path.exists(configuration_path):
with open(configuration_path, 'rt') as f:
logging_configuration = yaml.safe_load(f.read())
logging.config.dictConfig(logging_configuration)
logging.getLogger()
def tearDown(self):
try:
os.chdir(os.path.abspath(".."))
except:
pass
def test_parse_docker_inspect(self):
sample_path = "data/docker_inspect_sample.json"
file = open(sample_path, "r")
docker_inspect = file.read()
with open(sample_path, "r") as f:
docker_inspect = f.read()
inspect_info = parse_docker_inspect(docker_inspect)
target_inspect_info = {"labels": {"container_label_PAI_USER_NAME": "openmindstudio", "container_label_GPU_ID": "0,1,", "container_label_PAI_HOSTNAME": "paigcr-a-gpu-1058", "container_label_PAI_JOB_NAME": "trialslot_nnimain_d65bc5ac", "container_label_PAI_CURRENT_TASK_ROLE_NAME": "tuner"}, "env": {"container_env_PAI_TASK_INDEX": "0"}, "pid": 95539}
self.assertEqual(target_inspect_info, inspect_info)

Просмотреть файл

@ -18,9 +18,8 @@
import os
import sys
import unittest
import yaml
import logging
import logging.config
import base
sys.path.append(os.path.abspath("../src/"))
@ -30,37 +29,17 @@ from docker_stats import parse_usage_limit
from docker_stats import parse_io
from docker_stats import parse_percentile
class TestDockerStats(unittest.TestCase):
class TestDockerStats(base.TestBase):
"""
Test docker_stats.py
"""
def setUp(self):
try:
os.chdir(os.path.abspath("test"))
except:
pass
configuration_path = "logging.yaml"
if os.path.exists(configuration_path):
with open(configuration_path, 'rt') as f:
logging_configuration = yaml.safe_load(f.read())
logging.config.dictConfig(logging_configuration)
logging.getLogger()
def tearDown(self):
try:
os.chdir(os.path.abspath(".."))
except:
pass
def test_parse_docker_inspect(self):
sample_path = "data/docker_stats_sample.txt"
file = open(sample_path, "r")
docker_stats = file.read()
with open(sample_path, "r") as f:
docker_stats = f.read()
stats_info = parse_docker_stats(docker_stats)
target_stats_info = {'722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636': {'BlockIO': {'out': 156000000.0, 'in': 28600000.0}, 'NetIO': {'out': 425000000000.0, 'in': 1580000000000.0}, 'CPUPerc': '0.00', 'MemPerc': '0.19', 'id': '722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636', 'MemUsage_Limit': {'usage': 111149056.0, 'limit': 59088012574.72}, 'name': 'alert-manager'}, '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419': {'BlockIO': {'out': 0.0, 'in': 28000000.0}, 'NetIO': {'out': 0.0, 'in': 0.0}, 'CPUPerc': '0.00', 'MemPerc': '6.23', 'id': '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419', 'MemUsage_Limit': {'usage': 19587399.68, 'limit': 314572800.0}, 'name': 'prometheus'}}
print stats_info.values()
target_stats_info = {'722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636': {'BlockIO': {'out': 156000000.0, 'in': 28600000.0}, 'NetIO': {'out': 425000000000.0, 'in': 1580000000000.0}, 'CPUPerc': 0.00, 'MemPerc': 0.19, 'id': '722dac0a62cf0243e63a268b8ef995e8386c185c712f545c0c403b295a529636', 'MemUsage_Limit': {'usage': 111149056.0, 'limit': 59088012574.72}, 'name': 'alert-manager'}, '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419': {'BlockIO': {'out': 0.0, 'in': 28000000.0}, 'NetIO': {'out': 0.0, 'in': 0.0}, 'CPUPerc': 0.00, 'MemPerc': 6.23, 'id': '33a22dcd4ba31ebc4a19fae865ee62285b6fae98a6ab72d2bc65e41cdc70e419', 'MemUsage_Limit': {'usage': 19587399.68, 'limit': 314572800.0}, 'name': 'prometheus'}}
self.assertEqual(target_stats_info, stats_info)
def test_convert_to_byte(self):
@ -83,7 +62,7 @@ class TestDockerStats(unittest.TestCase):
def test_parse_percentile(self):
data = "24.45%"
result = parse_percentile(data)
target = "24.45"
target = 24.45
self.assertEqual(target, result)
if __name__ == '__main__':

Просмотреть файл

@ -1,89 +0,0 @@
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import unittest
import yaml
import logging
import logging.config
sys.path.append(os.path.abspath("../src/"))
import gpu_exporter
from utils import Metric
class TestGPUExporter(unittest.TestCase):
    """Unit tests for gpu_exporter.py: nvidia-smi XML parsing and metric conversion."""

    def setUp(self):
        # Tests expect to run from the "test" directory so that relative
        # fixture paths like data/nvidia_smi_sample.xml resolve.
        try:
            os.chdir(os.path.abspath("test"))
        except OSError:
            # Already in the test directory; os.chdir only raises OSError.
            pass
        configuration_path = "logging.yaml"
        if os.path.exists(configuration_path):
            with open(configuration_path, 'rt') as f:
                logging_configuration = yaml.safe_load(f.read())
            logging.config.dictConfig(logging_configuration)
            logging.getLogger()

    def tearDown(self):
        # Undo the chdir done in setUp so later test modules are unaffected.
        try:
            os.chdir(os.path.abspath(".."))
        except OSError:
            pass

    def test_parse_smi_xml_result(self):
        """parse_smi_xml_result extracts per-GPU utilization from a sample XML dump."""
        sample_path = "data/nvidia_smi_sample.xml"
        # Use a context manager so the fixture handle is closed (the original
        # used a bare open() and leaked the file object).
        with open(sample_path, "r") as f:
            nvidia_smi_result = f.read()
        nvidia_smi_parse_result = gpu_exporter.parse_smi_xml_result(nvidia_smi_result)
        target_smi_info = {'1': {'gpu_util': u'98', 'gpu_mem_util': u'97'},
                '0': {'gpu_util': u'100', 'gpu_mem_util': u'99'}}
        self.assertEqual(target_smi_info, nvidia_smi_parse_result)

    def test_convert_gpu_info_to_metrics(self):
        """convert_gpu_info_to_metrics emits a gpu-count metric plus two metrics per GPU."""
        info = {'1': {'gpu_util': u'98', 'gpu_mem_util': u'97'},
                '0': {'gpu_util': u'100', 'gpu_mem_util': u'99'}}
        metrics = gpu_exporter.convert_gpu_info_to_metrics(info)
        # 1 attached_gpus metric + (utilization_gpu + utilization_memory) * 2 GPUs
        self.assertEqual(5, len(metrics))
        self.assertIn(Metric("nvidiasmi_attached_gpus", {}, 2), metrics)
        self.assertIn(Metric("nvidiasmi_utilization_gpu", {"minor_number": "0"}, "100"),
                metrics)
        self.assertIn(Metric("nvidiasmi_utilization_memory", {"minor_number": "0"}, "99"),
                metrics)
        self.assertIn(Metric("nvidiasmi_utilization_gpu", {"minor_number": "1"}, "98"),
                metrics)
        self.assertIn(Metric("nvidiasmi_utilization_memory", {"minor_number": "1"}, "97"),
                metrics)

    def test_exporter_will_not_report_unsupported_gpu(self):
        """XML from an outdated/unsupported GPU parses to an empty mapping."""
        sample_path = "data/nvidia_smi_outdated_gpu.xml"
        with open(sample_path, "r") as f:
            nvidia_smi_result = f.read()
        nvidia_smi_parse_result = gpu_exporter.parse_smi_xml_result(nvidia_smi_result)
        self.assertEqual({}, nvidia_smi_parse_result)


if __name__ == '__main__':
    unittest.main()

Просмотреть файл

@ -1,133 +0,0 @@
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import copy
import unittest
import yaml
import logging
import logging.config
import datetime
sys.path.append(os.path.abspath("../src/"))
import job_exporter
log = logging.getLogger(__name__)
class TestJobExporter(unittest.TestCase):
    """Unit tests for job_exporter.py: label parsing and zombie-container accounting."""

    def setUp(self):
        # Run from the "test" directory so relative paths (logging.yaml) resolve.
        try:
            os.chdir(os.path.abspath("test"))
        except OSError:
            # Already in the test directory; os.chdir only raises OSError.
            pass
        configuration_path = "logging.yaml"
        if os.path.exists(configuration_path):
            with open(configuration_path, 'rt') as f:
                logging_configuration = yaml.safe_load(f.read())
            logging.config.dictConfig(logging_configuration)
            logging.getLogger()

    def tearDown(self):
        # Undo the chdir done in setUp.
        try:
            os.chdir(os.path.abspath(".."))
        except OSError:
            pass

    def test_parse_from_labels(self):
        """parse_from_labels splits the GPU id list out of the container labels."""
        labels = {"container_label_PAI_USER_NAME": "openmindstudio", "container_label_GPU_ID": "0,1,", "container_label_PAI_HOSTNAME": "paigcr-a-gpu-1058", "container_label_PAI_JOB_NAME": "trialslot_nnimain_d65bc5ac", "container_label_PAI_CURRENT_TASK_ROLE_NAME": "tuner"}
        gpuIds, otherLabels = job_exporter.parse_from_labels(labels)
        self.assertEqual(["0", "1"], gpuIds)
        # All labels except the GPU id list are passed through untouched.
        copied = copy.deepcopy(labels)
        copied.pop("container_label_GPU_ID")
        self.assertEqual(copied, otherLabels)

    def test_generate_zombie_count_type1(self):
        """Type-1 candidates are only counted after persisting past decay_time."""
        zombies = job_exporter.ZombieRecorder()
        start = datetime.datetime.now()
        one_sec = datetime.timedelta(seconds=1)

        # First sighting: recorded but not yet counted.
        self.assertEqual(0,
                job_exporter.generate_zombie_count_type1(zombies, {"a", "b"}, start))
        self.assertEqual(2, len(zombies))

        # Still inside the decay window: not counted.
        self.assertEqual(0,
                job_exporter.generate_zombie_count_type1(zombies, {"a", "b"},
                    start + zombies.decay_time - one_sec))
        self.assertEqual(2, len(zombies))

        # Past the decay window: both candidates are counted.
        self.assertEqual(2,
                job_exporter.generate_zombie_count_type1(zombies, {"a", "b"},
                    start + zombies.decay_time + one_sec))
        self.assertEqual(2, len(zombies))

        # "b" disappeared, so only "a" remains counted and recorded.
        self.assertEqual(1,
                job_exporter.generate_zombie_count_type1(zombies, {"a"},
                    start + zombies.decay_time + 2 * one_sec))
        self.assertEqual(1, len(zombies))

        # No candidates at all: recorder drains. (Use set() rather than {},
        # which the original passed -- {} is an empty dict, not an empty set.)
        self.assertEqual(0,
                job_exporter.generate_zombie_count_type1(zombies, set(),
                    start + zombies.decay_time + 3 * one_sec))
        self.assertEqual(0, len(zombies))

    def test_generate_zombie_count_type2(self):
        """Type-2 counting over a simulated container-stats timeline."""
        zombies = job_exporter.ZombieRecorder()
        start = datetime.datetime.now()
        one_sec = datetime.timedelta(seconds=1)

        stats = {"43ffe701d883":
                {"name": "core-caffe2_resnet50_20181012040921.586-container_e03_1539312078880_0780_01_000002"},
                "8de2f53e64cb":
                {"name": "container_e03_1539312078880_0780_01_000002"}}
        # Both containers present: nothing is counted.
        self.assertEqual(0,
                job_exporter.generate_zombie_count_type2(zombies, stats, start))

        # One container gone; the survivor is not counted within the decay window.
        stats.pop("8de2f53e64cb")
        self.assertEqual(0,
                job_exporter.generate_zombie_count_type2(zombies, stats, start + one_sec))
        self.assertEqual(0,
                job_exporter.generate_zombie_count_type2(zombies, stats,
                    start + zombies.decay_time))
        # Counted once it outlives the decay window.
        self.assertEqual(1,
                job_exporter.generate_zombie_count_type2(zombies, stats,
                    start + zombies.decay_time + 2 * one_sec))

        # Survivor also exited: nothing left to count.
        stats.pop("43ffe701d883")
        self.assertEqual(0,
                job_exporter.generate_zombie_count_type2(zombies, stats,
                    start + zombies.decay_time + 3 * one_sec))


if __name__ == '__main__':
    unittest.main()

Просмотреть файл

@ -1,4 +1,3 @@
#!/usr/bin/python
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
@ -16,27 +15,27 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import argparse
import datetime
import os
import sys
import unittest
def check_no_older_than(paths, delta):
""" raise RuntimeError exception if any path in paths is older than `now - delta` """
now = datetime.datetime.now()
delta = datetime.timedelta(seconds=delta)
oldest = now - delta
import base
for path in paths:
mtime = os.path.getmtime(path)
mtime = datetime.datetime.fromtimestamp(mtime)
if oldest > mtime:
raise RuntimeError("{} was updated more than {} seconds ago".format(path, delta))
sys.path.append(os.path.abspath("../src/"))
import network
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("paths", nargs="+", help="file to be checked")
parser.add_argument("-d", "--delta", type=int, default=60, help="check file is no older than -d seconds")
args = parser.parse_args()
class TestNetwork(base.TestBase):
"""
Test network.py
"""
check_no_older_than(args.paths, args.delta)
def test_parse_lsof(self):
output = """COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
python3 52485 dixu 5u IPv4 420398429 0t0 TCP 10.150.148.166:43682->198.100.183.212:443 (ESTABLISHED)
0t0 TCP 10.151.40.4:36090->10.151.40.4:8031 (ESTABLISHED)
"""
result = network.parse_lsof(output)
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -0,0 +1,50 @@
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import sys
import unittest
import base
sys.path.append(os.path.abspath("../src/"))
import nvidia
class TestNvidia(base.TestBase):
    """Unit tests for nvidia.py: nvidia-smi XML parsing."""

    def _load_fixture(self, path):
        # Read a sample nvidia-smi XML dump from the data directory.
        with open(path, "r") as f:
            return f.read()

    def test_parse_smi_xml_result(self):
        """A supported-GPU sample yields per-minor-number utilization integers."""
        xml_text = self._load_fixture("data/nvidia_smi_sample.xml")
        parsed = nvidia.parse_smi_xml_result(xml_text)
        expected = {'0': {'gpu_util': 100, 'gpu_mem_util': 99},
                '1': {'gpu_util': 98, 'gpu_mem_util': 97}}
        self.assertEqual(expected, parsed)

    def test_exporter_will_not_report_unsupported_gpu(self):
        """An outdated/unsupported-GPU sample parses to an empty mapping."""
        xml_text = self._load_fixture("data/nvidia_smi_outdated_gpu.xml")
        self.assertEqual({}, nvidia.parse_smi_xml_result(xml_text))


if __name__ == '__main__':
    unittest.main()

Просмотреть файл

@ -18,139 +18,35 @@
import os
import sys
import unittest
import yaml
import threading
import logging
import logging.config
import tempfile
import subprocess
import base
sys.path.append(os.path.abspath("../src/"))
import utils
from utils import Metric
log = logging.getLogger(__name__)
class TestUtils(unittest.TestCase):
class TestUtils(base.TestBase):
"""
Test utils.py
"""
def setUp(self):
try:
os.chdir(os.path.abspath("test"))
except:
pass
def test_walk_json_field_safe(self):
self.assertIsNone(utils.walk_json_field_safe(None, 1, "abc"))
self.assertIsNone(utils.walk_json_field_safe([], 1, "abc"))
self.assertIsNone(utils.walk_json_field_safe([{"abc"}], 1, "abc"))
self.assertEqual("345",
utils.walk_json_field_safe([{"name": "123"}, {"name": "345"}], 1, "name"))
configuration_path = "logging.yaml"
def test_exec_cmd_with_0_return_value(self):
self.assertEqual("10\n", utils.exec_cmd(["echo", "10"]))
if os.path.exists(configuration_path):
with open(configuration_path, 'rt') as f:
logging_configuration = yaml.safe_load(f.read())
logging.config.dictConfig(logging_configuration)
logging.getLogger()
def test_exec_cmd_with_timeout(self):
with self.assertRaises(subprocess.TimeoutExpired) as context:
utils.exec_cmd(["sleep", "10"], timeout=1)
def tearDown(self):
try:
os.chdir(os.path.abspath(".."))
except:
pass
def test_metrics_eq(self):
self.assertEqual(Metric("foo", {"abc": "1"}, "3"),
Metric("foo", {"abc": "1"}, "3"))
self.assertNotEqual(Metric("foo", {"abc": "1"}, "3"),
Metric("bar", {"abc": "1"}, "3"))
self.assertNotEqual(Metric("foo", {"abc": "2"}, "3"),
Metric("foo", {"abc": "1"}, "3"))
self.assertNotEqual(Metric("foo", {"abc": "2"}, "3"),
Metric("foo", {"abc": "2"}, "5"))
def test_export_metrics_to_file(self):
metrics = []
metrics.append(Metric("foo", {"bar": 2}, "3"))
metrics.append(Metric("bar", {}, "4"))
with tempfile.NamedTemporaryFile() as f:
utils.export_metrics_to_file(f.name, metrics)
lines = f.readlines()
self.assertEqual("foo{bar=\"2\"} 3", lines[0].strip())
self.assertEqual("bar 4", lines[1].strip())
def test_singleton_normal(self):
def getter():
return 100
singleton = utils.Singleton(getter)
for _ in xrange(10):
val, is_old = singleton.try_get()
self.assertEqual(100, val)
self.assertFalse(is_old)
def test_singleton_with_blocking_getter_no_old_data(self):
semaphore = threading.Semaphore(1)
def blocking_getter():
semaphore.acquire(blocking=True)
semaphore.release()
return 100
singleton = utils.Singleton(blocking_getter, get_timeout_s=0.2)
val, is_old = singleton.try_get()
self.assertIsNotNone(val)
self.assertFalse(is_old)
for _ in xrange(3):
semaphore.acquire()
for _ in xrange(3):
val, is_old = singleton.try_get()
self.assertEqual(100, val)
self.assertTrue(is_old)
semaphore.release()
val, is_old = singleton.try_get()
self.assertEqual(100, val)
self.assertFalse(is_old)
def test_singleton_with_blocking_getter_allow_old_data(self):
semaphore = threading.Semaphore(1)
def blocking_getter():
semaphore.acquire(blocking=True)
semaphore.release()
return 100
singleton = utils.Singleton(blocking_getter, get_timeout_s=0.2)
semaphore.acquire()
for _ in xrange(3):
val, is_old = singleton.try_get()
self.assertIsNone(val)
self.assertTrue(is_old)
semaphore.release()
# let singleton cache one value
val, is_old = singleton.try_get()
self.assertEqual(100, val)
self.assertFalse(is_old)
for _ in xrange(3):
semaphore.acquire()
for _ in xrange(3):
# singleton returns old value
val, is_old = singleton.try_get()
self.assertEqual(100, val)
self.assertTrue(is_old)
semaphore.release()
val, is_old = singleton.try_get()
self.assertEqual(100, val)
self.assertFalse(is_old)
def test_exec_cmd_with_non_0_return_value(self):
with self.assertRaises(subprocess.CalledProcessError) as context:
utils.exec_cmd(["false"])
if __name__ == '__main__':
unittest.main()

Просмотреть файл

@ -9,8 +9,6 @@ class NodeExporter(object):
self.default_service_conf = default_service_conf
def validation_pre(self):
print("cluster_config is %s, service_config is %s, default is %s" % \
(self.cluster_conf, self.service_conf, self.default_service_conf))
return True, None
def run(self):

Просмотреть файл

@ -18,8 +18,6 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
annotations:
prometheus.io/scrape: 'true'
name: node-exporter
spec:
selector:
@ -29,6 +27,10 @@ spec:
metadata:
labels:
app: node-exporter
annotations:
prometheus.io/scrape: "true"
prometheus.io/path: "/metrics"
prometheus.io/port: "{{ cluster_cfg["node-exporter"]["port"] }}"
name: node-exporter
spec:
containers:
@ -38,17 +40,13 @@ spec:
httpGet:
path: /metrics
port: {{ cluster_cfg["node-exporter"]["port"] }}
initialDelaySeconds: 30
initialDelaySeconds: 3
periodSeconds: 30
resources:
limits:
memory: "128Mi"
volumeMounts:
- mountPath: /datastorage/prometheus
name: collector-mount
name: node-exporter
args:
- '--collector.textfile.directory=/datastorage/prometheus'
- '--no-collector.arp'
- '--no-collector.bcache'
- '--no-collector.bonding'
@ -82,84 +80,9 @@ spec:
- containerPort: {{ cluster_cfg["node-exporter"]["port"] }}
hostPort: {{ cluster_cfg["node-exporter"]["port"] }}
name: scrape
- image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}job-exporter:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }}
imagePullPolicy: Always
readinessProbe:
exec:
command:
- "python"
- "/job_exporter/no_older_than.py"
- "--delta"
- "120"
- "/datastorage/prometheus/job_exporter.prom"
- "/datastorage/prometheus/gpu_exporter.prom"
initialDelaySeconds: 30
periodSeconds: 30
resources:
limits:
memory: "128Mi"
securityContext:
privileged: true # this is required by job-exporter
env:
- name: HOST_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: LOGGING_LEVEL
value: INFO
- name: NV_DRIVER
value: /var/drivers/nvidia/current
volumeMounts:
- mountPath: /root/.docker
name: docker-cred-volume
- mountPath: /bin/docker
name: docker-bin
- mountPath: /var/run/docker.sock
name: docker-socket
- mountPath: /dev
name: device-mount
- mountPath: /var/drivers/nvidia/current
name: driver-path
- mountPath: /datastorage/prometheus
name: collector-mount
- mountPath: /gpu-config
name: gpu-config
name: job-exporter
volumes:
- name: docker-bin
hostPath:
path: /bin/docker
- name: docker-socket
hostPath:
path: /var/run/docker.sock
- name: docker-cred-volume
configMap:
name: docker-credentials
- name: device-mount
hostPath:
path: /dev
- name: driver-path
hostPath:
path: /var/drivers/nvidia/current
- name: collector-mount
hostPath:
path: {{ cluster_cfg["cluster"]["common"]["data-path"] }}/prometheus
- name: rootfs
hostPath:
path: /
- name: var-run
hostPath:
path: /var/run
- name: sys
hostPath:
path: /sys
- name: gpu-config
configMap:
name: gpu-configuration
imagePullSecrets:
- name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }}
hostNetwork: true
hostPID: true # required so that job-exporter can read the list of PIDs inside containers
tolerations:
- key: node.kubernetes.io/memory-pressure
operator: "Exists"

Просмотреть файл

@ -17,7 +17,6 @@
prerequisite:
- cluster-configuration
- drivers
template-list:
- node-exporter.yaml

Просмотреть файл

@ -41,3 +41,11 @@ groups:
type: pai_service
annotations:
summary: "{{$labels.job}} in {{$labels.instance}} not up detected"
- alert: JobExporterHangs
expr: irate(collector_iteration_count_total[5m]) == 0
for: 5m
labels:
type: pai_service
annotations:
summary: "{{$labels.name}} in {{$labels.instance}} hangs detected"

Просмотреть файл

@ -29,19 +29,7 @@ data:
rule_files:
- "/etc/prometheus-alert/*.rules"
scrape_configs:
- job_name: "node_exporter"
scrape_interval: {{ prom_info["scrape_interval"] }}s
kubernetes_sd_configs:
- api_server: "{{ cluster_cfg["kubernetes"]["api-servers-url"] }}"
role: node
# Extract the inner IP address from the __address__ label and replace the port with the exporter's port to build the target address.
# Prometheus will fetch data from the target address.
relabel_configs:
- source_labels: [__address__]
regex: '([^:]+)(:\d*)?'
replacement: "${1}:{{ cluster_cfg["node-exporter"]["port"] }}"
target_label: __address__
- job_name: "pai_serivce_exporter"
- job_name: 'pai_serivce_exporter'
scrape_interval: {{ prom_info["scrape_interval"] }}s
kubernetes_sd_configs:
- api_server: "{{ cluster_cfg["kubernetes"]["api-servers-url"] }}"

Просмотреть файл

@ -39,8 +39,7 @@ spec:
image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}watchdog:{{cluster_cfg["cluster"]["docker-registry"]["tag"]}}
imagePullPolicy: Always
readinessProbe:
httpGet:
path: /
tcpSocket: # an HTTP GET probe would trigger job-exporter to abandon old metrics, so we probe with TCP instead of HTTP
port: 9101
initialDelaySeconds: 30
periodSeconds: 30

Просмотреть файл

@ -23,7 +23,6 @@ import json
import sys
import requests
import logging
from logging.handlers import RotatingFileHandler
import time
import threading