* Fixes to URLSummary

* - Added .readthedocs.yaml to specify Python 3.9 build

- Added docs requirements.txt
- Updated make file to regenerate API docs each time make is run
- Corrected some doc strings in __init__.py and data_providers.py
- Adding regenerated API docs

* Restructured URLSummary

* HostSummaryFixes

* Fixing lots of test errors - many due to msticpy changes.

Fixed bokeh 3.x compatibility
Fixed a few random test errors due to moving of init_function
Added mocks for who_is responses to relevant unittests.

* Removing Bokeh from requirements.txt so that we depend entirely on msticpy

Correcting a couple of typos in url_summary
Suppressing some mypy issues due to Bokeh typing changes.

* pinning msticpy version and forcing bokeh < 3.0.0

* Update azure-pipelines.yml for Azure Pipelines

* Updated 2 test cases for failures (calling out to online services)

Fixing calls to ti_lookup - "observable" parameter changed to "ioc" in ip_summary.py, url_summary.py, ti_enrich.py
Removing alert summary code to separate function in host_summary.py

* Fixing pylint in azure-pipelines.yml

Add mock for missed call in test_ip_summary.py

* Missed a mock http call in test-test_ip_summary.py

* Adding mocked TI and Geoiplookup to test_network_flow_summary

---------

Co-authored-by: Pete Bryan <peter.bryan@microsoft.com>
This commit is contained in:
Ian Hellen 2023-03-16 11:50:03 -07:00 коммит произвёл GitHub
Родитель 097df22fff
Коммит b4a748241e
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
87 изменённых файлов: 1306 добавлений и 653 удалений

Просмотреть файл

@ -483,4 +483,4 @@ known-third-party=enchant
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception".
overgeneral-exceptions=Exception
overgeneral-exceptions=builtins.Exception

19
.readthedocs.yaml Normal file
Просмотреть файл

@ -0,0 +1,19 @@
version: 2
build:
os: ubuntu-22.04
tools:
python: "3.9"
sphinx:
configuration: docs/source/conf.py
builder: html
fail_on_warning: false
# If using Sphinx, optionally build your docs in additional formats such as PDF
# formats:
# Optionally declare the Python requirements required to build your docs
python:
install:
- requirements: docs/requirements.txt

Просмотреть файл

@ -59,8 +59,8 @@ stages:
continueOnError: true
condition: succeededOrFailed()
- script: |
pip install --upgrade pylint pylint_junit
pylint msticnb --disable=bad-continuation --output-format=pylint_junit.JUnitReporter
pip install --upgrade pylint==2.16.1 pylint_junit
pylint msticnb --disable=bad-continuation --disable=duplicate-code --disable=E1135,E1101,E1133 --output-format=pylint_junit.JUnitReporter
displayName: Pylint
continueOnError: true
condition: succeededOrFailed()

Просмотреть файл

@ -12,6 +12,10 @@ set BUILDDIR=build
if "%1" == "" goto help
REM Re-generate API module docs
sphinx-apidoc --o source --force --module-first --separate ../msticnb
DEL %SOURCE%/modules.rst
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.

17
docs/requirements.txt Normal file
Просмотреть файл

@ -0,0 +1,17 @@
attrs>=18.2.0
cryptography
deprecated>=1.2.4
docutils<0.20.0
httpx>=0.21
ipython >= 7.1.1
jinja2<3.2.0
numpy>=1.15.4
pandas>=1.1.5
python-dateutil>=2.8.1
pytz>=2019.2
pyyaml>=3.13
typing-extensions>=4.2.0
readthedocs-sphinx-ext==2.1.8
seed_intersphinx_mapping
sphinx-rtd-theme==1.0.0
sphinx==5.1.1

Просмотреть файл

