# Orchestration delays Investigation
1. Run all cells.
1. Scroll down to check for any authentication messages.
1. View report at the bottom.

In [None]:
# These are just defaults; they will be overwritten if you use the nimport pip package.
# NOTE(review): presumably nimport injects caller-supplied values over these names -- confirm,
# and keep the names unchanged since later cells read them directly.
# Investigation window (timestamp strings).
start = "2019-08-08T23:50:00.0000000Z"
end = "2019-08-09T00:24:36.0000000Z"
service = "tfs"
hub = "Build"
# Scale unit under investigation (reported as 'scale unit' in the final report).
su = "tfs-wcus-0"
# Location of this notebook, and the project base URL used to build links to sibling notebooks.
url = "https://notebooksv2.azure.com/yaananth/projects/06OasuNRs6rK/delays.ipynb"
baseUrl = "https://notebooksv2.azure.com/yaananth/projects/06OasuNRs6rK"

In [None]:
%%capture
# Install the helper packages imported below; the %%capture cell magic keeps pip's output out of the notebook.
!pip install nimport azure-kusto-notebooks

In [None]:
# Import the things we use

# Note you can also use kql https://docs.microsoft.com/en-us/azure/data-explorer/kqlmagic
# %kql is single line magic
# %%kql is cell magic

# https://nbviewer.jupyter.org/github/ipython/ipython/blob/4.0.x/examples/IPython%20Kernel/Rich%20Output.ipynb#HTML
# https://ipython.readthedocs.io/en/stable/interactive/magics.html
from IPython.display import display, HTML, Markdown, Javascript, clear_output

# http://pandas-docs.github.io/pandas-docs-travis/user_guide/reshaping.html
import pandas as pd
pd.options.display.html.table_schema = True
from pandas import Series, DataFrame
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode, quote_plus
from requests.utils import requote_uri
import time
import numpy as np
from matplotlib import pyplot as plt
from nimport.utils import tokenize, open_nb
import json
import os
import calendar as cal
import concurrent.futures
from azure.kusto.notebooks import utils as akn

In [None]:
# Bundle the notebook parameters into the single dict that is passed to the queries below.
params = dict(
    su=su,
    start=start,
    end=end,
    url=url,
    baseUrl=baseUrl,
    service=service,
    hub=hub,
)

# Query files live under 'devops-pipelines/queries', unless the working
# directory already is the 'devops-pipelines' folder.
if os.path.basename(os.getcwd()) == 'devops-pipelines':
    root = ''
else:
    root = 'devops-pipelines'
queryPath = os.path.join(root, 'queries')

In [None]:
# Authenticate the Kusto client for the VSO cluster (used for the 'VSO' database queries below).
# You will need to copy the token into a browser window for AAD auth.
client = akn.get_client('https://vso.kusto.windows.net')

In [None]:
# Authenticate a second Kusto client, this one for the IcM cluster
# (used for the 'IcmDataWarehouse' active-incidents query below).
# You will need to copy the token into a browser window for AAD auth.
icm_client = akn.get_client('https://icmcluster.kusto.windows.net')

In [None]:
# Query files shared across investigations.
q_loc = os.path.join(queryPath, "LocationName.csl")
q_whatChanged = os.path.join(queryPath, "WhatChanged.csl")
q_haActions = os.path.join(queryPath, "HealthAgentActions.csl")
q_mdm = os.path.join(queryPath, "MDMAccount.csl")

# Query files specific to the delays investigation.
delaysPath = os.path.join(queryPath, "delays")
q_affectedAccounts = os.path.join(delaysPath, "AffectedAccounts.csl")
q_abusers = os.path.join(delaysPath, "Abusers.csl")
# Same file as q_affectedAccounts; the second name is kept so existing references keep working.
q_affAccounts = q_affectedAccounts
q_delayedAccountsAreAbusers = os.path.join(delaysPath, "DelayedAccountsAreAbusers.csl")
q_whatDelayed = os.path.join(delaysPath, "WhatDelayed.csl")
q_load = os.path.join(delaysPath, "Load.csl")

