Removing azure_data requirement from host_summary notebooklet for test (#22)

* Removing azure_data requirement from host_summary notebooklet for test

* Updating azure-pipelines to python 3.8

type hints in data_providers

* Linting errors

* Another failing test in test_metadata.py

* Addressing McCabe and Prospector warnings

* Fixing test breaks in ti_enrich.py and account_summary.py

Adding additional McCabe suppressions to deal with diff versions (sometimes McCabe IDs the start of decorated function as the decorator line, in newer versions, it uses the def line)

* Fixing error caused by msticpy bug in ti_enrich

* Reverting change to calling SelectAlert since it fails on 1.4.5 and earlier
This commit is contained in:
Ian Hellen 2021-11-12 14:41:41 -08:00 коммит произвёл GitHub
Родитель bb908cd995
Коммит 2680ce2f7f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 66 добавлений и 43 удалений

Просмотреть файл

@ -24,7 +24,7 @@ stages:
pool:
vmImage: $(imageName)
variables:
python.version: '3.6'
python.version: '3.8'
steps:
# Add an alias for Windows python=>python3
- script: alias python='python3' pip='pip3'

Просмотреть файл

@ -200,10 +200,10 @@ def _format_func_doc(func_name, func, full_doc=False, prop_set=None):
if func_doc:
if not full_doc:
# Get the first line of the doc string
doc_lines.append(func_doc.split("\n")[0])
doc_lines.append(func_doc.split("\n", maxsplit=1)[0])
return doc_lines
func_doc = inspect.cleandoc(func_doc).split("\n")[0]
func_doc = inspect.cleandoc(func_doc).split("\n", maxsplit=1)[0]
if func_doc:
refmt_headings = []
for doc_line in func_doc.split("\n"):

Просмотреть файл

@ -7,7 +7,7 @@
import inspect
import sys
from collections import namedtuple
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from msticpy.common.exceptions import MsticpyAzureConfigError
from msticpy.common.wsconfig import WorkspaceConfig
@ -336,7 +336,7 @@ class DataProviders:
"""
return cls._DEFAULT_PROVIDERS
def _query_prov(self, provider, provider_defn, **kwargs):
def _query_prov(self, provider: str, provider_defn: ProviderDefn, **kwargs) -> Any:
try:
# Get any keys with the provider prefix and initialize the provider
prov_kwargs_args = self._get_provider_kwargs(provider, **kwargs)
@ -353,25 +353,30 @@ class DataProviders:
if not prov_connect_args and provider_defn.get_config:
prov_connect_args = provider_defn.get_config()
# call the connect function
created_provider.connect(**prov_connect_args)
try:
created_provider.connect(**prov_connect_args)
except Exception as err: # pylint: disable=broad-except
print(f"Connection attempt for {provider} failed.\n{err}")
return created_provider
except MsticpyAzureConfigError as mp_ex:
if get_opt("verbose"):
print("Warning:", mp_ex.args)
return None
def _no_connect_prov(self, provider, provider_defn, **kwargs):
def _no_connect_prov(
self, provider: str, provider_defn: ProviderDefn, **kwargs
) -> Any:
# Get the args passed to __init__ for this provider
prov_args = self._get_provider_kwargs(provider, **kwargs)
# If there are none and there's a config function, call that.
if not prov_args and provider_defn.get_config:
prov_args = provider_defn.get_config()
# Instatiate the provider
# Instantiate the provider
return provider_defn.prov_class(prov_args)
# Helper methods
@staticmethod
def _get_provider_kwargs(prefix, **kwargs):
def _get_provider_kwargs(prefix: str, **kwargs) -> Dict[str, str]:
"""Return the kwargs prefixed with "prefix_"."""
if prefix == "LogAnalytics" and any(
key for key in kwargs if key.startswith("AzureSentinel")
@ -389,7 +394,7 @@ class DataProviders:
}
@staticmethod
def _get_connect_args(func, **kwargs):
def _get_connect_args(func: Callable, **kwargs) -> Dict[str, str]:
"""Get the arguments required by the `connect` function."""
connect_params = inspect.signature(func).parameters
return {
@ -398,7 +403,7 @@ class DataProviders:
# Provider get_config functions
@staticmethod
def _azsent_get_config(**kwargs):
def _azsent_get_config(**kwargs) -> Dict[str, str]:
if "workspace" in kwargs:
ws_config = WorkspaceConfig(workspace=kwargs["workspace"])
elif "config_file" in kwargs:

Просмотреть файл

@ -1178,6 +1178,7 @@ def _create_ip_summary(data, ip_col, geoip):
.pipe(
(get_geoip_whois, "data"), geo_lookup=geoip, ip_col=ip_col
) # get geoip and whois
.drop(columns=["TimeGenerated", "Type"], errors="ignore")
.merge(data, left_on="IpAddress", right_on=ip_col)
)
for col in group_cols:

Просмотреть файл

@ -334,7 +334,7 @@ def _color_cells(val):
color = color_chart[val.casefold()]
else:
color = "none"
return "background-color: %s" % color
return f"background-color: {color}"
def _sev_score(sev):

Просмотреть файл

@ -107,9 +107,9 @@ class HostLogonsSummary(Notebooklet): # pylint: disable=too-few-public-methods
metadata = _CLS_METADATA
@set_text(docs=_CELL_DOCS, key="run") # noqa:MC0001
@set_text(docs=_CELL_DOCS, key="run") # noqa: MC0001
# pylint: disable=too-many-locals, too-many-branches
def run(
def run( # noqa:MC0001
self,
value: Any = None,
data: Optional[pd.DataFrame] = None,

Просмотреть файл

@ -106,8 +106,8 @@ class HostSummary(Notebooklet):
_cell_docs = _CELL_DOCS
# pylint: disable=too-many-branches
@set_text(docs=_CELL_DOCS, key="run") # noqa MC0001
def run(
@set_text(docs=_CELL_DOCS, key="run") # noqa: MC0001
def run( # noqa:MC0001
self,
value: Any = None,
data: Optional[pd.DataFrame] = None,
@ -206,7 +206,7 @@ class HostSummary(Notebooklet):
)
if azure_api:
host_entity.AzureDetails["ResourceDetails"] = azure_api[
"resoure_details"
"resource_details"
]
host_entity.AzureDetails["SubscriptionDetails"] = azure_api[
"sub_details"
@ -254,39 +254,49 @@ def _azure_api_details(az_cli, host_record):
)
# Get details of attached disks and network interfaces
disks = [
disk["name"]
for disk in resource_details["properties"]["storageProfile"]["dataDisks"]
disk.get("name")
for disk in resource_details.get("properties", {})
.get("storageProfile", {})
.get("dataDisks", {})
]
network_ints = [
net["id"]
for net in resource_details["properties"]["networkProfile"][
"networkInterfaces"
]
net.get("id")
for net in resource_details.get("properties", {})
.get("networkProfile", {})
.get("networkInterfaces")
]
image = (
str(
resource_details["properties"]["storageProfile"]
resource_details.get("properties", {})
.get("storageProfile", {})
.get("imageReference", {})
.get("offer", {})
)
+ " "
+ str(
resource_details["properties"]["storageProfile"]
resource_details.get("properties", {})
.get("storageProfile", {})
.get("imageReference", {})
.get("sku", {})
)
)
# Extract key details and add host_entity
resource_details = {
"Azure Location": resource_details["location"],
"VM Size": resource_details["properties"]["hardwareProfile"]["vmSize"],
"Azure Location": resource_details.get("location"),
"VM Size": (
resource_details.get("properties", {})
.get("hardwareProfile", {})
.get("vmSize")
),
"Image": image,
"Disks": disks,
"Admin User": resource_details["properties"]["osProfile"]["adminUsername"],
"Admin User": resource_details.get("properties", {})
.get("osProfile", {})
.get("adminUsername"),
"Network Interfaces": network_ints,
"Tags": str(resource_details["tags"]),
"Tags": str(resource_details.get("tags")),
}
return {"resoure_details": resource_details, "sub_details": sub_details}
return {"resource_details": resource_details, "sub_details": sub_details}
except CloudError:
return None

Просмотреть файл

@ -3,8 +3,9 @@ metadata:
description: Host summary
default_options:
- heartbeat: Query Heartbeat table for host information.
- azure_net: 'Query AzureNetworkAnalytics table for host
network topology information.'
- azure_net: '
Query AzureNetworkAnalytics table for host
network topology information.'
- alerts: Query any alerts for the host.
- bookmarks: Query any bookmarks for the host.
- azure_api: Query Azure API for VM information.

Просмотреть файл

@ -181,7 +181,7 @@ class IpAddressSummary(Notebooklet):
# pylint: disable=too-many-branches
@set_text(docs=_CELL_DOCS, key="run") # noqa: MC0001
def run(
def run( # noqa: MC0001
self,
value: Any = None,
data: Optional[pd.DataFrame] = None,

Просмотреть файл

@ -156,7 +156,7 @@ class NetworkFlowSummary(Notebooklet):
# pylint: disable=too-many-branches
@set_text(docs=_CELL_DOCS, key="run") # noqa: MC0001
def run(
def run( # noqa: MC0001
self,
value: Any = None,
data: Optional[pd.DataFrame] = None,

Просмотреть файл

@ -165,7 +165,7 @@ def _read_metadata_file(mod_path):
if not md_path.is_file():
md_path = Path(str(mod_path).replace(".py", ".yml"))
if md_path.is_file():
with open(md_path, "r") as _md_file:
with open(md_path, "r", encoding="utf-8") as _md_file:
return yaml.safe_load(_md_file)
return None

Просмотреть файл

@ -100,7 +100,7 @@ HostNameVerif = namedtuple("HostNameVerif", "host_name, host_type, host_names")
@lru_cache() # noqa:MC0001
def verify_host_name(
def verify_host_name( # noqa: MC0001
qry_prov: QueryProvider, host_name: str, timespan: TimeSpan = None, **kwargs
) -> HostNameVerif:
"""

Просмотреть файл

@ -455,7 +455,7 @@ class Notebooklet(ABC):
def import_cell(cls):
"""Import the text of this module into a new cell."""
if cls.module_path:
with open(cls.module_path, "r") as mod_file:
with open(cls.module_path, "r", encoding="utf-8") as mod_file:
mod_text = mod_file.read()
if mod_text:
# replace relative references with absolute paths
@ -553,7 +553,9 @@ class Notebooklet(ABC):
def get_methods(self) -> Dict[str, Callable[[Any], Any]]:
"""Return methods available for this class."""
meths = inspect.getmembers(self, inspect.ismethod)
cls_selector = f"bound method {self.__class__.__name__.rsplit('.')[0]}"
cls_selector = (
f"bound method {self.__class__.__name__.rsplit('.', maxsplit=1)[0]}"
)
return {
meth[0]: meth[1]
for meth in meths

Просмотреть файл

@ -9,14 +9,14 @@ import re
import setuptools
with open("requirements.txt", "r") as fh:
with open("requirements.txt", "r", encoding="utf-8") as fh:
INSTALL_REQUIRES = fh.readlines()
# pylint: disable=locally-disabled, invalid-name
with open("README.md", "r") as fh:
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
with open("msticnb/_version.py", "r") as fd:
with open("msticnb/_version.py", "r", encoding="utf-8") as fd:
v_match = re.search(r'^VERSION\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE)
__version__ = v_match.group(1) if v_match else "no version"
# pylint: enable=locally-disabled, invalid-name

Просмотреть файл

@ -5,7 +5,7 @@
# --------------------------------------------------------------------------
"""NB metadata test class."""
import pytest_check as check
from msticnb import data_providers
from msticnb import data_providers, nblts
from msticnb.nb_metadata import NBMetadata, read_mod_metadata
from msticnb.nb.azsent.host import host_summary
@ -33,6 +33,8 @@ def test_read_metadata():
def test_class_metadata(monkeypatch):
"""Test class correctly loads yaml metadata."""
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
if "azuredata" in nblts.azsent.host.HostSummary.metadata.req_providers:
nblts.azsent.host.HostSummary.metadata.req_providers.remove("azuredata")
data_providers.init(
query_provider="LocalData", providers=["tilookup", "geolitelookip"]
)

Просмотреть файл

@ -12,7 +12,7 @@ import pytest_check as check
from msticpy.datamodel import entities
from msticpy.datamodel.pivot import Pivot
from msticnb import data_providers
from msticnb import data_providers, nblts
from msticnb.nb_pivot import add_pivot_funcs
from msticnb.notebooklet import NotebookletResult
@ -42,6 +42,8 @@ _EXPECTED_FUNCS = [
def _init_pivot(monkeypatch):
test_data = str(Path(TEST_DATA_PATH).absolute())
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
if "azuredata" in nblts.azsent.host.HostSummary.metadata.req_providers:
nblts.azsent.host.HostSummary.metadata.req_providers.remove("azuredata")
data_providers.init(
query_provider="LocalData",
providers=["geolitelookup"],