@ -9,6 +9,14 @@
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config
# type: ignore
# pylint: disable=invalid-name, missing-module-docstring, import-error
# noqa D100
# flake8: noqa
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
@ -19,6 +27,7 @@
# import sys
# sys.path.insert(0, os.path.abspath('.'))
import os
import re
import sys
sys.path.insert(0, os.path.abspath("../.."))
@ -30,17 +39,21 @@ project = "msticnb"
copyright = "2020, (c) Microsoft Corporation."
author = "Ian Hellen, Pete Bryan"
# The short X.Y version
version = ""
# The full version, including alpha/beta/rc tags
release = "0.2.0"
with open(f"../../{project}/_version.py", "r", encoding="utf-8") as fd:
v_match = re.search(r'^VERSION\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE)
_ver = v_match[1] if v_match else "no version"
# The full version, including alpha/beta/rc tags
release = _ver
# The short X.Y version
version = _ver
# -- General configuration ---------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
needs_sphinx = "5.0"
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
@ -51,10 +64,14 @@ extensions = [
"sphinx.ext.coverage",
"sphinx.ext.githubpages",
"sphinx.ext.napoleon",
"sphinx.ext.intersphinx",
"sphinx.ext.autosectionlabel",
# "sphinx.ext.intersphinx",
# "seed_intersphinx_mapping",
]
intersphinx_mapping = {"msticpy": ("https://msticpy.readthedocs.io/en/latest", None)}
autosectionlabel_prefix_document = True
intersphinx_mapping = {"msticnb": ("https://msticnb.readthedocs.io/en/latest", None)}
# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]
@ -73,7 +90,7 @@ master_doc = "index"
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
language = "en"
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
@ -148,9 +165,9 @@ latex_elements: dict = {
latex_documents = [
(
master_doc,
"mstinb.tex",
"mstic Notebooklets Documentation",
"Ian Hellen",
"msticnb.tex",
"MSTIC Notebooklets Documentation",
author,
"manual",
)
]
@ -160,7 +177,7 @@ latex_documents = [
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [(master_doc, "msticnb", "mstic Notebooklets Documentation", [author], 1)]
man_pages = [(master_doc, project, "MSTIC Notebooklets Documentation", [author], 1)]
# -- Options for Texinfo output ----------------------------------------------
@ -171,10 +188,10 @@ man_pages = [(master_doc, "msticnb", "mstic Notebooklets Documentation", [author
texinfo_documents = [
(
master_doc,
"msticnb",
project,
"mstic Notebooklets Documentation",
author,
"msticnb",
project,
"MSTIC notebooklets Jupyter notebook snippets for InfoSec investigators.",
)
]
@ -206,3 +223,66 @@ autodoc_default_options = {
}
autoclass_content = "both"
autoapi_dirs = ["../../msticnb"]
autodoc_mock_imports = [
"adal",
"azure",
"azure.common.exceptions",
"azure.core.exceptions",
"azure.core.pipeline.policies",
"azure.core.pipeline.transport",
"azure.core.pipeline",
"azure.identity",
"azure.keyvault.secrets",
"azure.keyvault",
"azure.mgmt.compute.models",
"azure.mgmt.compute",
"azure.mgmt.keyvault.models",
"azure.mgmt.keyvault",
"azure.mgmt.monitor",
"azure.mgmt.network",
"azure.mgmt.resource",
"azure.mgmt.resourcegraph",
"azure.mgmt.subscription",
"azure.storage.blob",
"azure.storage",
"bokeh",
"dnspython",
"dns",
"folium",
"geoip2",
"ipwhois",
"IPython",
"ipywidgets",
"keyring",
"Kqlmagic",
"matplotlib.pyplot",
"matplotlib",
"mo-sql-parsing",
"msal",
"msal_extensions",
"msrest",
"msrestazure",
"msrest.authentication",
"nest_asyncio",
"networkx",
"openpyxl",
"passivetotal",
"pygeohash",
"pygments",
"python-dateutil",
"respx",
"scipy",
"seaborn",
"sklearn",
"splunk-sdk",
"splunklib",
"statsmodels",
"sumologic",
"tldextract",
"tqdm",
"vt_graph_api",
"vt",
]

Просмотреть файл

@ -65,11 +65,10 @@ API
---
.. toctree::
:maxdepth: 4
:maxdepth: 3
msticnb-api
msticnb
msticnb.nb
msticnb.nblib
Indices and tables
==================

7
docs/source/modules.rst Normal file
Просмотреть файл

@ -0,0 +1,7 @@
msticnb
=======
.. toctree::
:maxdepth: 4
msticnb

Просмотреть файл

@ -0,0 +1,9 @@
msticnb
=======
.. toctree::
:maxdepth: 2
msticnb
msticnb.nb.azsent
msticnb.nblib

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.class\_doc module
=========================
.. automodule:: msticnb.class_doc
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.common module
=====================
.. automodule:: msticnb.common
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.data\_providers module
==============================
.. automodule:: msticnb.data_providers
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.data\_viewers module
============================
.. automodule:: msticnb.data_viewers
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.account.account\_summary module
=================================================
.. automodule:: msticnb.nb.azsent.account.account_summary
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,16 +1,15 @@
Account notebooklets
====================
msticnb.nb.azsent.account package
=================================
.. autosummary::
msticnb.nb.azsent.account.account_summary
.. automodule:: msticnb.nb.azsent.account
:members:
:undoc-members:
:show-inheritance:
Submodules
----------
msticnb.nb.azsent.account.account\_summary module
-------------------------------------------------
.. toctree::
:maxdepth: 4
.. automodule:: msticnb.nb.azsent.account.account_summary
:members:
:undoc-members:
:show-inheritance:
msticnb.nb.azsent.account.account_summary

Просмотреть файл

@ -1,16 +1,15 @@
Alerts notebooklets
===================
msticnb.nb.azsent.alert package
===============================
.. autosummary::
msticnb.nb.azsent.alert.ti_enrich
.. automodule:: msticnb.nb.azsent.alert
:members:
:undoc-members:
:show-inheritance:
Submodules
----------
msticnb.nb.azsent.alert.ti\_enrich module
-----------------------------------------
.. toctree::
:maxdepth: 4
.. automodule:: msticnb.nb.azsent.alert.ti_enrich
:members:
:undoc-members:
:show-inheritance:
msticnb.nb.azsent.alert.ti_enrich

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.alert.ti\_enrich module
=========================================
.. automodule:: msticnb.nb.azsent.alert.ti_enrich
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.host.host\_logons\_summary module
===================================================
.. automodule:: msticnb.nb.azsent.host.host_logons_summary
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.host.host\_network\_summary module
====================================================
.. automodule:: msticnb.nb.azsent.host.host_network_summary
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.host.host\_summary module
===========================================
.. automodule:: msticnb.nb.azsent.host.host_summary
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.host.logon\_session\_rarity module
====================================================
.. automodule:: msticnb.nb.azsent.host.logon_session_rarity
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,44 +1,19 @@
Host notebooklets
=================
msticnb.nb.azsent.host package
==============================
.. autosummary::
msticnb.nb.azsent.host.host_logons_summary
msticnb.nb.azsent.host.host_network_summary
msticnb.nb.azsent.host.host_summary
msticnb.nb.azsent.host.win_host_events
.. automodule:: msticnb.nb.azsent.host
:members:
:undoc-members:
:show-inheritance:
Submodules
----------
msticnb.nb.azsent.host.host\_logons\_summary module
---------------------------------------------------
.. automodule:: msticnb.nb.azsent.host.host_logons_summary
:members:
:undoc-members:
:show-inheritance:
msticnb.nb.azsent.host.host\_summary module
-------------------------------------------
.. automodule:: msticnb.nb.azsent.host.host_summary
:members:
:undoc-members:
:show-inheritance:
msticnb.nb.azsent.host.host\_network_summary module
-------------------------------------------
.. automodule:: msticnb.nb.azsent.host.host_network_summary
:members:
:undoc-members:
:show-inheritance:
msticnb.nb.azsent.host.win\_host\_events module
-----------------------------------------------
.. automodule:: msticnb.nb.azsent.host.win_host_events
:members:
:undoc-members:
:show-inheritance:
.. toctree::
:maxdepth: 4
msticnb.nb.azsent.host.host_logons_summary
msticnb.nb.azsent.host.host_network_summary
msticnb.nb.azsent.host.host_summary
msticnb.nb.azsent.host.logon_session_rarity
msticnb.nb.azsent.host.win_host_events

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.host.win\_host\_events module
===============================================
.. automodule:: msticnb.nb.azsent.host.win_host_events
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.network.ip\_summary module
============================================
.. automodule:: msticnb.nb.azsent.network.ip_summary
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.network.network\_flow\_summary module
=======================================================
.. automodule:: msticnb.nb.azsent.network.network_flow_summary
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,25 +1,16 @@
Network notebooklets
====================
msticnb.nb.azsent.network package
=================================
.. autosummary::
msticnb.nb.azsent.network.network_flow_summary
msticnb.nb.azsent.network.ip_summary
.. automodule:: msticnb.nb.azsent.network
:members:
:undoc-members:
:show-inheritance:
Submodules
----------
msticnb.nb.azsent.network.network\_flow\_summary module
-------------------------------------------------------
.. toctree::
:maxdepth: 4
.. automodule:: msticnb.nb.azsent.network.network_flow_summary
:members:
:undoc-members:
:show-inheritance:
msticnb.nb.azsent.network.ip\_summary module
-------------------------------------------------------
.. automodule:: msticnb.nb.azsent.network.ip_summary
:members:
:undoc-members:
:show-inheritance:
msticnb.nb.azsent.network.ip_summary
msticnb.nb.azsent.network.network_flow_summary

Просмотреть файл

@ -1,12 +1,16 @@
Azure Sentinel Notebooklets
===========================
msticnb.nb.azsent package
=========================
.. automodule:: msticnb.nb.azsent
:members:
:undoc-members:
:show-inheritance:
Subpackages
-----------
.. toctree::
:maxdepth: 4
:caption: Categories
msticnb.nb.azsent.account
msticnb.nb.azsent.alert

Просмотреть файл

@ -1,17 +1,15 @@
URL notebooklets
=================
msticnb.nb.azsent.url package
=============================
.. autosummary::
msticnb.nb.azsent.url.UrlSummary
Submodules
----------
msticnb.nb.azsent.url.url\_summary module
---------------------------------------------------
.. automodule:: msticnb.nb.azsent.url.url_summary
.. automodule:: msticnb.nb.azsent.url
:members:
:undoc-members:
:show-inheritance:
Submodules
----------
.. toctree::
:maxdepth: 4
msticnb.nb.azsent.url.url_summary

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.azsent.url.url\_summary module
=========================================
.. automodule:: msticnb.nb.azsent.url.url_summary
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,12 +1,16 @@
Notebooklets source documentation
=================================
msticnb.nb package
==================
Categories
----------
.. automodule:: msticnb.nb
:members:
:undoc-members:
:show-inheritance:
Subpackages
-----------
.. toctree::
:maxdepth: 4
msticnb.nb.azsent
msticnb.nb.template

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb.template.nb\_template module
=======================================
.. automodule:: msticnb.nb.template.nb_template
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,14 +1,15 @@
Template notebooklet
====================
msticnb.nb.template package
===========================
Submodules
----------
msticnb.nb.template.nb\_template module
---------------------------------------
.. automodule:: msticnb.nb.template.nb_template
.. automodule:: msticnb.nb.template
:members:
:undoc-members:
:show-inheritance:
Submodules
----------
.. toctree::
:maxdepth: 4
msticnb.nb.template.nb_template

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb\_browser module
==========================
.. automodule:: msticnb.nb_browser
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb\_metadata module
===========================
.. automodule:: msticnb.nb_metadata
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nb\_pivot module
========================
.. automodule:: msticnb.nb_pivot
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nblib.azsent.alert module
=================================
.. automodule:: msticnb.nblib.azsent.alert
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nblib.azsent.host module
================================
.. automodule:: msticnb.nblib.azsent.host
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,21 +1,16 @@
Azure Sentinel library modules
==============================
msticnb.nblib.azsent package
============================
.. automodule:: msticnb.nblib.azsent
:members:
:undoc-members:
:show-inheritance:
Submodules
----------
msticnb.nblib.azsent.host module
--------------------------------
.. toctree::
:maxdepth: 4
.. automodule:: msticnb.nblib.azsent.host
:members:
:undoc-members:
:show-inheritance:
msticnb.nblib.azsent.alert module
---------------------------------
.. automodule:: msticnb.nblib.azsent.alert
:members:
:undoc-members:
:show-inheritance:
msticnb.nblib.azsent.alert
msticnb.nblib.azsent.host

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nblib.entity\_tools module
==================================
.. automodule:: msticnb.nblib.entity_tools
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nblib.iptools module
============================
.. automodule:: msticnb.nblib.iptools
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,7 +1,12 @@
Notebook Common Library modules
===============================
msticnb.nblib package
=====================
Categories
.. automodule:: msticnb.nblib
:members:
:undoc-members:
:show-inheritance:
Subpackages
-----------
.. toctree::
@ -9,10 +14,12 @@ Categories
msticnb.nblib.azsent
msticnb.nblib.iptools module
----------------------------
Submodules
----------
.. automodule:: msticnb.nblib.iptools
:members:
:undoc-members:
:show-inheritance:
.. toctree::
:maxdepth: 4
msticnb.nblib.entity_tools
msticnb.nblib.iptools
msticnb.nblib.ti

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.nblib.ti module
=======================
.. automodule:: msticnb.nblib.ti
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.notebooklet module
==========================
.. automodule:: msticnb.notebooklet
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.notebooklet\_result module
==================================
.. automodule:: msticnb.notebooklet_result
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.options module
======================
.. automodule:: msticnb.options
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.read\_modules module
============================
.. automodule:: msticnb.read_modules
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -1,105 +1,35 @@
Core modules and classes
========================
msticnb package
===============
.. automodule:: msticnb
:members:
:undoc-members:
:show-inheritance:
Subpackages
-----------
.. toctree::
:maxdepth: 4
msticnb.nb
msticnb.nblib
Submodules
----------
.. autosummary::
.. toctree::
:maxdepth: 4
msticnb.class_doc
msticnb.common
msticnb.data_providers
msticnb.data_viewers
msticnb.nb_browser
msticnb.nb_metadata
msticnb.nb_pivot
msticnb.notebooklet
msticnb.notebooklet_result
msticnb.options
msticnb.read_modules
msticnb.class\_doc module
-------------------------
.. automodule:: msticnb.class_doc
:members:
:undoc-members:
:show-inheritance:
msticnb.common module
---------------------
.. automodule:: msticnb.common
:members:
:undoc-members:
:show-inheritance:
msticnb.data\_providers module
------------------------------
.. automodule:: msticnb.data_providers
:members:
:undoc-members:
:show-inheritance:
msticnb.nb\_browser module
--------------------------
.. automodule:: msticnb.nb_browser
:members:
:undoc-members:
:show-inheritance:
msticnb.nb\_metadata module
---------------------------
.. automodule:: msticnb.nb_metadata
:members:
:undoc-members:
:show-inheritance:
msticnb.notebooklet module
--------------------------
.. automodule:: msticnb.notebooklet
:members:
:undoc-members:
:show-inheritance:
msticnb.notebooklet\_result module
----------------------------------
.. automodule:: msticnb.notebooklet_result
:members:
:undoc-members:
:show-inheritance:
msticnb.options module
----------------------
.. automodule:: msticnb.options
:members:
:undoc-members:
:show-inheritance:
msticnb.read\_modules module
----------------------------
.. automodule:: msticnb.read_modules
:members:
:undoc-members:
:show-inheritance:
msticnb.template module
-----------------------
.. automodule:: msticnb.template
:members:
:undoc-members:
:show-inheritance:
msticnb.data\viewers module
---------------------------
.. automodule:: msticnb.data_viewers
:members:
:undoc-members:
:show-inheritance:
msticnb.template

Просмотреть файл

@ -0,0 +1,7 @@
msticnb.template module
=======================
.. automodule:: msticnb.template
:members:
:undoc-members:
:show-inheritance:

Просмотреть файл

@ -7,39 +7,41 @@
msticnb Notebooklets main package.
To start using notebooklets:
>>> import msticnb as nb
>>> # optionally give a query provider nb.init(query_provider=qry_prov)
>>> nb.init()
>>>
>>> # Auto-complete tree of notebooklets
>>> nb.nblts
>>>
>>> # List notebooklets
>>> nb.nb_index
>>>
>>> # Use a notebooklet
>>> host_summary = nb.nblts.azent.host.HostSummary()
>>> host_summary.run();
>>>
>>> # help
>>> help(host_summary)
>>> print("Options:", host_summary.all_options())
>>> print("Settings:", host_summary.get_settings())
>>>
>>> # find a notebooklet
>>> nb.find("host linux azure")
>>>
>>> # Interactive notebook browser
>>> nb.browse()
.. code:: python
>>> import msticnb as nb
>>> # optionally give a query provider nb.init(query_provider=qry_prov)
>>> nb.init()
>>>
>>> # Auto-complete tree of notebooklets
>>> nb.nblts
>>>
>>> # List notebooklets
>>> nb.nb_index
>>>
>>> # Use a notebooklet
>>> host_summary = nb.nblts.azent.host.HostSummary()
>>> host_summary.run();
>>>
>>> # help
>>> help(host_summary)
>>> print("Options:", host_summary.all_options())
>>> print("Settings:", host_summary.get_settings())
>>>
>>> # find a notebooklet
>>> nb.find("host linux azure")
>>>
>>> # Interactive notebook browser
>>> nb.browse()
for more help see https://msticnb.readthedocs.org/
"""
import sys
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from ._version import VERSION
from .data_providers import DataProviders # noqa:F401
from .data_providers import DataProviders, QueryProvider # noqa:F401
from .data_providers import init as dp_init # noqa:F401
from .nb_browser import NBBrowser # noqa:F401
from .nb_pivot import add_pivot_funcs # noqa:F401
@ -54,18 +56,20 @@ browse = NBBrowser
def init(
query_provider: str,
query_provider: Union[str, QueryProvider] = "MSSentinel",
namespace: Optional[Dict[str, Any]] = None,
providers: Optional[List[str]] = None,
**kwargs,
):
"""
Initialize notebooklets dataproviders and pivots.
Initialize notebooklets data providers and pivots.
Parameters
----------
query_provider : str
The default query provider to use with notebooklets
query_provider : Union[str, QueryProvider], optional
DataEnvironment name of the primary query provider,
or an instance of an existing query provider,
by default "MSSentinel"
namespace : Optional[Dict[str, Any]], optional
The global namespace - used to add pivot functions
providers : Optional[List[str]], optional

Просмотреть файл

@ -1,2 +1,2 @@
"""Version file."""
VERSION = "1.0.1"
VERSION = "1.1.0"

Просмотреть файл

@ -62,7 +62,7 @@ def _get_main_class_doc_md(doc_cls) -> str:
cls_doc_lines.extend(fmt_doc_lines)
cls_doc_lines.append("\n---\n")
cls_doc_lines.append("# Display Sections")
cls_doc_lines.append("## Display Sections")
for _, func in inspect.getmembers(doc_cls, inspect.isfunction):
cls_doc_lines.extend(_get_closure_vars(func, doc_cls))
@ -70,7 +70,7 @@ def _get_main_class_doc_md(doc_cls) -> str:
cls_doc_lines.extend(_get_closure_vars(func, doc_cls))
cls_doc_lines.append("\n---\n")
cls_doc_lines.append("# Results Class\n")
cls_doc_lines.append("## Results Class\n")
for cls_name, cls in inspect.getmembers(
inspect.getmodule(doc_cls), inspect.isclass
):
@ -79,10 +79,10 @@ def _get_main_class_doc_md(doc_cls) -> str:
cls_doc_lines.append(_get_result_doc(cls))
break
cls_doc_lines.append("\n---\n")
cls_doc_lines.append("# Methods")
cls_doc_lines.append("## Instance Methods")
cls_doc_lines.append("## Methods")
cls_doc_lines.append("### Instance Methods")
cls_doc_lines.append(_get_class_methods_doc(doc_cls))
cls_doc_lines.append("## Other Methods")
cls_doc_lines.append("### Other Methods")
cls_doc_lines.append(_get_class_func_doc(doc_cls))
return "\n".join(cls_doc_lines)
@ -188,7 +188,7 @@ def _get_class_func_doc(doc_cls: type) -> str:
def _format_func_doc(func_name, func, full_doc=False, prop_set=None):
"""Format function signature."""
func_disp_name = func_name.replace("_", "\\_")
doc_lines = [f"### {func_disp_name}\n"]
doc_lines = [f"#### {func_disp_name}\n"]
if prop_set and func_name in prop_set:
doc_lines.append(f"{func_disp_name} [property]")
else:

Просмотреть файл

@ -8,6 +8,7 @@ import functools
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import bokeh.io
import pandas as pd
from IPython import get_ipython
from IPython.display import HTML, display
from markdown import markdown
@ -133,8 +134,8 @@ def set_text( # noqa: MC0001
hd_level: int = 2,
text: Optional[str] = None,
md: bool = False,
docs: Dict[str, Any] = None,
key: str = None,
docs: Optional[Dict[str, Any]] = None,
key: Optional[str] = None,
):
"""
Decorate function to print title/text before execution.
@ -213,7 +214,7 @@ def add_result(result: Any, attr_name: Union[str, List[str]]):
result : Any
Object that will have result attributes set.
attr_name: str or List[str]
Name of return attribute to set on `result`
Name of return attribute to set on `result`.
Returns
-------
@ -289,3 +290,34 @@ def mp_version():
def check_mp_version(required_version: str) -> bool:
"""Return true if the installed version is >= `required_version`."""
return mp_version().major >= parse_version(required_version).major
def check_current_result(
result, attrib: Optional[str] = None, silent: bool = False
) -> bool:
"""
Check that the result is valid and `attrib` contains data.
Parameters
----------
result: NotebookletResult
The result data to check in.
attrib : str
        Name of the attribute to check; if None this function
        returns True without checking any data.
silent : bool
If True, suppress output.
Returns
-------
bool
Returns True if valid data is available, else False.
"""
if not attrib:
return True
data_obj = getattr(result, attrib)
if data_obj is None or isinstance(data_obj, pd.DataFrame) and data_obj.empty:
if not silent:
nb_markdown(f"No data is available for {attrib}.")
return False
return True

Просмотреть файл

@ -95,7 +95,7 @@ class DataProviders:
def __init__(
self,
query_provider: Union[str, QueryProvider] = "AzureSentinel",
query_provider: Union[str, QueryProvider] = "MSSentinel",
providers: Optional[List[str]] = None,
**kwargs,
):
@ -107,7 +107,7 @@ class DataProviders:
query_provider : Union[str, QueryProvider], optional
DataEnvironment name of the primary query provider,
or an instance of an existing query provider,
by default "AzureSentinel"
by default "MSSentinel"
providers : Optional[List[str]], optional
A list of provider names to load.
You can add additional query providers by including them
@ -432,7 +432,7 @@ class DataProviders:
if not ws_config.config_loaded:
raise MsticnbDataProviderError(
"Could not find valid Azure Sentinel configuration.",
"Could not find valid MS Sentinel configuration.",
"Please ensure configuration files are set correctly or supply",
"azure_sentinel.workspace_id and azure_sentinel.tenant_id",
"arguments to this class.",
@ -441,7 +441,7 @@ class DataProviders:
def init(
query_provider: str = "AzureSentinel",
query_provider: Union[str, QueryProvider] = "MSSentinel",
providers: Optional[List[str]] = None,
**kwargs,
):
@ -450,10 +450,11 @@ def init(
Parameters
----------
query_provider : str, optional
DataEnvironment name of the primary query provider.
By default, "AzureSentinel".
You can add addtional query providers by including them
query_provider : Union[str, QueryProvider], optional
DataEnvironment name of the primary query provider,
or an instance of an existing query provider,
by default "MSSentinel"
You can add additional query providers by including them
in the `providers` list.
providers : Optional[List[str]], optional
A list of provider names, by default None

Просмотреть файл

@ -27,7 +27,7 @@ class DFViewer:
def view_events(
self,
summary_cols: List[str] = None,
summary_cols: Optional[List[str]] = None,
attrib: Optional[str] = None,
data: Optional[pd.DataFrame] = None,
**kwargs,

Просмотреть файл

@ -55,11 +55,8 @@ class AccountType(Flag):
Office365 = auto()
Windows = auto()
Linux = auto()
Azure = (
AzureActiveDirectory
| AzureActivity
| Office365 # pylint:disable=unsupported-binary-operation
)
# pylint:disable=unsupported-binary-operation
Azure = AzureActiveDirectory | AzureActivity | Office365
All = Azure | Windows | Linux # pylint:disable=unsupported-binary-operation
def in_list(self, acct_types: Iterable[Union["AccountType", str]]):
@ -79,6 +76,7 @@ class AccountType(Flag):
# pylint: enable=invalid-name
# pylint: disable=too-few-public-methods, too-many-instance-attributes
class AccountSummaryResult(NotebookletResult):
"""
@ -520,6 +518,7 @@ class AccountSummary(Notebooklet):
# pylint: disable=no-member
# %%
# Account Query functions
def _df_clean(dataframe):
@ -725,7 +724,6 @@ def _create_display_callback(
def _create_account_entity(
account_name, acct_type, acct_activity_dfs, geoip
) -> entities.Account:
if acct_type == AccountType.Windows:
acct_activity_df = acct_activity_dfs[AccountType.Windows]
return _create_win_account_entity(account_name, acct_activity_df, geoip)
@ -944,7 +942,7 @@ def _get_related_alerts_summary(related_alerts: pd.DataFrame):
]
total_alerts = 0
for (name, count) in alert_items.items():
for name, count in alert_items.items():
output.append(f"- {name}, # Alerts: {count}")
total_alerts += count

Просмотреть файл

@ -317,13 +317,9 @@ def _lookup(row, ti_prov, secondary: bool = False):
for entity in row["Entities"]:
try:
if entity["Type"] in ("ipaddress", "ip"):
resp = ti_prov.lookup_ioc(
observable=entity["Address"], prov_scope=prov_scope
)
resp = ti_prov.lookup_ioc(entity["Address"], prov_scope=prov_scope)
elif entity["Type"] == "url":
resp = ti_prov.lookup_ioc(
observable=entity["Url"], prov_scope=prov_scope
)
resp = ti_prov.lookup_ioc(entity["Url"], prov_scope=prov_scope)
else:
resp = None
if resp:

Просмотреть файл

@ -26,12 +26,7 @@ except ImportError:
from msticpy.nbtools.foliummap import FoliumMap
from ...._version import VERSION
from ....common import (
MsticnbMissingParameterError,
nb_data_wait,
nb_print,
set_text,
)
from ....common import MsticnbMissingParameterError, nb_data_wait, nb_print, set_text
from ....nb_metadata import read_mod_metadata
from ....nblib.azsent.host import verify_host_name
from ....notebooklet import NBMetadata, Notebooklet, NotebookletResult
@ -266,10 +261,10 @@ def _gen_timeline(data: pd.DataFrame, silent: bool):
@set_text(docs=_CELL_DOCS, key="show_map")
def _map_logons(data: pd.DataFrame, silent: bool) -> FoliumMap:
"""Produce a map of source IP logon locations."""
map_data = data[data["IpAddress"].isin(["-", "::1", "", "NaN"]) == False] # noqa: E712
map_data = data[~(data["IpAddress"].isin(["-", "::1", "", "NaN"]))]
if not isinstance(map_data, pd.DataFrame) or map_data.empty:
if not silent:
md("No plotable logins avaliable")
md("No plottable logins available")
return None
if not silent:
display(
@ -285,7 +280,7 @@ def _map_logons(data: pd.DataFrame, silent: bool) -> FoliumMap:
@set_text(docs=_CELL_DOCS, key="show_pie")
def _users_pie(data: pd.DataFrame, silent: bool) -> figure:
"""Produce pie chart based on observence of user names in data."""
"""Produce pie chart based on observance of user names in data."""
output_notebook()
user_logons = (
data["Account"]
@ -301,7 +296,7 @@ def _users_pie(data: pd.DataFrame, silent: bool) -> figure:
user_logons["color"] = viridis(len(user_logons))
viz = figure(
plot_height=350,
height=350,
title="20 most prevelent users",
toolbar_location=None,
tools="hover",
@ -369,7 +364,7 @@ def _process_stack_bar(data: pd.DataFrame, silent: bool) -> figure:
viz = figure(
x_range=processes,
plot_height=350,
height=350,
title="Logon Result % by Logon Type",
toolbar_location=None,
tools="hover",
@ -386,11 +381,11 @@ def _process_stack_bar(data: pd.DataFrame, silent: bool) -> figure:
)
viz.y_range.start = 0
viz.x_range.range_padding = 0.1
viz.xgrid.grid_line_color = None
viz.x_range.range_padding = 0.1 # type: ignore[attr-defined]
viz.xgrid.grid_line_color = None # type: ignore[attr-defined]
viz.axis.minor_tick_line_color = None
viz.yaxis.axis_label = "% of logons"
viz.xaxis.axis_label = "Process name"
viz.xaxis.axis_label = "Process name" # type: ignore[assignment]
viz.outline_line_color = None
viz.legend.location = "top_left"
viz.legend.orientation = "horizontal"

Просмотреть файл

@ -12,7 +12,7 @@ from bokeh.models import LayoutDOM
from IPython.display import display
try:
from msticpy.analysis.ip_utils import get_whois_info
from msticpy.context.ip_utils import ip_whois as get_whois_info
from msticpy.vis.foliummap import FoliumMap
except ImportError:
# Fall back to msticpy locations prior to v2.0.0
@ -27,6 +27,7 @@ from ....common import (
MsticnbDataProviderError,
MsticnbMissingParameterError,
nb_data_wait,
nb_markdown,
set_text,
)
from ....nb_metadata import read_mod_metadata, update_class_doc
@ -183,11 +184,6 @@ class HostNetworkSummary(Notebooklet):
)
if isinstance(ti_results, pd.DataFrame) and not ti_results.empty:
result.flow_ti = ti_results_merged
if not self.silent:
md("TI results found in Network Traffic:")
display(ti_results_merged)
else:
md("No results found in TI")
if (
"map" in self.options
@ -195,16 +191,52 @@ class HostNetworkSummary(Notebooklet):
and not result.flows.empty
):
result.flow_map = result.flows.mp_plot.folium_map(ip_column=remote_ip_col)
if not self.silent:
md("Map of remote network locations connected to", "bold")
display(result.flow_map)
if "whois" in self.options:
result.flow_whois = _get_whois_data(result.flows, col=remote_ip_col)
self._last_result = result
if not self.silent:
self._display_results()
return self._last_result
@set_text(docs=_CELL_DOCS, key="flows")
def _display_flows(self):
if self.check_valid_result_data("flow_whois", silent=True):
display(self._last_result.flow_whois)
elif self.check_valid_result_data("flows", silent=True):
display(self._last_result.flows)
else:
nb_markdown("No network flow data found.")
@set_text(docs=_CELL_DOCS, key="ti")
def _display_ti_results(self):
if self.check_valid_result_data("flow_ti", silent=True):
display(self._last_result.flow_ti)
else:
nb_markdown("No Threat Intelligence results found.")
@set_text(docs=_CELL_DOCS, key="map")
def _display_map(self):
if (
self.check_valid_result_data("flows", silent=True)
and self._last_result.flow_map
):
display(self._last_result.flow_map)
@set_text(docs=_CELL_DOCS, key="matrix")
def _display_matrix(self):
if self._last_result.flow_matrix:
display(self._last_result.flow_matrix)
def _display_results(self):
self._display_flows()
self._display_ti_results()
self._display_map()
self._display_matrix()
@lru_cache()
def _get_host_flows(host_name, ip_addr, qry_prov, timespan) -> pd.DataFrame:

Просмотреть файл

@ -22,3 +22,19 @@ output:
This shows a summary of network events for a host.
Depending on what data is available it will use MDE network events, Common Security Logs, or Azure Network Diagnostic Logs.
flows:
title: Host Network Flows
text:
This data shows network flows to and from the host.
ti:
title: Threat Intelligence in Flow Data
text:
These are threat intelligence results based on the flow data for the host.
map:
title: Remote Network Connection Locations
text:
This map shows the locations of remote network connections to and from the host.
matrix:
title: Network Flow Matrix
text:
This plot shows the relationship between the various IP addresses seen in network flow data.

Просмотреть файл

@ -5,12 +5,11 @@
# --------------------------------------------------------------------------
"""Notebooklet for Host Summary."""
from functools import lru_cache
from typing import Any, Dict, Iterable, Optional, Union
from typing import Any, Dict, Iterable, Optional
import pandas as pd
from azure.common.exceptions import CloudError
from bokeh.models import LayoutDOM
from bokeh.plotting.figure import Figure
from IPython.display import display
try:
from msticpy import nbwidgets
@ -21,7 +20,6 @@ except ImportError:
from msticpy.nbtools.nbdisplay import display_timeline
from msticpy.common.timespan import TimeSpan
from msticpy.common.utility import md
from msticpy.datamodel import entities
from ...._version import VERSION
@ -63,8 +61,6 @@ class HostSummaryResult(NotebookletResult):
related_alerts : pd.DataFrame
Pandas DataFrame of any alerts recorded for the host
within the query time span.
alert_timeline:
Bokeh time plot of alerts recorded for host.
related_bookmarks: pd.DataFrame
Pandas DataFrame of any investigation bookmarks
relating to the host.
@ -96,7 +92,6 @@ class HostSummaryResult(NotebookletResult):
super().__init__(description, timespan, notebooklet)
self.host_entity: entities.Host = None
self.related_alerts: Optional[pd.DataFrame] = None
self.alert_timeline: Union[LayoutDOM, Figure] = None
self.related_bookmarks: Optional[pd.DataFrame] = None
self.summary: Optional[pd.DataFrame] = None
self.scheduled_tasks: Optional[pd.DataFrame] = None
@ -129,7 +124,6 @@ class HostSummary(Notebooklet):
super().__init__(*args, **kwargs)
# pylint: disable=too-many-branches, too-many-statements
@set_text(docs=_CELL_DOCS, key="run") # noqa: MC0001
def run( # noqa:MC0001
self,
value: Any = None,
@ -195,7 +189,7 @@ class HostSummary(Notebooklet):
host_verif = verify_host_name(self.query_provider, value, self.timespan)
if host_verif.host_names:
md(f"Could not obtain unique host name from {value}. Aborting.")
nb_markdown(f"Could not obtain unique host name from {value}. Aborting.")
self._last_result = result
return self._last_result
if not host_verif.host_name:
@ -204,17 +198,17 @@ class HostSummary(Notebooklet):
+ "Results may be unreliable.",
"orange",
)
host_name = value
self.host_name = value
else:
host_name = host_verif.host_name
self.host_name = host_verif.host_name
host_entity = entities.Host(HostName=host_name)
host_entity = entities.Host(HostName=self.host_name)
if "heartbeat" in self.options:
host_entity = get_heartbeat(self.query_provider, host_name)
host_entity = get_heartbeat(self.query_provider, self.host_name)
if "azure_net" in self.options:
host_entity = host_entity or entities.Host(HostName=host_name)
host_entity = host_entity or entities.Host(HostName=self.host_name)
get_aznet_topology(
self.query_provider, host_entity=host_entity, host_name=host_name
self.query_provider, host_entity=host_entity, host_name=self.host_name
)
# If azure_details flag is set, an encrichment provider is given,
# and the resource is an Azure host get resource details from Azure API
@ -237,14 +231,10 @@ class HostSummary(Notebooklet):
result.host_entity = host_entity
if not self.silent:
_show_host_entity(host_entity)
if "alerts" in self.options:
related_alerts = _get_related_alerts(
self.query_provider, self.timespan, host_name
self.query_provider, self.timespan, self.host_name
)
if len(related_alerts) > 0:
result.alert_timeline = _show_alert_timeline(related_alerts)
result.related_alerts = related_alerts
if "bookmarks" in self.options:
@ -308,22 +298,111 @@ class HostSummary(Notebooklet):
)
self._last_result = result
if not self.silent:
self._display_output()
return self._last_result
@set_text(docs=_CELL_DOCS, key="show_host_entity")
def _display_entity(self):
"""Display the host_entity output."""
if self.check_valid_result_data("host_entity", silent=True):
nb_print(self._last_result.host_entity)
@set_text(docs=_CELL_DOCS, key="run")
def _display_summary(self):
"""Display the summary output."""
if self.check_valid_result_data("summary", silent=True):
display(self._last_result.summary)
@set_text(docs=_CELL_DOCS, key="show_bookmarks")
def _display_bookmarks(self):
"""Display the bookmarks related to the host."""
if self.check_valid_result_data("related_bookmarks", silent=True):
display(self._last_result.related_bookmarks)
else:
nb_markdown(f"No Bookmarks related to {self.host_name}")
@set_text(docs=_CELL_DOCS, key="show_scheduled_tasks")
def _display_scheduled_tasks(self):
"""Display the scheduled_tasks related to the host."""
if self.check_valid_result_data("scheduled_tasks", silent=True):
display(self._last_result.scheduled_tasks)
else:
nb_markdown(f"No scheduled tasks related to {self.host_name}")
@set_text(docs=_CELL_DOCS, key="show_account_actions")
def _display_account_actions(self):
"""Display the account_actions related to the host."""
if self.check_valid_result_data("account_actions", silent=True):
display(self._last_result.account_actions)
else:
nb_markdown(f"No account actions related to {self.host_name}")
@set_text(docs=_CELL_DOCS, key="show_notable_events")
def _display_notable_events(self):
"""Display the notable_events related to the host."""
if self.check_valid_result_data("notable_events", silent=True):
display(self._last_result.notable_events)
else:
nb_markdown(f"No notable events related to {self.host_name}")
@set_text(docs=_CELL_DOCS, key="show_processes")
def _display_processes(self):
"""Display the processes related to the host."""
if self.check_valid_result_data("processes", silent=True):
nb_print(self._last_result.processes)
else:
nb_markdown(f"No processes related to {self.host_name}")
@set_text(docs=_CELL_DOCS, key="show_process_ti")
def _display_process_ti(self):
"""Display the processes related to the host."""
if self.check_valid_result_data("process_ti", silent=True):
nb_print(self._last_result.process_ti)
else:
nb_markdown(f"No TI found in process data related to {self.host_name}")
def _display_output(self):
"""Display all notebooklet sections."""
self._display_entity()
self._display_summary()
self._display_bookmarks()
self._display_scheduled_tasks()
self._display_account_actions()
self._display_notable_events()
self._display_process_ti()
def display_process_tree(self):
"""Display a process tree from process data."""
if self.check_valid_result_data("processes", silent=True):
self._last_result.processes.mp_plot.process_tree()
def browse_alerts(self) -> nbwidgets.SelectAlert:
"""Return alert browser/viewer."""
if self.check_valid_result_data("related_alerts"):
if self.check_valid_result_data("related_alerts", silent=True):
return browse_alerts(self._last_result)
return None
def display_alert_timeline(self):
"""Display the alert timeline."""
if self.check_valid_result_data("related_alerts"):
if self.check_valid_result_data("related_alerts", silent=True):
if len(self._last_result.related_alerts) > 1:
return _show_alert_timeline(self._last_result.related_alerts)
print("Cannot plot timeline with 0 or 1 event.")
return None
def display_alert_summary(self):
"""Display summarized view of alerts grouped by AlertName."""
if self.check_valid_result_data("related_alerts", silent=True):
return (
self.related_alerts[["AlertName", "TimeGenerated"]]
.groupby("AlertName")
.TimeGenerated.agg("count")
)
return None
def _process_ti(data, col, ti_prov) -> Optional[pd.DataFrame]:
extracted_iocs = extract_iocs(data, col, True)
@ -477,33 +556,11 @@ def _azure_api_details(az_cli, host_record):
return None
# %%
# Get IP Information from Heartbeat
@set_text(docs=_CELL_DOCS, key="show_host_entity")
def _show_host_entity(host_entity):
nb_print(host_entity)
# %%
# Get related alerts
@lru_cache()
def _get_related_alerts(qry_prov, timespan, host_name):
related_alerts = qry_prov.SecurityAlert.list_related_alerts(
timespan, host_name=host_name
)
if not related_alerts.empty:
host_alert_items = (
related_alerts[["AlertName", "TimeGenerated"]]
.groupby("AlertName")
.TimeGenerated.agg("count")
)
nb_markdown(
f"Found {len(related_alerts)} related alerts ({len(host_alert_items)}) types"
)
else:
nb_markdown("No related alerts found.")
return related_alerts
return qry_prov.SecurityAlert.list_related_alerts(timespan, host_name=host_name)
@set_text(docs=_CELL_DOCS, key="show_alert_timeline")
@ -525,12 +582,6 @@ def _show_alert_timeline(related_alerts):
@lru_cache()
def _get_related_bookmarks(qry_prov, timespan, host_name):
nb_data_wait("Bookmarks")
host_bkmks = qry_prov.AzureSentinel.list_bookmarks_for_entity(
return qry_prov.AzureSentinel.list_bookmarks_for_entity(
timespan, entity_id=f"'{host_name}'"
)
if not host_bkmks.empty:
nb_markdown(f"{len(host_bkmks)} investigation bookmarks found for this host.")
else:
nb_markdown("No bookmarks found.")
return host_bkmks

Просмотреть файл

@ -54,5 +54,27 @@ output:
text:
Each marker on the timeline indicates one or more alerts related to the
host.
show_bookmarks:
title: Related Bookmarks
text:
Bookmarks related to the host.
show_scheduled_tasks:
title: Scheduled Tasks
text:
These are the scheduled tasks observed being created and run on the host.
show_account_actions:
title: Account Actions
text:
These are actions observed on the host where accounts have been created or modified.
show_notable_events:
title: Notable Host Events
text:
These are notable events on the host, either in the type of event or severity.
show_process_ti:
title: Process Threat Intelligence
text:
This shows Threat Intelligence results from indicators present in Command Line activity on the host.
show_processes:
title: Host Processes
text:
Process execution events from the host.

Просмотреть файл

@ -11,7 +11,6 @@ from typing import Any, Dict, Iterable, Optional, Union
import numpy as np
import pandas as pd
from bokeh.models import LayoutDOM
from bokeh.plotting.figure import Figure
from defusedxml import ElementTree
from defusedxml.ElementTree import ParseError
from IPython.display import display
@ -61,7 +60,7 @@ class WinHostEventsResult(NotebookletResult):
acct_pivot : pd.DataFrame
DataFrame that is a pivot table of event ID
vs. Account of account management events
account_timeline : Union[Figure, LayoutDOM]
account_timeline : Optional[LayoutDOM]
Bokeh plot figure or Layout showing the account events on an
interactive timeline.
expanded_events : pd.DataFrame
@ -95,7 +94,7 @@ class WinHostEventsResult(NotebookletResult):
self.event_pivot: Optional[pd.DataFrame] = None
self.account_events: Optional[pd.DataFrame] = None
self.account_pivot: Optional[pd.DataFrame] = None
self.account_timeline: Union[Figure, LayoutDOM] = None
self.account_timeline: Optional[LayoutDOM] = None
self.expanded_events: Optional[pd.DataFrame] = None

Просмотреть файл

@ -12,14 +12,15 @@ from typing import Any, Dict, Iterable, List, Optional, Union
import numpy as np
import pandas as pd
from bokeh.plotting.figure import Figure
from bokeh.models import LayoutDOM
from msticpy.common.exceptions import MsticpyException
from msticpy.common.timespan import TimeSpan
from msticpy.datamodel.entities import GeoLocation, Host, IpAddress
try:
from msticpy import nbwidgets
from msticpy.context.ip_utils import get_ip_type, get_whois_info
from msticpy.context.ip_utils import get_ip_type
from msticpy.context.ip_utils import ip_whois as get_whois_info
from msticpy.vis.timeline import display_timeline
except ImportError:
# Fall back to msticpy locations prior to v2.0.0
@ -91,7 +92,7 @@ class IpSummaryResult(NotebookletResult):
VMComputer latest record
az_network_flows : pd.DataFrame
Azure NSG flows for IP, if available
az_network_flows_timeline: Figure
az_network_flows_timeline: LayoutDOM
Azure NSG flows timeline, if data is available
aad_signins : pd.DataFrame = None
AAD signin activity
@ -105,7 +106,7 @@ class IpSummaryResult(NotebookletResult):
Common Security Log entries for source IP
related_bookmarks : pd.DataFrame
Bookmarks related to IP Address
alert_timeline : Figure
alert_timeline : LayoutDOM
Timeline plot of alerts
ti_results: pd.DataFrame
Threat intel lookup results
@ -164,7 +165,7 @@ class IpSummaryResult(NotebookletResult):
self.vmcomputer: Optional[pd.DataFrame] = None
self.az_network_flows: Optional[pd.DataFrame] = None
self.az_network_flow_summary: Optional[pd.DataFrame] = None
self.az_network_flows_timeline: Figure = None
self.az_network_flows_timeline: Optional[LayoutDOM] = None
self.aad_signins: Optional[pd.DataFrame] = None
self.azure_activity: Optional[pd.DataFrame] = None
self.azure_activity_summary: Optional[pd.DataFrame] = None
@ -172,7 +173,7 @@ class IpSummaryResult(NotebookletResult):
self.common_security: Optional[pd.DataFrame] = None
self.related_alerts: Optional[pd.DataFrame] = None
self.related_bookmarks: Optional[pd.DataFrame] = None
self.alert_timeline: Figure = None
self.alert_timeline: Optional[LayoutDOM] = None
self.ti_results: Optional[pd.DataFrame] = None
self.passive_dns: Optional[pd.DataFrame] = None
self.host_logons: Optional[pd.DataFrame] = None
@ -383,7 +384,7 @@ class IpAddressSummary(Notebooklet):
def netflow_by_protocol(
self,
) -> Figure:
) -> Optional[LayoutDOM]:
"""Display netflows grouped by protocol."""
if not self.check_valid_result_data("az_network_flows"):
return None
@ -391,7 +392,7 @@ class IpAddressSummary(Notebooklet):
def netflow_total_by_protocol(
self,
) -> Figure:
) -> Optional[LayoutDOM]:
"""Display netflows grouped by protocol."""
if not self.check_valid_result_data("az_network_flows"):
return None
@ -399,7 +400,7 @@ class IpAddressSummary(Notebooklet):
def netflow_by_direction(
self,
) -> Figure:
) -> Optional[LayoutDOM]:
"""Display netflows grouped by direction."""
if not self.check_valid_result_data("az_network_flows"):
return None
@ -991,7 +992,7 @@ def _get_ti_data(ti_lookup, src_ip, result):
nb_data_wait("Threat Intel")
if not ti_lookup:
return
ti_results = ti_lookup.lookup_ioc(observable=src_ip)
ti_results = ti_lookup.lookup_ioc(src_ip)
result.ti_results = ti_lookup.result_to_df(ti_results)
warn_ti_res = len(result.ti_results.query("Severity != 'information'"))
if warn_ti_res:
@ -1022,7 +1023,7 @@ def _get_passv_dns(ti_lookup, src_ip, result):
return
with suppress(MsticpyException):
passv_dns = ti_lookup.lookup_ioc(
observable=src_ip,
src_ip,
ioc_type="ipv4" if isinstance(ip_class, IPv4Address) else "ipv6",
ioc_query_type="passivedns",
)

Просмотреть файл

@ -6,18 +6,18 @@
"""Notebooklet for Network Flow Summary."""
from ipaddress import ip_address
from itertools import chain
from typing import Any, Dict, Iterable, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
import numpy as np
import pandas as pd
from bokeh.plotting.figure import Figure
from bokeh.models import LayoutDOM
from IPython.display import display
from msticpy.common.timespan import TimeSpan
from msticpy.datamodel import entities
try:
from msticpy import nbwidgets
from msticpy.context.ip_utils import get_ip_type, get_whois_df, get_whois_info
from msticpy.context.ip_utils import get_ip_type, get_whois_df
from msticpy.context.ip_utils import ip_whois as get_whois_info
from msticpy.context.tiproviders.ti_provider_base import ResultSeverity
from msticpy.vis import foliummap
from msticpy.vis.timeline import display_timeline, display_timeline_values
@ -64,11 +64,11 @@ class NetworkFlowResult(NotebookletResult):
type of host, not all of this data may be populated.
network_flows : pd.DataFrame
The raw network flows recorded for this host.
plot_flows_by_protocol : Figure
plot_flows_by_protocol : LayoutDOM
Bokeh timeline plot of flow events by protocol.
plot_flows_by_direction : Figure
plot_flows_by_direction : LayoutDOM
Bokeh timeline plot of flow events by direction (in/out).
plot_flow_values : Figure
plot_flow_values : LayoutDOM
Bokeh values plot of flow events by protocol.
flow_index : pd.DataFrame
Summarized DataFrame of flows
@ -108,9 +108,9 @@ class NetworkFlowResult(NotebookletResult):
self.description: str = "Network flow results"
self.host_entity: entities.Host = None
self.network_flows: Optional[pd.DataFrame] = None
self.plot_flows_by_protocol: Figure = None
self.plot_flows_by_direction: Figure = None
self.plot_flow_values: Figure = None
self.plot_flows_by_protocol: Optional[LayoutDOM] = None
self.plot_flows_by_direction: Optional[LayoutDOM] = None
self.plot_flow_values: Optional[LayoutDOM] = None
self.flow_index: Optional[pd.DataFrame] = None
self.flow_index_data: Optional[pd.DataFrame] = None
self.flow_summary: Optional[pd.DataFrame] = None
@ -596,17 +596,20 @@ def _get_source_host_asns(host_entity):
@set_text(docs=_CELL_DOCS, key="select_asn_subset")
def _select_asn_subset(flow_sum_df, host_entity):
our_host_asns = _get_source_host_asns(host_entity)
all_asns = list(flow_sum_df["DestASN"].unique()) + list(
flow_sum_df["SourceASN"].unique()
)
all_asns = set(all_asns) - set(["private address"])
all_asns: List[str] = []
other_asns: List[str] = []
# Select the ASNs in the 25th percentile (lowest number of flows)
quant_25pc = flow_sum_df["TotalAllowedFlows"].quantile(q=[0.25]).iat[0]
quant_25pc_df = flow_sum_df[flow_sum_df["TotalAllowedFlows"] <= quant_25pc]
other_asns = list(quant_25pc_df["DestASN"].unique()) + list(
quant_25pc_df["SourceASN"].unique()
)
if "DestASN" in flow_sum_df.columns:
all_asns.extend(flow_sum_df["DestASN"].unique())
other_asns.extend(quant_25pc_df["DestASN"].unique())
if "SourceASN" in flow_sum_df.columns:
all_asns.extend(flow_sum_df["SourceASN"].unique())
other_asns.extend(quant_25pc_df["SourceASN"].unique())
all_asns = set(all_asns) - {"private address"}
other_asns = set(other_asns) - set(our_host_asns)
return nbwidgets.SelectSubset(source_items=all_asns, default_selected=other_asns)
@ -614,20 +617,24 @@ def _select_asn_subset(flow_sum_df, host_entity):
# %%
# Lookup ASN IPs with TILookup
def _get_ips_from_selected_asn(flow_sum_df, select_asn):
dest_ips = set(
chain.from_iterable(
flow_sum_df[flow_sum_df["DestASN"].isin(select_asn.selected_items)][
"dest_ips"
]
dest_ips: Set[str] = set()
src_ips: Set[str] = set()
if "DestASN" in flow_sum_df.columns:
dest_ips = set(
chain.from_iterable(
flow_sum_df[flow_sum_df["DestASN"].isin(select_asn.selected_items)][
"dest_ips"
]
)
)
)
src_ips = set(
chain.from_iterable(
flow_sum_df[flow_sum_df["SourceASN"].isin(select_asn.selected_items)][
"source_ips"
]
if "SourceASN" in flow_sum_df.columns:
src_ips = set(
chain.from_iterable(
flow_sum_df[flow_sum_df["SourceASN"].isin(select_asn.selected_items)][
"source_ips"
]
)
)
)
selected_ips = dest_ips | src_ips
nb_markdown(f"{len(selected_ips)} unique IPs in selected ASNs")
return selected_ips
@ -645,6 +652,8 @@ def _lookup_ip_ti(flows_df, ti_lookup, selected_ips):
ti_results = ti_lookup.lookup_iocs(data=selected_ip_dict)
nb_markdown(f"{len(ti_results)} TI results received.")
if ti_results.empty:
return pd.DataFrame(columns=["Ioc"])
ti_results_pos = ti_results[ti_check_ser_sev(ti_results["Severity"], 1)]
nb_markdown(f"{len(ti_results_pos)} positive results found.")
@ -736,34 +745,36 @@ def _display_geo_map(flow_index, ip_locator, host_entity, ti_results, select_asn
nb_markdown("No network flow data available.")
return None
ips_in: List[str] = []
ips_out: List[str] = []
# Get the flow records for all flows not in the TI results
selected_out = flow_index[flow_index["DestASN"].isin(select_asn.selected_items)]
selected_in = flow_index[flow_index["SourceASN"].isin(select_asn.selected_items)]
sel_out_exp = _list_to_rows(selected_out, "dest_ips")
sel_in_exp = _list_to_rows(selected_in, "source_ips")
sel_out_exp = sel_out_exp[~sel_out_exp["dest_ips"].isin(ti_results["Ioc"])]
sel_in_exp = sel_in_exp[~sel_in_exp["source_ips"].isin(ti_results["Ioc"])]
if "DestASN" in flow_index.columns:
selected_out = flow_index[flow_index["DestASN"].isin(select_asn.selected_items)]
sel_out_exp = selected_out.explode("dest_ips")
sel_out_exp = sel_out_exp[~sel_out_exp["dest_ips"].isin(ti_results["Ioc"])]
if sel_out_exp.empty:
ips_out = []
else:
nb_data_wait("IP Geolocation")
ips_out = list(
sel_out_exp.apply(
lambda x: _format_ip_entity(ip_locator, x, "dest_ips"), axis=1
if not sel_out_exp.empty:
nb_data_wait("IP Geolocation")
ips_out = list(
sel_out_exp.apply(
lambda x: _format_ip_entity(ip_locator, x, "dest_ips"), axis=1
)
)
)
if sel_in_exp.empty:
ips_in = []
else:
nb_data_wait("IP Geolocation")
ips_in = list(
sel_in_exp.apply(
lambda x: _format_ip_entity(ip_locator, x, "source_ips"), axis=1
if "SourceASN" in flow_index.columns:
selected_in = flow_index[
flow_index["SourceASN"].isin(select_asn.selected_items)
]
sel_in_exp = selected_in.explode("source_ips")
sel_in_exp = sel_in_exp[~sel_in_exp["source_ips"].isin(ti_results["Ioc"])]
if not sel_in_exp.empty:
nb_data_wait("IP Geolocation")
ips_in = list(
sel_in_exp.apply(
lambda x: _format_ip_entity(ip_locator, x, "source_ips"), axis=1
)
)
)
icon_props = {"color": "green"}
host_ips = getattr(host_entity, "PublicIpAddresses", [])
@ -787,18 +798,3 @@ def _display_geo_map(flow_index, ip_locator, host_entity, ti_results, select_asn
folium_map.center_map()
return folium_map
def _list_to_rows(data, col):
orig_cols = data.columns
item_col = f"{col}_list_item$$"
ren_col = {item_col: col}
return (
pd.DataFrame(data[col].to_list())
.replace([None], np.nan) # convert any Nones to NaN
.merge(data, right_index=True, left_index=True)
.melt(id_vars=orig_cols, value_name=item_col)
.dropna(subset=[item_col]) # get rid of rows with NaNs in this col
.drop([col, "variable"], axis=1)
.rename(columns=ren_col)
)

Просмотреть файл

@ -5,6 +5,7 @@
# --------------------------------------------------------------------------
"""Notebooklet for URL Summary."""
from collections import Counter
from os.path import exists
from typing import Any, Dict, Iterable, List, Optional
import dns.resolver
@ -33,7 +34,6 @@ from ...._version import VERSION
from ....common import (
MsticnbDataProviderError,
MsticnbMissingParameterError,
nb_data_wait,
nb_markdown,
set_text,
)
@ -51,7 +51,7 @@ _CELL_DOCS: Dict[str, Any]
_CLS_METADATA, _CELL_DOCS = read_mod_metadata(__file__, __name__)
# pylint: disable=too-few-public-methods
# pylint: disable=too-few-public-methods, too-many-instance-attributes
class URLSummaryResult(NotebookletResult):
"""URL Summary Results."""
@ -73,6 +73,7 @@ class URLSummaryResult(NotebookletResult):
self.hosts: Optional[List] = None
self.flows: Optional[pd.DataFrame] = None
self.flow_graph: Optional[LayoutDOM] = None
self.ti_results: Optional[pd.DataFrame] = None
# pylint: disable=too-few-public-methods
@ -156,112 +157,61 @@ class URLSummary(Notebooklet):
result = URLSummaryResult(
notebooklet=self, description=self.metadata.description, timespan=timespan
)
if not self._last_result:
self._last_result = result
url = value.strip().lower()
_, domain, tld = tldextract.extract(url)
self.url = value.strip().lower()
_, domain, tld = tldextract.extract(self.url)
domain = f"{domain.lower()}.{tld.lower()}"
domain_validator = DomainValidator()
validated = domain_validator.validate_tld(domain)
result.summary = pd.DataFrame(
{"URL": [url], "Domain": [domain], "Validated TLD": [validated]}
{"URL": [self.url], "Domain": [domain], "Validated TLD": [validated]}
)
if not self.silent:
nb_markdown(f"Summary of {url}:")
display(result.summary)
if "ti" in self.options:
if "tilookup" in self.data_providers.providers:
ti_prov = self.data_providers.providers["tilookup"]
else:
raise MsticnbDataProviderError("No TI providers avaliable")
nb_data_wait("Threat Intelligence Results")
ti_results, ti_results_merged = get_ti_results(
ti_prov, result.summary, "URL"
)
if isinstance(ti_results, pd.DataFrame) and not ti_results.empty:
result.summary = ti_results_merged
if not self.silent:
nb_markdown(f"Threat Intelligence Results for {url}.")
display(ti_results_merged)
result.ti_results = ti_results_merged
if "whois" in self.options:
result.domain_record = _domain_whois_record(
domain, self.data_providers.providers["tilookup"]
)
if (
not self.silent
and isinstance(result, pd.DataFrame)
and not result.domain_record.empty # type: ignore
):
nb_markdown(f"WhoIs Results for {url}.")
display(
result.domain_record.T.style.applymap( # type: ignore
color_domain_record_cells,
subset=pd.IndexSlice[["Page Rank", "Domain Name Entropy"], 0],
)
)
if "cert" in self.options:
result.cert_details = _get_tls_cert_details(url, domain_validator)
if not self.silent:
if (
isinstance(result.cert_details, pd.DataFrame)
and not result.cert_details.empty
):
nb_markdown(f"TLS Certificate Details for {url}.")
display(result.cert_details)
else:
print("No TLS certificate found.")
result.cert_details = _get_tls_cert_details(self.url, domain_validator)
if "ip_record" in self.options:
result.ip_record = None
result.ip_record = _get_ip_record(
domain, domain_validator, self.data_providers.providers["tilookup"]
)
if not self.silent:
if (
isinstance(result.ip_record, pd.DataFrame)
and not result.ip_record.empty
):
nb_markdown(f"IP Address Details for {url}.")
display(result.ip_record.T)
else:
print("No current IP found.")
if "screenshot" in self.options:
image_data = screenshot(url)
image_data = screenshot(self.url)
with open("screenshot.png", "wb") as screenshot_file:
screenshot_file.write(image_data.content)
if not self.silent:
nb_markdown(f"Screenshot of {url}")
display(Image(filename="screenshot.png"))
if "alerts" in self.options:
alerts = self.query_provider.SecurityAlert.list_alerts(timespan)
result.related_alerts = alerts[
alerts["Entities"].str.contains(url, case=False)
alerts["Entities"].str.contains(self.url, case=False)
]
if (
not self.silent
and isinstance(result, pd.DataFrame)
and not result.related_alerts.empty # type: ignore
):
nb_markdown(f"Alerts related to {url}")
display(result.related_alerts)
if "bookmarks" in self.options:
result.bookmarks = (
self.query_provider.AzureSentinel.list_bookmarks_for_entity(
url=url, start=timespan.start, end=timespan.end
url=self.url, start=timespan.start, end=timespan.end
)
)
if (
not self.silent
and isinstance(result, pd.DataFrame)
and not result.bookmarks.empty # type: ignore
):
nb_markdown(f"Bookmarks related to {url}")
display(result.bookmarks)
if "dns" in self.options:
result.dns_results = (
@ -269,33 +219,27 @@ class URLSummary(Notebooklet):
domain=domain, start=timespan.start, end=timespan.end
)
)
if not self.silent and not result.dns_results.empty: # type: ignore
nb_markdown(f"DNS events related to {url}")
display(result.dns_results)
if "hosts" in self.options:
syslog_hosts = self.query_provider.LinuxSyslog.all_syslog(
add_query_items=f"| where SyslogMessage has '{url}'",
add_query_items=f"| where SyslogMessage has '{self.url}'",
start=timespan.start,
end=timespan.end,
)["Computer"].unique()
mde_hosts = self.query_provider.MDE.host_connections(
time_column="TimeGenerated",
host_name="",
add_query_items=f"| where RemoteUrl has '{url}'",
add_query_items=f"| where RemoteUrl has '{self.url}'",
start=timespan.start,
end=timespan.end,
)["DeviceName"].unique()
windows_hosts = self.query_provider.WindowsSecurity.list_events(
add_query_items=f"| where CommandLine has '{url}'",
add_query_items=f"| where CommandLine has '{self.url}'",
start=timespan.start,
end=timespan.end,
)["Computer"].unique()
all_hosts = list(syslog_hosts) + list(mde_hosts) + list(windows_hosts)
result.hosts = all_hosts
if not self.silent:
nb_markdown(f"Hosts connecting to {url}")
display(result.hosts)
if "flows" in self.options:
result.flows = self.query_provider.Network.network_connections_to_url(
@ -310,16 +254,111 @@ class URLSummary(Notebooklet):
result.flow_graph = display_timeline_values(
flow_graph_data,
value_col="sum_SentBytes",
title=f"Network traffic volume to {url}",
title=f"Network traffic volume to {self.url}",
)
if not self.silent:
display(result.flow_graph)
nb_markdown(f"Network connections to {url}")
display(result.flows)
self._last_result = result
if not self.silent:
self._display_results()
return self._last_result
@set_text(docs=_CELL_DOCS, key="display_summary")
def _display_summary(self):
    """Render the URL summary table from the last run, if present."""
    if not self.check_valid_result_data("summary", silent=True):
        return
    display(self._last_result.summary)
@set_text(docs=_CELL_DOCS, key="show_ti_details")
def _display_ti_data(self):
    """Show Threat Intelligence results for the URL, or a note if none."""
    if not self.check_valid_result_data("ti_results", silent=True):
        nb_markdown(f"No TI results found for {self.url}")
        return
    display(self._last_result.ti_results)
@set_text(docs=_CELL_DOCS, key="show_domain_record")
def _display_domain_record(self):
    """Display Domain Record."""
    if self.check_valid_result_data("domain_record", silent=True):
        # Transpose so the single-row record renders as field/value rows,
        # then color the risk-indicator cells (Page Rank, entropy) via the
        # cell-styling callback.
        display(
            self._last_result.domain_record.T.style.applymap(  # type: ignore
                color_domain_record_cells,
                subset=pd.IndexSlice[["Page Rank", "Domain Name Entropy"], 0],
            )
        )
@set_text(docs=_CELL_DOCS, key="show_TLS_cert")
def _display_cert_details(self):
    """Show TLS certificate details for the URL, if any were found."""
    if not self.check_valid_result_data("cert_details", silent=True):
        nb_markdown(f"No TLS certificate found for {self.url}.")
        return
    display(self._last_result.cert_details)
@set_text(docs=_CELL_DOCS, key="show_IP_record")
def _display_ip_record(self):
    """Show the IP-address record resolved for the URL, if available."""
    if not self.check_valid_result_data("ip_record", silent=True):
        nb_markdown(f"No current IP found for {self.url}.")
        return
    # Transpose so the record displays as field/value pairs.
    display(self._last_result.ip_record.T)
@set_text(docs=_CELL_DOCS, key="show_screenshot")
def _display_screenshot(self, image_path: str = "screenshot.png"):
    """
    Display URL screenshot, if one has been captured.

    Parameters
    ----------
    image_path : str, optional
        Path of the screenshot image file to display
        (default "screenshot.png", the file written by the
        screenshot-capture step).

    Notes
    -----
    Silently does nothing if the image file does not exist
    (e.g. when the "screenshot" option was not selected).
    """
    # Fixed docstring typo ("ULR") and made the hard-coded file name an
    # overridable parameter; the default preserves existing behavior.
    if exists(image_path):
        nb_markdown(f"Screenshot of {self.url}")
        display(Image(filename=image_path))
@set_text(docs=_CELL_DOCS, key="show_related_alerts")
def _display_related_alerts(self):
    """Display related alerts in table."""
    # check_valid_result_data verifies both that a result exists and that
    # the "related_alerts" attribute holds non-empty data.
    if self.check_valid_result_data("related_alerts", silent=True):
        display(self._last_result.related_alerts)
    else:
        nb_markdown(f"No Alerts related to {self.url}")
@set_text(docs=_CELL_DOCS, key="show_bookmarks")
def _display_bookmarks(self):
    """Show bookmarks related to the URL, or a note if none exist."""
    if not self.check_valid_result_data("bookmarks", silent=True):
        nb_markdown(f"No Bookmarks related to {self.url}")
        return
    display(self._last_result.bookmarks)
@set_text(docs=_CELL_DOCS, key="show_dns_results")
def _display_dns_results(self):
    """Display DNS resolutions for URL."""
    if self.check_valid_result_data("dns_results", silent=True):
        # "bold" is the style hint passed to the markdown renderer.
        nb_markdown(f"DNS events related to {self.url}", "bold")
        display(self._last_result.dns_results)
    else:
        nb_markdown(f"No DNS resolutions found for {self.url}")
@set_text(docs=_CELL_DOCS, key="show_hosts")
def _display_hosts(self):
    """Show hosts observed connecting to the URL, if any were found."""
    # Short-circuit: only touch .hosts after the result check passes.
    if self.check_valid_result_data("hosts", silent=True) and self._last_result.hosts:
        nb_markdown(f"Hosts connecting to {self.url}", "bold")
        display(self._last_result.hosts)
        return
    nb_markdown(f"No hosts found connecting to {self.url}")
@set_text(docs=_CELL_DOCS, key="show_flows")
def _display_flows(self):
    """Render the network-flow timeline and flow table for the URL."""
    if not self.check_valid_result_data("flow_graph", silent=True):
        nb_markdown(f"No flow data found for {self.url}")
        return
    display(self._last_result.flow_graph)
    nb_markdown(f"Network connections to {self.url}", "bold")
    display(self._last_result.flows)
@set_text(docs=_CELL_DOCS, key="browse_alerts")
def browse_alerts(self) -> nbwidgets.SelectAlert:
"""Return alert browser/viewer."""
@ -335,6 +374,20 @@ class URLSummary(Notebooklet):
md("Cannot plot timeline with 0 or 1 event.")
return None
def _display_results(self):
    """Display all the notebooklet results."""
    # Each _display_* helper either renders its section or prints a
    # "not found" note (the screenshot helper simply no-ops when the
    # image file is absent), so the sequence below is presentational.
    self._display_summary()
    self._display_domain_record()
    self._display_ip_record()
    self._display_cert_details()
    self._display_ti_data()
    self._display_screenshot()
    self._display_related_alerts()
    self._display_bookmarks()
    self._display_dns_results()
    self._display_hosts()
    self._display_flows()
def entropy(data):
"""Calculate Entropy of a String."""
@ -370,7 +423,6 @@ def _show_alert_timeline(related_alerts):
return None
@set_text(docs=_CELL_DOCS, key="show_domain_record")
def _domain_whois_record(domain, ti_prov):
"""Build a Domain Whois Record."""
dom_record = pd.DataFrame()
@ -396,27 +448,35 @@ def _domain_whois_record(domain, ti_prov):
)
ns_domains = []
# Identity domains populatirty with Open Page Rank
page_rank = ti_prov.result_to_df(
ti_prov.lookup_ioc(observable=domain, providers=["OPR"])
)
if page_rank["RawResult"][0]:
page_rank_score = page_rank["RawResult"][0]["response"][0][
"page_rank_integer"
]
# Identify domain's popularity with Open Page Rank
if "OPR" in ti_prov.loaded_providers:
page_rank = ti_prov.result_to_df(
ti_prov.lookup_ioc(domain, providers=["OPR"])
)
if page_rank["RawResult"][0]:
page_rank_score = page_rank["RawResult"][0]["response"][0][
"page_rank_integer"
]
else:
page_rank_score = 0
dom_record["Page Rank"] = [page_rank_score]
else:
page_rank_score = 0
dom_record["Page Rank"] = [page_rank_score]
nb_markdown("OPR TI provider needed to calculate Page Rank score.")
dom_record["Page Rank"] = ["Not known - OPR provider needed"]
# Get a list of subdomains for the domain
url_ti = ti_prov.result_to_df(
ti_prov.lookup_ioc(observable=domain, providers=["VirusTotal"])
)
try:
sub_doms = url_ti["RawResult"][0]["subdomains"]
except (TypeError, KeyError):
sub_doms = "None found"
dom_record["Sub Domains"] = [sub_doms]
if "VirusTotal" in ti_prov.loaded_providers:
url_ti = ti_prov.result_to_df(
ti_prov.lookup_ioc(domain, providers=["VirusTotal"])
)
try:
sub_doms = url_ti["RawResult"][0]["subdomains"]
except (TypeError, KeyError):
sub_doms = "None found"
dom_record["Sub Domains"] = [sub_doms]
else:
nb_markdown("VT TI provider needed to get sub-domains.")
dom_record["Page Rank"] = ["Not known - OPR provider needed"]
# Work out domain entropy to identity possible DGA
dom_ent = entropy(domain)
@ -431,7 +491,6 @@ def _domain_whois_record(domain, ti_prov):
return dom_record
@set_text(docs=_CELL_DOCS, key="show_TLS_cert")
def _get_tls_cert_details(url, domain_validator):
"""Get details of a TLS certificate used by a domain."""
result, x509 = domain_validator.in_abuse_list(url)
@ -449,9 +508,8 @@ def _get_tls_cert_details(url, domain_validator):
return cert_df
@set_text(docs=_CELL_DOCS, key="show_IP_record")
def _get_ip_record(domain, domain_validator, ti_prov):
"""Get IP addresses assoicated with a domain."""
"""Get IP addresses associated with a domain."""
ip_record = None
if domain_validator.is_resolvable(domain) is True:
try:
@ -469,7 +527,6 @@ def _get_ip_record(domain, domain_validator, ti_prov):
"Creation Date": [ip_whois_result.get("creation_date", None)],
}
)
if isinstance(ip_record, pd.DataFrame) and not ip_record.empty:
ip_record = _process_tor_ip_record(ip_record, ti_prov)
ip_record = _process_previous_resolutions(ip_record, ti_prov)
@ -480,8 +537,9 @@ def _process_tor_ip_record(ip_record, ti_prov):
"""See if IP record contains Tor IP."""
tor = None
if "Tor" in ti_prov.loaded_providers:
print(ti_prov.loaded_providers)
tor = ti_prov.result_to_df(
ti_prov.lookup_ioc(observable=ip_record["IP Address"][0], providers=["Tor"])
ti_prov.lookup_ioc(ip_record["IP Address"][0], providers=["Tor"])
)
if tor is None or tor["Details"][0] == "Not found.":
ip_record["Tor Node?"] = "No"
@ -492,15 +550,18 @@ def _process_tor_ip_record(ip_record, ti_prov):
def _process_previous_resolutions(ip_record, ti_prov):
"""Get previous resolutions for IP in ip_record."""
ip_ti_results = ti_prov.result_to_df(
ti_prov.lookup_ioc(
observable=ip_record["IP Address"][0], providers=["VirusTotal"]
if "VirusTotal" in ti_prov.loaded_providers:
ip_ti_results = ti_prov.result_to_df(
ti_prov.lookup_ioc(ip_record["IP Address"][0], providers=["VirusTotal"])
)
try:
last_10 = ip_ti_results.T["VirusTotal"]["RawResult"]["resolutions"][:10]
prev_domains = [record["hostname"] for record in last_10]
except TypeError:
prev_domains = None
else:
prev_domains = (
"Unknown - VirusTotal provider required for previous resolution details."
)
)
try:
last_10 = ip_ti_results.T["VirusTotal"]["RawResult"]["resolutions"][:10]
prev_domains = [record["hostname"] for record in last_10]
except TypeError:
prev_domains = None
ip_record["Last 10 resolutions"] = [prev_domains]
return ip_record

Просмотреть файл

@ -2,14 +2,14 @@ metadata:
name: URLSummary
description: URL summary
default_options:
- ti: Displays TI results for the URL
- whois: Display a summary of the URL
- ip_record: Display a summary of the IP address the URL resolves to
- ti: Displays TI results for the URL.
- whois: Display a summary of the URL.
- ip_record: Display a summary of the IP address the URL resolves to.
- cert: Display a summary of TLS certs used by the URL.
- alerts: Displays a DataFrame of all alerts associated with the URL
- bookmarks: Displays a DataFrame of all bookmarks associated with the URL
- dns: Displays a DataFrame of all DNS events associated with the URL
- hosts: Displays a DataFrame of all hosts associated with the URL
- alerts: Displays a DataFrame of all alerts associated with the URL.
- bookmarks: Displays a DataFrame of all bookmarks associated with the URL.
- dns: Displays a DataFrame of all DNS events associated with the URL.
- hosts: Displays a DataFrame of all hosts associated with the URL.
other_options:
- screenshot: Capture a screenshot of the website using Browshot
keywords:
@ -26,6 +26,10 @@ output:
This shows a summary of the URL and its presence in data.
It will display an overview of the URL, its associated domain, IP addresses and TLS certificates.
It will also display a screenshot of the URL.
display_summary:
title: Summary
text:
Summary of the URL.
display_alert_timeline:
title: Timeline of related alerts
text:
@ -47,4 +51,34 @@ output:
show_IP_record:
title: IP Details
text:
This shows details of the IP address used by the URL.
This shows details of the IP address used by the URL.
show_ti_details:
title: TI Results
text:
This shows any results in Threat Intelligence feeds for the URL.
show_screenshot:
title: URL Screenshot
text:
Below is a static screenshot of the URL.
show_related_alerts:
title: Related Alerts
text:
This is a table of all the alerts related to the URL.
show_dns_results:
title: DNS Resolutions
text:
These are the DNS resolutions for the URL.
show_hosts:
title: Connecting Hosts
text:
The following hosts were observed connecting to the URL.
show_flows:
title: Network Flows
text:
These are the network flows observed to and from the URL.
show_bookmarks:
title: Related Bookmarks
text:
These are the Bookmarks related to the URL.

Просмотреть файл

@ -38,7 +38,7 @@ the code.
from typing import Any, Dict, Iterable, Optional, Union
import pandas as pd
from bokeh.plotting.figure import Figure
from bokeh.models import LayoutDOM
from msticpy.common.timespan import TimeSpan
try:
@ -121,7 +121,7 @@ class TemplateResult(NotebookletResult):
# Make sure they are documented in the Attributes section
# above.
self.all_events: Optional[pd.DataFrame] = None
self.plot: Figure = None
self.plot: Optional[LayoutDOM] = None
self.additional_info: Optional[dict] = None
@ -275,6 +275,7 @@ class TemplateNB(Notebooklet):
# This section contains functions that do the work. It can be split into
# cells recognized by some editors (like VSCode) but this is optional
# %%
# Get Windows Security Events
def _get_all_events(qry_prov, host_name, timespan):

Просмотреть файл

@ -6,7 +6,7 @@
"""host_network_summary notebooklet."""
from collections import namedtuple
from functools import lru_cache
from typing import Any, Dict, List, Set
from typing import Any, Dict, List, Optional, Set
import pandas as pd
from msticpy.common.timespan import TimeSpan
@ -23,7 +23,9 @@ __author__ = "Ian Hellen"
@lru_cache()
def get_heartbeat(
qry_prov: QueryProvider, host_name: str = None, host_ip: str = None
qry_prov: QueryProvider,
host_name: Optional[str] = None,
host_ip: Optional[str] = None,
) -> entities.Host:
"""
Get Heartbeat information for host or IP.
@ -64,8 +66,8 @@ def get_heartbeat(
def get_aznet_topology(
qry_prov: QueryProvider,
host_entity: entities.Host,
host_name: str = None,
host_ip: str = None,
host_name: Optional[str] = None,
host_ip: Optional[str] = None,
):
"""
Get Azure Network topology information for host or IP address.

Просмотреть файл

@ -493,7 +493,9 @@ class Notebooklet(ABC):
del fmt
return "No documentation available."
def check_valid_result_data(self, attrib: str = None, silent: bool = False) -> bool:
def check_valid_result_data(
self, attrib: Optional[str] = None, silent: bool = False
) -> bool:
"""
Check that the result is valid and `attrib` contains data.

Просмотреть файл

@ -9,7 +9,6 @@ from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from bokeh.models import LayoutDOM
from bokeh.plotting.figure import Figure
from msticpy.common.timespan import TimeSpan
from ._version import VERSION
@ -100,7 +99,7 @@ class NotebookletResult(DFViewer):
if isinstance(obj, pd.DataFrame):
suffix = f"<br>(showing top 5 of {len(obj)} rows)" if len(obj) > 5 else ""
return obj.head(5)._repr_html_() + suffix
if isinstance(obj, (LayoutDOM, Figure)):
if isinstance(obj, LayoutDOM):
show_bokeh(obj)
if hasattr(obj, "_repr_html_"):
return obj._repr_html_()
@ -186,8 +185,4 @@ class NotebookletResult(DFViewer):
def vis_properties(self) -> List[str]:
"""Return list of properties with visualizations."""
return [
attr
for attr, val in vars(self).items()
if isinstance(val, (LayoutDOM, Figure))
]
return [attr for attr, val in vars(self).items() if isinstance(val, LayoutDOM)]

Просмотреть файл

@ -27,7 +27,7 @@ nblts: NBContainer = NBContainer()
nb_index: Dict[str, Notebooklet] = {}
def discover_modules(nb_path: Union[str, Iterable[str]] = None) -> NBContainer:
def discover_modules(nb_path: Union[str, Iterable[str], None] = None) -> NBContainer:
"""
Discover notebooks modules.

Просмотреть файл

@ -6,7 +6,7 @@
"""Notebooklet templates module."""
import os
from pathlib import Path
from typing import Union
from typing import Optional, Union
from msticpy.common.utility import valid_pyname
@ -45,7 +45,7 @@ REPLACE_TEXT = {
def create_template(
nb_name: str = "MyNotebooklet",
folder: Union[str, Path] = ".",
author: str = None,
author: Optional[str] = None,
subfolder: bool = False,
overwrite: bool = False,
):

Просмотреть файл

@ -1,10 +1,10 @@
bokeh>=1.4.0
bokeh<3.0.0
defusedxml>=0.6.0
ipython>=7.23.1
ipywidgets>=7.5.1
lxml>=4.4.2
Markdown>=3.2.1
msticpy[azure]>=2.1.1
msticpy[azure]==2.3.1
numpy>=1.17.3
pandas>=0.25.3
python-dateutil>=2.8.1

Просмотреть файл

@ -4,11 +4,15 @@
# license information.
# --------------------------------------------------------------------------
"""Test the nb_template class."""
import json
import re
from pathlib import Path
from unittest.mock import patch
import pandas as pd
import pytest
import pytest_check as check
import respx
from bokeh.models import LayoutDOM
from msticpy.common.timespan import TimeSpan
from msticpy.datamodel import entities
@ -24,7 +28,12 @@ except ImportError:
from msticnb import data_providers, discover_modules, nblts
from ....unit_test_lib import TEST_DATA_PATH, GeoIPLiteMock, TILookupMock
from ....unit_test_lib import (
TEST_DATA_PATH,
GeoIPLiteMock,
TILookupMock,
get_test_data_path,
)
# pylint: disable=protected-access, no-member, redefined-outer-name, unused-argument
@ -45,8 +54,32 @@ def init_notebooklets(monkeypatch):
)
def test_account_summary_notebooklet(init_notebooklets):
@pytest.fixture(scope="session")
def whois_response():
    """Return mock responses for Whois."""
    resp_path = get_test_data_path().joinpath("whois_response.json")
    return json.loads(resp_path.read_text(encoding="utf-8"))
@pytest.fixture(scope="session")
def rdap_response():
    """Return mock responses for RDAP lookups."""
    json_text = (
        get_test_data_path().joinpath("rdap_response.json").read_text(encoding="utf-8")
    )
    return json.loads(json_text)
@respx.mock
@patch("msticpy.context.ip_utils._asn_whois_query")
def test_account_summary_notebooklet(
mock_whois, init_notebooklets, rdap_response, whois_response
):
"""Test basic run of notebooklet."""
mock_whois.return_value = whois_response["asn_response_1"]
respx.get(re.compile(r"http://rdap\.arin\.net/.*")).respond(200, json=rdap_response)
test_nb = nblts.azsent.account.AccountSummary()
tspan = TimeSpan(period="1D")

Просмотреть файл

@ -43,6 +43,6 @@ def test_host_summary_notebooklet(init_notebooklets):
check.is_not_none(result.host_entity)
check.is_not_none(result.related_alerts)
check.is_instance(result.related_alerts, pd.DataFrame)
check.is_not_none(result.alert_timeline)
check.is_not_none(result.display_alert_timeline())
check.is_not_none(result.related_bookmarks)
check.is_instance(result.related_bookmarks, pd.DataFrame)

Просмотреть файл

@ -10,9 +10,10 @@ from pathlib import Path
import pandas as pd
import pytest
from bokeh.layouts import Column
from bokeh.plotting import Figure
from bokeh.models import LayoutDOM
from msticpy.common.timespan import TimeSpan
# pylint: disable=unused-import
try:
from msticpy.vis.foliummap import FoliumMap
except ImportError:
@ -45,7 +46,7 @@ def test_output_types(nbltdata): # pylint: disable=redefined-outer-name
assert isinstance(nbltdata.logon_sessions, pd.DataFrame)
assert isinstance(nbltdata.logon_matrix, pd.io.formats.style.Styler)
assert isinstance(nbltdata.plots, dict)
assert isinstance(nbltdata.plots["User Pie Chart"], Figure)
assert isinstance(nbltdata.plots["User Pie Chart"], LayoutDOM)
assert isinstance(nbltdata.timeline, Column)
@ -76,7 +77,7 @@ def test_local_data(monkeypatch):
assert nbltlocaldata.logon_sessions["SubjectUserName"].iloc[0] == "WinAttackSim$"
assert nbltlocaldata.logon_sessions["LogonProcessName"].iloc[3] == "Advapi "
assert "User Pie Chart" in nbltlocaldata.plots.keys()
assert isinstance(nbltlocaldata.plots["Process Bar Chart"], Figure)
assert isinstance(nbltlocaldata.plots["Process Bar Chart"], LayoutDOM)
assert isinstance(nbltlocaldata.logon_matrix, pd.io.formats.style.Styler)
assert nbltlocaldata.logon_matrix.index[0][0] == "Font Driver Host\\UMFD-0"
assert isinstance(nbltlocaldata.timeline, Column)

Просмотреть файл

@ -4,11 +4,15 @@
# license information.
# --------------------------------------------------------------------------
"""Test the nb_template class."""
import json
import re
from pathlib import Path
from unittest.mock import patch
import pandas as pd
import pytest
import pytest_check as check
import respx
from bokeh.models import LayoutDOM
from msticpy.common.timespan import TimeSpan
@ -19,6 +23,7 @@ from ....unit_test_lib import (
TEST_DATA_PATH,
GeoIPLiteMock,
TILookupMock,
get_test_data_path,
)
@ -91,16 +96,45 @@ def init_notebooklets(monkeypatch):
)
def test_ip_summary_notebooklet(init_notebooklets, monkeypatch):
@pytest.fixture(scope="session")
def whois_response():
"""Return mock responses for Whois."""
json_text = (
get_test_data_path().joinpath("whois_response.json").read_text(encoding="utf-8")
)
return json.loads(json_text)
@pytest.fixture(scope="session")
def rdap_response():
    """Return mock responses for RDAP lookups."""
    json_text = (
        get_test_data_path().joinpath("rdap_response.json").read_text(encoding="utf-8")
    )
    return json.loads(json_text)
@respx.mock
@patch("msticpy.context.ip_utils._asn_whois_query")
def test_ip_summary_notebooklet(
mock_whois, init_notebooklets, monkeypatch, rdap_response, whois_response
):
"""Test basic run of notebooklet."""
test_nb = nblts.azsent.network.IpAddressSummary()
valid_tables = ["SigninLogs", "AzureActivity", "OfficeActivity"]
# test_nb.query_provider.schema.update(
# {tab: {} for tab in DEF_PROV_TABLES + valid_tables}
# )
eq_mock = create_mocked_exec_query(test_nb.query_provider.exec_query)
monkeypatch.setattr(test_nb.query_provider, "exec_query", eq_mock)
mock_whois.return_value = whois_response["asn_response_1"]
respx.get(re.compile(r"http://rdap\.arin\.net/.*")).respond(200, json=rdap_response)
respx.get(
re.compile(r"https://otx\.alienvault.*|https://www\.virustotal.*")
).respond(200, json=_OTX_RESP)
respx.get(re.compile(r"https://check\.torproject\.org.*")).respond(404)
respx.get(re.compile(r".*SecOps-Institute/Tor-IP-Addresses.*")).respond(
200, content=b"12.34.56.78\n12.34.56.78\n12.34.56.78"
)
tspan = TimeSpan(period="1D")
result = test_nb.run(value="11.1.2.3", timespan=tspan)
@ -119,7 +153,11 @@ def test_ip_summary_notebooklet(init_notebooklets, monkeypatch):
check.is_instance(result.ti_results, pd.DataFrame)
def test_ip_summary_notebooklet_internal(init_notebooklets, monkeypatch):
@respx.mock
@patch("msticpy.context.ip_utils._asn_whois_query")
def test_ip_summary_notebooklet_internal(
mock_whois, init_notebooklets, monkeypatch, rdap_response, whois_response
):
"""Test basic run of notebooklet."""
# test_data = str(Path(TEST_DATA_PATH).absolute())
# monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
@ -134,6 +172,13 @@ def test_ip_summary_notebooklet_internal(init_notebooklets, monkeypatch):
test_nb = nblts.azsent.network.IpAddressSummary()
eq_mock = create_mocked_exec_query(test_nb.query_provider.exec_query)
monkeypatch.setattr(test_nb.query_provider, "exec_query", eq_mock)
mock_whois.return_value = whois_response["asn_response_1"]
respx.get(re.compile(r"http://rdap\.arin\.net/.*")).respond(200, json=rdap_response)
respx.get(re.compile(r"https://check\.torproject\.org.*")).respond(404)
respx.get(re.compile(r".*SecOps-Institute/Tor-IP-Addresses.*")).respond(
200, content=b"12.34.56.78\n12.34.56.78\n12.34.56.78"
)
tspan = TimeSpan(period="1D")
valid_tables = [
@ -157,7 +202,11 @@ def test_ip_summary_notebooklet_internal(init_notebooklets, monkeypatch):
check.is_none(result.ti_results)
def test_ip_summary_notebooklet_all(init_notebooklets, monkeypatch):
@respx.mock
@patch("msticpy.context.ip_utils._asn_whois_query")
def test_ip_summary_notebooklet_all(
mock_whois, init_notebooklets, monkeypatch, rdap_response, whois_response
):
"""Test basic run of notebooklet."""
# test_data = str(Path(TEST_DATA_PATH).absolute())
# monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
@ -175,6 +224,15 @@ def test_ip_summary_notebooklet_all(init_notebooklets, monkeypatch):
test_nb.query_provider.schema.update({tab: {} for tab in DEF_PROV_TABLES})
eq_mock = create_mocked_exec_query(test_nb.query_provider.exec_query)
monkeypatch.setattr(test_nb.query_provider, "exec_query", eq_mock)
mock_whois.return_value = whois_response["asn_response_1"]
respx.get(re.compile(r"http://rdap\.arin\.net/.*")).respond(200, json=rdap_response)
respx.get(
re.compile(r"https://otx\.alienvault.*|https://www\.virustotal.*")
).respond(200, json=_OTX_RESP)
respx.get(re.compile(r"https://check\.torproject\.org.*")).respond(404)
respx.get(re.compile(r".*SecOps-Institute/Tor-IP-Addresses.*")).respond(
200, content=b"12.34.56.78\n12.34.56.78\n12.34.56.78"
)
tspan = TimeSpan(period="1D")
result = test_nb.run(value="40.76.43.124", timespan=tspan, options=opts)
@ -201,7 +259,11 @@ def test_ip_summary_notebooklet_all(init_notebooklets, monkeypatch):
check.is_instance(result.ti_results, pd.DataFrame)
def test_ip_summary_mde_data(init_notebooklets, monkeypatch):
@respx.mock
@patch("msticpy.context.ip_utils._asn_whois_query")
def test_ip_summary_mde_data(
mock_whois, init_notebooklets, monkeypatch, rdap_response, whois_response
):
"""Test MDE data sets in run of notebooklet."""
# test_data = str(Path(TEST_DATA_PATH).absolute())
# monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
@ -226,6 +288,15 @@ def test_ip_summary_mde_data(init_notebooklets, monkeypatch):
)
eq_mock = create_mocked_exec_query(test_nb.query_provider.exec_query)
monkeypatch.setattr(test_nb.query_provider, "exec_query", eq_mock)
mock_whois.return_value = whois_response["asn_response_1"]
respx.get(re.compile(r"http://rdap\.arin\.net/.*")).respond(200, json=rdap_response)
respx.get(
re.compile(r"https://otx\.alienvault.*|https://www\.virustotal.*")
).respond(200, json=_OTX_RESP)
respx.get(re.compile(r"https://check\.torproject\.org.*")).respond(404)
respx.get(re.compile(r".*SecOps-Institute/Tor-IP-Addresses.*")).respond(
200, content=b"12.34.56.78\n12.34.56.78\n12.34.56.78"
)
tspan = TimeSpan(period="1D")
result = test_nb.run(value="40.76.43.124", timespan=tspan, options=opts)
@ -252,3 +323,20 @@ def test_ip_summary_mde_data(init_notebooklets, monkeypatch):
check.is_not_none(result.whois)
check.is_instance(result.related_alerts, pd.DataFrame)
check.is_instance(result.ti_results, pd.DataFrame)
_OTX_RESP = {
"ioc_param": "url",
"response": {
"response": "Found stuff",
"pulse_info": {
"pulses": [
{
"name": ["somename"],
"tags": ["bad", "good", "ugly"],
"references": ["url1", "url2"],
}
]
},
},
}

Просмотреть файл

@ -4,12 +4,16 @@
# license information.
# --------------------------------------------------------------------------
"""Test the nb_template class."""
import json
import re
import sys
from pathlib import Path
from unittest.mock import patch
import pandas as pd
import pytest
import pytest_check as check
import respx
from bokeh.models import LayoutDOM
from msticpy.common.timespan import TimeSpan
@ -20,6 +24,7 @@ from ....unit_test_lib import (
TEST_DATA_PATH,
GeoIPLiteMock,
TILookupMock,
get_test_data_path,
)
# pylint: disable=no-member
@ -31,10 +36,49 @@ if not sys.platform.startswith("win"):
)
def test_network_flow_summary_notebooklet(monkeypatch):
@pytest.fixture
def init_notebooklets(monkeypatch):
    """Initialize notebooklets."""
    # Point the LocalData query provider at the checked-in test data and
    # swap the network-dependent GeoIP/TI providers for offline mocks so
    # tests never call external services.
    test_data = str(Path(TEST_DATA_PATH).absolute())
    discover_modules()
    monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
    monkeypatch.setattr(data_providers, "TILookup", TILookupMock)
    data_providers.init(
        query_provider="LocalData",
        LocalData_data_paths=[test_data],
        LocalData_query_paths=[test_data],
        providers=["tilookup", "geolitelookup"],
    )
@pytest.fixture(scope="session")
def whois_response():
"""Return mock responses for Whois."""
json_text = (
get_test_data_path().joinpath("whois_response.json").read_text(encoding="utf-8")
)
return json.loads(json_text)
@pytest.fixture(scope="session")
def rdap_response():
    """Return mock responses for RDAP lookups."""
    json_text = (
        get_test_data_path().joinpath("rdap_response.json").read_text(encoding="utf-8")
    )
    return json.loads(json_text)
@respx.mock
@patch("msticpy.context.ip_utils._asn_whois_query")
def test_network_flow_summary_notebooklet(
mock_whois, monkeypatch, init_notebooklets, rdap_response, whois_response
):
"""Test basic run of notebooklet."""
discover_modules()
test_data = str(Path(TEST_DATA_PATH).absolute())
mock_whois.return_value = whois_response["asn_response_1"]
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
monkeypatch.setattr(data_providers, "TILookup", TILookupMock)
data_providers.init(
@ -42,6 +86,7 @@ def test_network_flow_summary_notebooklet(monkeypatch):
LocalData_data_paths=[test_data],
LocalData_query_paths=[test_data],
)
respx.get(re.compile(r"http://rdap\.arin\.net/.*")).respond(200, json=rdap_response)
test_nb = nblts.azsent.network.NetworkFlowSummary()
tspan = TimeSpan(period="1D")

Просмотреть файл

@ -51,6 +51,7 @@ def test_print_methods():
def test_add_result_decorator():
"""Test method."""
# pylint: disable=too-few-public-methods
class _TestClass:
prop1 = None

Просмотреть файл

@ -6,7 +6,7 @@
"""NB metadata test class."""
import pytest_check as check
from msticnb import data_providers, nblts
from msticnb import data_providers, init, nblts
from msticnb.nb.azsent.host import host_summary
from msticnb.nb_metadata import NBMetadata, read_mod_metadata
@ -33,6 +33,7 @@ def test_read_metadata():
# pylint: disable=protected-access
def test_class_metadata(monkeypatch):
"""Test class correctly loads yaml metadata."""
init()
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
if "azuredata" in nblts.azsent.host.HostSummary.metadata.req_providers:
nblts.azsent.host.HostSummary.metadata.req_providers.remove("azuredata")

Просмотреть файл

@ -4,12 +4,13 @@
# license information.
# --------------------------------------------------------------------------
"""NB metadata test class."""
from msticnb import browse
from msticnb import browse, init
from msticnb.notebooklet import Notebooklet
def test_nb_browse():
"""Test Notebooklet browser."""
init()
browser = browse()
nb_list = browser.nb_select.options

Просмотреть файл

@ -16,7 +16,7 @@ except ImportError:
# Fall back to msticpy locations prior to v2.0.0
from msticpy.datamodel.pivot import Pivot
from msticnb import data_providers, nblts
from msticnb import data_providers, init, nblts
from msticnb.nb_pivot import add_pivot_funcs
from msticnb.notebooklet import NotebookletResult
@ -44,6 +44,7 @@ _EXPECTED_FUNCS = [
@pytest.fixture
def _init_pivot(monkeypatch):
init()
test_data = str(Path(TEST_DATA_PATH).absolute())
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
if "azuredata" in nblts.azsent.host.HostSummary.metadata.req_providers:

Просмотреть файл

@ -15,10 +15,10 @@ from lxml import etree # nosec
from markdown import markdown
from msticpy.common.timespan import TimeSpan
from msticnb import data_providers
from msticnb import data_providers, init, nblts
from msticnb.common import MsticnbDataProviderError
from msticnb.nb.azsent.host.host_summary import HostSummaryResult
from msticnb.read_modules import Notebooklet, nblts
from msticnb.read_modules import Notebooklet
from .nb_test import TstNBSummary
from .unit_test_lib import GeoIPLiteMock
@ -29,6 +29,7 @@ from .unit_test_lib import GeoIPLiteMock
def test_notebooklet_create(monkeypatch):
"""Test method."""
# Should run because required providers are loaded
init()
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
data_providers.init(
query_provider="LocalData", providers=["tilookup", "geolitelookup"]
@ -49,6 +50,8 @@ def test_notebooklet_create(monkeypatch):
new_nblt = nblt()
check.is_instance(new_nblt, Notebooklet)
check.is_none(new_nblt.result)
except MsticnbDataProviderError:
raise
finally:
nblt.metadata.req_providers = curr_provs
check.is_in("bad_provider", err.value.args[0])

Просмотреть файл

@ -36,7 +36,7 @@ the code.
from typing import Any, Dict, Iterable, Optional, Union
import pandas as pd
from bokeh.plotting.figure import Figure
from bokeh.models import LayoutDOM
try:
from msticpy.vis.timeline import display_timeline
@ -74,7 +74,7 @@ class CustomResult(NotebookletResult):
----------
all_events : pd.DataFrame
DataFrame of all raw events retrieved.
plot : bokeh.models.LayoutDOM
plot : bokeh.models.Optional[LayoutDOM]
Bokeh plot figure showing the account events on an
interactive timeline.
additional_info: dict
@ -88,7 +88,7 @@ class CustomResult(NotebookletResult):
# Make sure they are documented in the Attributes section
# above.
all_events: pd.DataFrame = None
plot: Figure = None
plot: Optional[LayoutDOM] = None
additional_info: Optional[dict] = None
@ -243,6 +243,7 @@ class CustomNB(Notebooklet):
# This section contains functions that do the work. It can be split into
# cells recognized by some editors (like VSCode) but this is optional
# %%
# Get Windows Security Events
def _get_all_events(qry_prov, host_name, timespan):

1
tests/testdata/rdap_response.json поставляемый Normal file

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

1
tests/testdata/whois_response.json поставляемый Normal file
Просмотреть файл

@ -0,0 +1 @@
{"ipv4": {"Private": ["10.0.0.1", ["Private", "Reserved"]], "Multicast": ["224.0.0.1", null], "Unspecified": ["0.0.0.0", null], "Reserved": ["198.51.100.1", ["Private", "Reserved"]], "Loopback": ["127.0.0.1", null], "Public": ["153.2.3.4", null], "Link Local": ["169.254.0.1", null]}, "ipv6": {"Private": ["FC00::C001:1DFF:FEE0:0", null], "Multicast": ["FF00::", null], "Unspecified": ["::", null], "Reserved": ["2001:db8::", ["Private", "Reserved"]], "Loopback": ["::1", null], "Public": ["2340:0023:AABA:0A01:0055:5054:9ABC:ABB0", null], "Link Local": ["FE80::C001:1DFF:FEE0:0", null]}, "asn_response_1": "AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name\n8068 | 13.107.4.50 | 13.107.4.0/24 | US | arin | 2015-03-26 | MICROSOFT-CORP-MSN-AS-BLOCK, US\n", "asn_response_2": "AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name\n8075 | 65.55.44.109 | 65.52.0.0/14 | US | arin | 2001-02-14 | MICROSOFT-CORP-MSN-AS-BLOCK, US\n"}

Просмотреть файл

@ -6,18 +6,16 @@
"""Unit test common utilities."""
import random
from pathlib import Path
from typing import Any, Dict, List, Optional
import attr
import pandas as pd
try:
from msticpy.context import TILookup
from msticpy.context.geoip import GeoIpLookup
from msticpy.context.tiproviders.ti_provider_base import LookupResult
except ImportError:
from msticpy.sectools.geoip import GeoIpLookup
from msticpy.sectools.tilookup import TILookup
from msticpy.sectools.tiproviders.ti_provider_base import LookupResult
from msticpy.datamodel.entities import GeoLocation, IpAddress
@ -30,6 +28,8 @@ def get_test_data_path():
"""Get path to testdata folder."""
cur_dir = Path(".").absolute()
td_paths = []
if cur_dir.joinpath("tests/testdata").is_dir():
return cur_dir.joinpath("tests/testdata")
td_path = None
while not td_paths:
td_paths = list(cur_dir.glob("**/tests/testdata"))
@ -40,10 +40,10 @@ def get_test_data_path():
raise FileNotFoundError("Cannot find testdata folder")
cur_dir = cur_dir.parent
return td_path
return Path(td_path).absolute()
TEST_DATA_PATH = get_test_data_path()
TEST_DATA_PATH = str(get_test_data_path())
DEF_PROV_TABLES = [
@ -123,7 +123,7 @@ def _get_geo_loc():
)
# Need to keep same signatire as mocked class
# Need to keep same signature as mocked class
# pylint: disable=no-self-use
class TILookupMock:
"""Test class for TILookup."""
@ -132,44 +132,48 @@ class TILookupMock:
"""Initialize mock class."""
del args, kwargs
def lookup_ioc(self, observable, ioc_type: str = None, **kwargs):
def lookup_ioc(
self, ioc=None, observable=None, ioc_type: Optional[str] = None, **kwargs
):
"""Lookup fake TI."""
del kwargs
result_list = []
ioc = ioc or kwargs.get("observable")
result_list: List[Dict[str, Any]] = []
for i in range(3):
hit = random.randint(1, 10) > 5
result_args = dict(
ioc=observable,
ioc_type=ioc_type,
query_subtype="mock",
provider="mockTI",
result=True,
severity=2 if hit else 0,
details=f"Details for {observable}",
raw_result=f"Raw details for {observable}",
Provider=f"TIProv-{i}",
Ioc=observable,
IocType=ioc_type,
QuerySubtype="mock",
Result=True,
Severity=2 if hit else 0,
Details=f"Details for {observable}",
RawResult=f"Raw details for {observable}",
)
if check_mp_version("2.0"):
result_args["sanitized_value"] = observable
else:
result_args["safe_ioc"] = observable
result_list.append((f"TIProv{i}", LookupResult(**result_args)))
return True, result_list
result_args["SafeIoC"] = observable
result_list.append(result_args)
return pd.DataFrame(result_list)
def lookup_iocs(self, data, obs_col: str = None, **kwargs):
def lookup_iocs(self, data, obs_col: Optional[str] = None, **kwargs):
"""Lookup fake TI."""
del kwargs
item_result: List[pd.DataFrame] = []
if isinstance(data, dict):
for obs, ioc_type in data.items():
_, item_result = self.lookup_ioc(observable=obs, ioc_type=ioc_type)
item_result.extend(
self.lookup_ioc(observable=obs, ioc_type=ioc_type)
for obs, ioc_type in data.items()
)
elif isinstance(data, pd.DataFrame):
for row in data.itertuples():
_, item_result = self.lookup_ioc(observable=row[obs_col])
item_result.extend(
self.lookup_ioc(observable=row[obs_col]) for row in data.itertuples()
)
elif isinstance(data, list):
for obs in data:
_, item_result = self.lookup_ioc(observable=obs)
results = [pd.Series(attr.asdict(ti_result)) for _, ti_result in item_result]
return pd.DataFrame(data=results).rename(columns=LookupResult.column_map())
item_result.extend(self.lookup_ioc(observable=obs) for obs in data)
return pd.concat(item_result) if item_result else pd.DataFrame()
@classmethod
def result_to_df(cls, ioc_lookup):