with concurrent.futures.ThreadPoolExecutor() as executor:
    # materialize location name immediately as we need this for other queries
    p1 = executor.submit(akn.execute_file, client, 'VSO', q_loc, params)
    locationNameResult = akn.to_dataframe_from_future(p1)
    if len(locationNameResult.index) == 0:
        # Fail with a readable message instead of a bare KeyError from the [0] lookup below.
        raise ValueError(
            "LocationName query returned no rows; cannot resolve locationName (su=%r, service=%r)"
            % (su, service))
    locationName = locationNameResult["Tenant"][0]
    params["locationName"] = locationName

    # The remaining queries are submitted together so the pool can run them concurrently.
    p2 = executor.submit(akn.execute_file, client, 'VSO', q_whatChanged, params)
    p4 = executor.submit(akn.execute_file, client, 'VSO', q_haActions, params)

    p5 = executor.submit(akn.execute_file, client, 'VSO', q_affectedAccounts, params)
    p6 = executor.submit(akn.execute_file, client, 'VSO', q_abusers, params)
    # AffectedAccounts.csl used to be submitted a second time with identical
    # arguments; reuse the first submission instead of running the query twice.
    p7 = p5
    p8 = executor.submit(akn.execute_file, client, 'VSO', q_delayedAccountsAreAbusers, params)
    p9 = executor.submit(akn.execute_file, client, 'VSO', q_whatDelayed, params)
    p10 = executor.submit(akn.execute_file, client, 'VSO', q_load, params)

    p11 = executor.submit(akn.execute_file, icm_client, 'IcmDataWarehouse',
                          os.path.join(queryPath, 'ActiveIncidents.csl'), params)
    p12 = executor.submit(akn.execute_file, client, 'VSO', q_mdm, params)

# Leaving the `with` block waits for every submitted query, so the futures are complete here.
q_whatChanged_df = akn.to_dataframe_from_future(p2)
q_haActions_df = akn.to_dataframe_from_future(p4)
q_affectedAccountsResultDf = akn.to_dataframe_from_future(p5)

abusersDf = akn.to_dataframe_from_future(p6)
# Hosts that are both affected by the delay and flagged as abusers.
finalabusersList = np.intersect1d(q_affectedAccountsResultDf["HostId"].values, abusersDf["HostId"].values)

# Independent copy of the single AffectedAccounts result, used by the plotting cell.
q_affAccounts_df = q_affectedAccountsResultDf.copy()
q_delayedAccountsAreAbusers_df = akn.to_dataframe_from_future(p8)
q_whatDelayedResultDf = akn.to_dataframe_from_future(p9)
q_loadResultDf = akn.to_dataframe_from_future(p10)

q_activeIncidentsResultDf = akn.to_dataframe_from_future(p11)

q_mdmDf = akn.to_dataframe_from_future(p12)
if len(q_mdmDf.index) == 0:
    raise ValueError("MDMAccount query returned no rows; cannot resolve mdmAccount (su=%r)" % (su,))
params["mdmAccount"] = q_mdmDf["monitoringAccount"][0]

In [None]:
# Run the OrchestrationLogSpike query once per delayed host, all submitted to one thread pool.
query = os.path.join(delaysPath, "OrchestrationLogSpike.csl")
with concurrent.futures.ThreadPoolExecutor() as executor:
    sfs = [
        executor.submit(akn.execute_file, client, 'VSO', query, dict(params, hostId=host_id))
        for host_id in q_delayedAccountsAreAbusers_df["HostId"].values
    ]
    # Results are collected in completion order, not submission order.
    sfsResults = [finished.result() for finished in concurrent.futures.as_completed(sfs)]

# convert to data frames
primary_results = [response.primary_results[0] for response in sfsResults]
spikeResultsDfs = None
with concurrent.futures.ThreadPoolExecutor() as executor:
    dataframe_futures = [executor.submit(akn.to_dataframe, table) for table in primary_results]
    spikeResultsDfs = [
        finished.result()
        for finished in concurrent.futures.as_completed(dataframe_futures)
    ]
# Drop the reference to the raw responses now that the data frames exist.
sfsResults = None

In [None]:
# visualize delays
import plotly
from plotly import graph_objs as go

# Line trace of the message delay over time, with the account name as hover text.
delays = go.Scatter(
    mode='lines',
    name='Delays in seconds',
    x=q_affAccounts_df["PreciseTimeStamp"],
    y=q_affAccounts_df["MessageDelayInSeconds"],
    text=q_affAccounts_df['Name'],
)

# Config changes are drawn at a constant y of 50 so they form a row of large markers.
changed = go.Scatter(
    mode='lines+markers',
    name='What Changed',
    x=q_whatChanged_df["TIMESTAMP"],
    y=np.full(len(q_whatChanged_df["TIMESTAMP"].values), 50),
    text=q_whatChanged_df["Name"],
    marker={
        'size': 32,
        'color': np.random.randn(500),
        'colorscale': 'Viridis',
    },
)

# Health-agent mitigations share the same constant y; hover text is the
# mitigation name immediately followed by the role instance.
mitigations = go.Scatter(
    mode='markers',
    name='Mitigations',
    x=q_haActions_df["PreciseTimeStamp"],
    y=np.full(len(q_haActions_df["PreciseTimeStamp"].values), 50),
    text=q_haActions_df[["MitigationName", "RoleInstance"]].apply(''.join, axis=1),
    marker={
        'size': 10,
        'color': 'rgba(152, 0, 0, .8)',
        'line': {
            'width': 2,
            'color': 'rgb(0, 0, 0)',
        },
    },
)

data = [delays, changed, mitigations]
plotly.offline.iplot(data)

In [None]:
# utility functions
# Accumulates the markdown report text, one appended line per call to r().
content = ''


def r(*args):
    '''Append one line to the markdown report.

    Every argument is converted with str(), the pieces are concatenated
    without a separator, and the result plus a trailing newline is added to
    the module-level ``content`` string.
    '''
    global content
    line = ''.join(map(str, args))
    content = content + line + '\n'

def pandas_df_to_markdown_table(df):
    '''Render a DataFrame as pipe-delimited markdown table text.

    The output is the column names, then a ``---`` separator cell for every
    column, then the data rows, all joined with ``|`` by ``DataFrame.to_csv``.
    The index is not written.

    NOTE(review): cell values are not escaped here, so a value containing
    ``|`` would not render as a single table cell -- confirm inputs never do.
    '''
    # One '---' per column; this becomes the header/body separator row.
    separator_row = pd.DataFrame([['---'] * len(df.columns)], columns=df.columns)
    return pd.concat([separator_row, df]).to_csv(sep="|", index=False)

startTime = akn.to_datetime(start)
# report!
# Title followed by a two-column markdown table of the investigation parameters.
r('# OK SO WHAT HAPPENED')
r('|parameter|value|')
r('|---|---|')
r('|startTime|%s|' % (startTime,))
r('|endTime|%s|' % (akn.to_datetime(end),))
r('|scale unit|%s|' % (su,))
r('|service|%s|' % (service,))

# jarvis params
# Values substituted into the Jarvis dashboard link templates further down.
# NOTE(review): akn.get_time(x, n) presumably shifts the timestamp by n to widen the window -- confirm units.
jarvisParams = dict(
    su=su,
    start=akn.get_time(start, -10),
    end=akn.get_time(end, 10),
    service=service,
    location=locationName,
    account=params["mdmAccount"],
)

# what changed? analysis
# Lists config changes (upgrades, mitigations, VIP swaps, feature flags) with
# their distance from the start time, under one heading per category.
# NOTE(review): the nesting below was reconstructed from flattened source --
# confirm that each matching change should be listed regardless of the time
# window, with the window only gating the mitigation/VIP/feature-flag headings.
r('## What changed?')
if len(q_whatChanged_df.index) == 0:
    r("...no relevant config changes recorded during this period.")
else:
    # Each heading is emitted at most once, the first time its category matches.
    up_prefix = ""
    mit_prefix = ""
    vip_prefix = ""
    ff_prefix = ""
    text = ""
    for index, row in q_whatChanged_df.iterrows():
        # tzinfo is stripped on both sides so the two values can be subtracted.
        delta = startTime.replace(tzinfo=None) - row.TIMESTAMP.replace(tzinfo=None)
        when = "before"
        if delta.total_seconds() < 0:
            # Change happened after the start: flip the subtraction so delta stays positive.
            when = "after"
            delta = row.TIMESTAMP.replace(tzinfo=None) - startTime.replace(tzinfo=None)
        hoursHappened = delta.total_seconds() // 3600
        considerTime = hoursHappened <= 1
        # delta.seconds is the sub-day remainder; split it into whole hours and
        # the leftover minutes (previously the total minutes were printed,
        # e.g. "2 hours 125 minutes").
        hours, minutes = divmod(delta.seconds // 60, 60)
        entry = """%s %s %s (%s days %s hours %s minutes %s the start time) \n\n""" % (
            row.TIMESTAMP, row.title, row.buildNumber, delta.days, hours, minutes, when)
        title = row.title.lower()
        if 'upgrade' in title:
            if not up_prefix:
                up_prefix += "Looks like, there's upgrade...\n\n"
            text += entry
        if 'mitigation' in title:
            if considerTime and not mit_prefix:
                mit_prefix += "Looks like, there are some mitigations by health agent...\n\n"
            text += entry
        if 'vip' in title:
            # Uses its own prefix; it used to write into mit_prefix and was
            # dropped whenever a mitigation heading already existed.
            if considerTime and not vip_prefix:
                vip_prefix += "Looks like, there is VIP swap...\n\n"
            text += entry
        if 'feature flag' in title:
            if considerTime and not ff_prefix:
                ff_prefix += "Looks like, some feature flags are enabled...\n\n"
            text += entry
    if text:
        r(up_prefix + mit_prefix + vip_prefix + ff_prefix + text)
    else:
        r("...no relevant changes during this period.")
 
 
 
# active incidents?
r('## Active incidents?')
# Count the incidents whose title is not the orchestrator-delay ICM itself.
otherIncidentsCount = sum(
    1
    for _, incident in q_activeIncidentsResultDf.iterrows()
    if incident.Title.find("Kalypso: Build Orchestrator Delays ICM") == -1
)

if otherIncidentsCount == 0:
    r("...no relevant incidents during this period.")
else:
    r("INSIGHT: There were incidents recorded during this period. These might be related:")
    # Add a markdown link column pointing at each incident's IcM page.
    newDf = q_activeIncidentsResultDf.assign(URL=[
        """[%s](https://icm.ad.msft.net/imp/v3/incidents/details/%s/home)""" % (incidentId, incidentId)
        for incidentId in q_activeIncidentsResultDf.IncidentId
    ])
    r("\n")
    r(pandas_df_to_markdown_table(newDf[['URL', 'Severity', 'Title']]))

 
 
 
r('## Queue Load')
# Values from the third column (position 2) of the DTPlanQueued rows.
# NOTE(review): presumably the per-minute queued count -- confirm against Load.csl.
ar = q_loadResultDf[q_loadResultDf["Name"] == "DTPlanQueued"].values[:, 2]
if ar.size == 0:
    # np.amax raises ValueError on an empty array, which would abort the
    # whole report; say so explicitly instead.
    r('...no DTPlanQueued data recorded during this period.')
else:
    maxQueued = np.amax(ar)
    if np.any(ar > 500):
        r('INSIGHT: There was a high rate of jobs queued during this period (max: ', maxQueued, ' / minute)...')
    else:
        r('...everything looks good? (max: ', maxQueued, ' / minute)')
 
# Per-host outlier detection on the orchestration log counts.
# NOTE(review): the nesting below was reconstructed from flattened source --
# confirm that the "Consider running these queries" lines belong to every
# host with anomalies, not only to the PlanCompleted case.
r('## Orchestration phase Load')
for spikeResultDf in spikeResultsDfs:
    if spikeResultDf.empty:
        # No rows for this host: the HostId lookup below would raise IndexError.
        continue
    countResult = spikeResultDf.C.describe()
    hostId = spikeResultDf["HostId"].values[0]
    upper = countResult["75%"]
    lower = countResult["25%"]
    # Wondering what's going on here? We detect anomalies, see https://www.purplemath.com/modules/boxwhisk3.htm
    IQR = upper - lower
    # Keep the first five rows whose count lies above the upper fence (Q3 + 1.5 * IQR).
    countResultOfInterest = spikeResultDf[spikeResultDf["C"] > upper + 1.5 * IQR].head(5)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    unqCommands = list(dict.fromkeys(countResultOfInterest["Command"].values))
    if len(unqCommands) > 0:
        commands = ','.join(str(e) for e in unqCommands)
        r("INSIGHT: Found anomalies for these phases in order highest to lowest for host %s: %s, max being %s \n" % (hostId, commands, countResult["max"]))
        r(pandas_df_to_markdown_table(countResultOfInterest[["Command", "C"]]))
        newParams = dict(params)
        newParams["command"] = unqCommands[0]
        newParams["hostId"] = hostId
        if "PlanCompleted" in commands:
            if "StartPlan" in commands or "PlanStarted" in commands:
                r("\nTIP: Lot of jobs might have started... creating this spike")
            else:
                r("\nTIP: Jobs that are queued long ago might have completed now... creating this spike")
        r("\nConsider running these queries by changing times, if you need to dig in further: \n")
        r("```\n" + tokenize(os.path.join(queryPath, "delays", "OrchestrationLogSpikeTip.csl"), newParams) + "\n```")
        r("```\n" + tokenize(q_load, newParams) + "\n```")
    else:
        r('...everything looks good?')
 
# ja load
# Flags a high pending-job count and, when found, links the JobAgent notebook and dashboard.
r('## JA Load')
# Backslashes are written as "\\" because "\J" and "\T" are not valid escape
# sequences; the resulting string value is unchanged.
q_whatDelayedResultPendingJobsDf = q_whatDelayedResultDf[q_whatDelayedResultDf.Pivot == "\\JobService(_Total)\\Total Pending Jobs"]
pendingGreaterThan50Result = np.where(q_whatDelayedResultPendingJobsDf.avg_CounterValue.values > 50)
if len(pendingGreaterThan50Result[0]) > 0:
    max_pending_jobs = np.max(q_whatDelayedResultPendingJobsDf.avg_CounterValue.values)
    r("INSIGHT: There was a high number of pending jobs during this period (max was %s). Note that this is for jobs including all priorities (even low priority ones)" % (max_pending_jobs))

    open_nb(os.path.join(root, 'ja.ipynb'), params, redirect=False)
    jaUrl = baseUrl + "/devops-pipelines/ja.ipynb"
    r('\n\n[JobAgent investigation notebook](', requote_uri(jaUrl), ')')

    # Adjacent literals form one template; the %(name)s fields are filled from jarvisParams.
    jaJarvisLink = (
        """https://jarvis-west.dc.ad.msft.net/dashboard/VSO-ServiceInsights/PlatformViews/Compute-JA"""
        """?overrides=[{"query":"//*[id='Service']","key":"value","replacement":"%(service)s"},"""
        """{"query":"//*[id='RoleInstance']","key":"value","replacement":""},"""
        """{"query":"//*[id='LocationName']","key":"value","replacement":"%(location)s"},"""
        """{"query":"//dataSources","key":"namespace","replacement":"%(su)s"},"""
        """{"query":"//dataSources","key":"account","replacement":"%(account)s"},"""
        """{"query":"//*[id='ApplicationEndpoint']","key":"regex","replacement":"*%(location)s*"},"""
        """{"query":"//*[id='ScaleUnit']","key":"value","replacement":"%(su)s"}]"""
        """&globalStartTime=%(start)s&globalEndTime=%(end)s&pinGlobalTimeRange=true"""
    ) % jarvisParams
    r('\n\n[JobAgent health dashboard](', requote_uri(jaJarvisLink), ')')
else:
    r('...everything looks good?')
 
# abuse detection?
r('## What users are impacted?')
# finalabusersList holds the hosts that are both affected and flagged as abusers.
if len(finalabusersList):
    r('Found abusers -- this alert is likely a false alarm.')
# The delayed-accounts table is always included, whether or not abusers were found.
impactedTable = pandas_df_to_markdown_table(q_delayedAccountsAreAbusers_df)
r(impactedTable)
 
 
# more analysis?
# Opens the follow-up notebooks and appends links to them and to the
# scale-unit dashboard. Each link uses its own variable so the notebook
# parameter `url` is no longer overwritten.
r('## More analysis')
slaUrl = baseUrl + "/devops-pipelines/sla.ipynb"
SLAParams = {
    "triggerTime": params["start"],
    "scaleUnit": params["su"],
    "service": params["service"],
    "lookback": "1h",
    "region": ""
}
open_nb(os.path.join(root, 'sla.ipynb'), SLAParams, redirect=False)
r('\n\n[SLA investigation notebook](', requote_uri(slaUrl), ')')

impactUrl = baseUrl + "/devops-pipelines/impact.ipynb"
open_nb(os.path.join(root, 'impact.ipynb'), params, redirect=False)
r('\n\n[Customer impact investigation notebook](', requote_uri(impactUrl), ')')

# Scale unit health
# Adjacent literals form one template; the %(name)s fields are filled from jarvisParams.
jarvisLink = (
    """https://jarvis-west.dc.ad.msft.net/dashboard/VSO-ServiceInsights/DevOpsReports/DevOpsReports"""
    """?overrides=[{"query":"//*[id='Service']","key":"value","replacement":"%(service)s"},"""
    """{"query":"//*[id='RoleInstance']","key":"value","replacement":""},"""
    """{"query":"//*[id='LocationName']","key":"value","replacement":"%(location)s"},"""
    """{"query":"//dataSources","key":"namespace","replacement":"%(su)s"},"""
    """{"query":"//dataSources","key":"account","replacement":"%(account)s"},"""
    """{"query":"//*[id='ApplicationEndpoint']","key":"regex","replacement":"*%(location)s*"},"""
    """{"query":"//*[id='ScaleUnit']","key":"value","replacement":"%(su)s"}]"""
    """&globalStartTime=%(start)s&globalEndTime=%(end)s&pinGlobalTimeRange=true"""
) % jarvisParams
r('\n\n[Scale unit health dashboard (' + su + ', ' + service + ')](', requote_uri(jarvisLink), ')')


# Must stay the last expression of the cell so the notebook renders the report.
Markdown(content)
# print(content)