Aaron Schnieder 2017-11-04 19:48:38 -07:00 committed by GitHub
Parent dd2b7aa580
Commit 652b350d47
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 2961 additions and 10 deletions

2
.gitignore vendored

@@ -99,3 +99,5 @@ ENV/
# mypy
.mypy_cache/
.vscode

43
Docker.md Normal file

@@ -0,0 +1,43 @@
# Using Docker for local dev
## Requirements
* [Docker](https://docs.docker.com/engine/installation/)
* Docker Compose (usually installed with docker)
## Building
* `docker-compose build`
## Execution
### Redis
* `docker-compose up redis`
* You can pass the `-d` flag to run it in the background
### Redis queue web UI
* `docker-compose up dashboard`
* You can pass the `-d` flag to run it in the background
* Open a browser and navigate to [localhost:8080](http://localhost:8080)
### Scheduler
* `docker-compose up scheduler`
* This executes `app/scheduler.py` in the Docker environment
* The scheduler will read a data file and queue a job for each row of data
### Processor
* `docker-compose up processor`
* This executes `app/processor.py` in the Docker environment
* The processor unwraps the AES key used to decrypt each job's data record, then runs an RQ worker
## Other Containers for testing
### Run worker (shell mode)
* Redis Queue lets you run a worker simply by running the `rq worker` command.
* To do this in an isolated Docker environment, run `docker-compose up worker`
### Run worker (python script mode)
* Redis Queue also lets you run a worker from a Python script (see `app/worker.py`)
* To do this in an isolated Docker environment, run `docker-compose up worker-script`
### Run job manager
* `docker-compose up manager`
* This command executes `app/manager.py` in the Docker environment.
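
A typical end-to-end local run looks like this (a sketch built from the commands above; only services defined in `docker-compose.yaml` are used):
```
docker-compose build                  # build the service images
docker-compose up -d redis dashboard  # Redis and the RQ web UI in the background
docker-compose up scheduler           # enqueue one job per row of the data file
docker-compose up processor           # unwrap the AES key and start an RQ worker
```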

README.md

@@ -1,14 +1,38 @@
# Introduction
High-scale parallel processing architecture in Azure, based on Redis RQ and written in Python.
# Contributing
# Dev Setup
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit https://cla.microsoft.com.
### Pyenv for multiple python versions (macOS)
```
brew install pyenv
echo -e 'if command -v pyenv 1>/dev/null 2>&1; then\n  eval "$(pyenv init -)"\nfi' >> ~/.bash_profile
exec "$SHELL"
xcode-select --install
CFLAGS="-I$(brew --prefix openssl)/include" \
LDFLAGS="-L$(brew --prefix openssl)/lib" \
pyenv install -v 2.7.5
pyenv rehash
cd <yourlocalpath>/azure-python-redis-queue-processor
pyenv local 2.7.5
python --version
```
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions
provided by the bot. You will only need to do this once across all repos using our CLA.
### Dependencies
- [Python 2.7.5](http://#)
- [PIP Package Manager](https://pip.pypa.io/en/stable/installing/)
- [Python code editor](http://#)
- [Git](http://#)
- [Azure CLI](http://)
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
### Setup Instructions
1. Clone the repo
2. Install the dependencies
3. Open a bash shell
4. Install the Azure SDKs:
```
sudo pip install --ignore-installed azure-keyvault
sudo pip install --ignore-installed msrestazure
sudo pip install --ignore-installed adal
sudo pip install --ignore-installed azure-storage
sudo pip install --ignore-installed enum
sudo pip install --ignore-installed redis
```
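
A quick sanity check that the SDKs installed cleanly (a sketch; adjust if your package set differs):
```
python -c "import azure.keyvault, azure.storage, adal, redis; print('SDKs OK')"
```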

0
app/__init__.py Normal file

102
app/aescipher.py Normal file

@@ -0,0 +1,102 @@
from os import urandom
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

class AESCipher:
    """ Wrapper for the cryptography AES cipher (CBC mode).

    :attr str padding_value: padding character appended when content is not block-aligned.
    """
    padding_value = '\0'

    def __init__(self, key, iv):
        """
        Cipher constructor.

        :param str key: AES key
        :param str iv: initialization vector
        """
        self._key = key
        self._iv = iv
        self._cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())

    def encrypt(self, content):
        """
        Encrypt a string using the (key, iv) pair.
        Pads content with padding_value to a 16-byte boundary.

        :param str content: unencrypted string.
        :returns: Encrypted string.
        """
        padding = len(content) % 16
        if padding != 0:
            content += ''.join(self.padding_value for i in range(16 - padding))
        encryptor = self._cipher.encryptor()
        ct = encryptor.update(content) + encryptor.finalize()
        return ct

    def decrypt(self, content):
        """
        Decrypt a string using the (key, iv) pair.
        Strips trailing padding_value characters.

        :param str content: encrypted string.
        :returns: Unencrypted string.
        """
        decryptor = self._cipher.decryptor()
        content = decryptor.update(content) + decryptor.finalize()
        return content.rstrip(self.padding_value)

    def encrypt_file(self, in_filename):
        """
        Encrypt file content using the (key, iv) pair.
        Pads content with padding_value to a 16-byte boundary.

        :param str in_filename: unencrypted data file name.
        :returns: Encrypted string.
        """
        with open(in_filename, "rb") as file:
            content = file.read()
        return self.encrypt(content)

    def decrypt_file(self, in_filename):
        """
        Decrypt a file using the (key, iv) pair.
        Strips trailing padding_value characters.

        :param str in_filename: encrypted data file name.
        :returns: Unencrypted string.
        """
        with open(in_filename, "rb") as file:
            content = file.read()
        return self.decrypt(content)

    def encrypt_file_save_file(self, in_filename, out_filename):
        """
        Encrypt a file using the (key, iv) pair and save the result to a file.

        :param str in_filename: unencrypted data file name.
        :param str out_filename: encrypted data file name.
        """
        content = self.encrypt_file(in_filename)
        with open(out_filename, "wb+") as out:
            out.write(content)

    def decrypt_file_save_file(self, in_filename, out_filename):
        """
        Decrypt a file using the (key, iv) pair and save the result to a file.

        :param str in_filename: encrypted data file name.
        :param str out_filename: unencrypted data file name.
        """
        content = self.decrypt_file(in_filename)
        with open(out_filename, "wb+") as out:
            out.write(content)
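
For reference, a minimal round-trip sketch of the class above (hypothetical key material; the key must be 16, 24, or 32 bytes and the IV 16 bytes for AES-CBC):
```
from os import urandom
from aescipher import AESCipher

key = urandom(32)            # 256-bit AES key (illustrative, not from the repo)
iv = urandom(16)             # CBC initialization vector
cipher = AESCipher(key, iv)

ciphertext = cipher.encrypt("hello world")   # padded with '\0' to a 16-byte boundary
assert cipher.decrypt(ciphertext) == "hello world"
```
Note that the null-character padding means trailing `'\0'` bytes in the plaintext itself would also be stripped on decryption.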

115
app/aeskeywrapper.py Normal file

@@ -0,0 +1,115 @@
from azure.keyvault import KeyVaultClient
from azure.common.credentials import ServicePrincipalCredentials
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.primitives import hashes

class AESKeyWrapper:
    """ Wrapper for key wrapping functions.

    The key is wrapped locally with the public key retrieved from Azure KeyVault.
    Uses the Azure KeyVault API to unwrap the key.
    """
    def __init__(self, vault, client_id, secret, tenant, key_name, key_version):
        """
        Wrapper constructor.

        :param str vault: Azure KeyVault url.
        :param str client_id: Azure Client Id.
        :param str secret: Azure Client secret.
        :param str tenant: Azure tenant id.
        :param str key_name: Azure KeyVault key name.
        :param str key_version: Azure KeyVault key version.
        """
        self._key_name = key_name
        self._key_version = key_version
        self._vault = vault
        self._client_id = client_id
        self._secret = secret
        self._tenant = tenant
        self._credentials = ServicePrincipalCredentials(client_id=self._client_id,
                                                        secret=self._secret,
                                                        tenant=self._tenant)
        self.kvclient = KeyVaultClient(self._credentials)

    def wrap_aes_key_local(self, aes_key, public_key):
        """
        Wraps the AES key locally using the RSA-OAEP algorithm.

        :param str aes_key: unencrypted AES key.
        :param str public_key: public part of the RSA key.
        :return: String with the encrypted AES key.
        """
        int_n = self._bytes_to_int(public_key.n)
        int_e = self._bytes_to_int(public_key.e)
        public_numbers = RSAPublicNumbers(int_e, int_n)
        public_key = public_numbers.public_key(default_backend())
        wrapped_key = public_key.encrypt(aes_key,
                                         padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA1()),
                                                      algorithm=hashes.SHA1(),
                                                      label=None))
        return wrapped_key

    def unwrap_aes_key(self, wrapped_key):
        """
        Unwraps the AES key with Azure KeyVault using the RSA-OAEP algorithm.

        :param str wrapped_key: encrypted AES key.
        :return: String with the unencrypted AES key.
        """
        return self.kvclient.unwrap_key(self._vault, self._key_name, self._key_version, 'RSA-OAEP', wrapped_key).result

    def get_public_key(self):
        """ Retrieve the public key from Azure KeyVault.

        :return: JsonWebKey with the public RSA key.
        """
        key_bundle = self.kvclient.get_key(self._vault, self._key_name, self._key_version)
        return key_bundle.key

    def _bytes_to_int(self, bytes):
        """ Helper function to convert a byte array to an int. """
        result = 0
        for b in bytes:
            result = result * 256 + ord(b)
        return result

# Tests only
import random, string
from config import Config

config = Config()

def generate_aes_key(length):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))

if __name__ == "__main__":
    wrapper = AESKeyWrapper(vault='',
                            client_id='',
                            secret='',
                            tenant='',
                            key_name='',
                            key_version='')
    public_key = wrapper.get_public_key()

    for i in range(100):
        key = generate_aes_key(32)
        wrapped_key = wrapper.wrap_aes_key_local(key, public_key)
        restored_aes_key = wrapper.unwrap_aes_key(wrapped_key)
        if key != restored_aes_key:
            print("==========================")
            print(key)
            print("--------------------------")
            print(restored_aes_key)
            print("")

70
app/azurerest.py Normal file

@@ -0,0 +1,70 @@
import platform
import requests
import adal
from config import Config

class AzureRest(object):
    """
    Provides Azure REST API request functionality, adding all necessary headers and access tokens
    """
    def __init__(self, logger):
        """
        Constructor

        :param logger logger: The logger instance to use for logging.
        """
        self.logger = logger
        self.config = Config()

    def _log_exception(self, exception, functionName):
        """
        Logs an exception to the logger instance for this class.

        :param Exception exception: The exception thrown.
        :param str functionName: Name of the function where the exception occurred.
        """
        self.logger.debug("Exception occurred in: " + functionName)
        self.logger.debug(type(exception))
        self.logger.debug(exception)

    def get_user_agent(self):
        """
        Returns a user agent string for use in Azure REST API requests

        :return: User agent string
        :rtype: str
        """
        user_agent = "python/{} ({}) requests/{} app/AzureMetrics".format(
            platform.python_version(),
            platform.platform(),
            requests.__version__)
        return user_agent

    def get_access_token(self):
        """
        Returns an access token for use in Azure REST API requests

        :return: Access token or None on failure
        :rtype: str
        """
        try:
            context = adal.AuthenticationContext('https://login.microsoftonline.com/' + self.config.tenant_id)
            token_response = context.acquire_token_with_client_credentials('https://management.core.windows.net/', self.config.client_id, self.config.client_secret)
            return token_response.get('accessToken')
        except Exception as ex:
            self._log_exception(ex, self.get_access_token.__name__)
            return None

    def http_get(self, uri):
        """
        Executes an Azure REST API HTTP GET request

        :param str uri: The uri to use.
        :return: Response or None on failure
        :rtype: object
        """
        try:
            accessToken = self.get_access_token()
            headers = {"Authorization": 'Bearer ' + accessToken}
            headers['User-Agent'] = self.get_user_agent()
            return requests.get(uri, headers=headers).json()
        except Exception as ex:
            self._log_exception(ex, self.http_get.__name__)
            return None
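
A usage sketch (hypothetical subscription and resource group placeholders; the API version matches the one used in metricslogger.py):
```
import logging
from azurerest import AzureRest

logger = logging.getLogger(__name__)
azure_rest = AzureRest(logger)

# list VMs in a resource group; http_get returns the parsed JSON or None on failure
uri = ('https://management.azure.com/subscriptions/<subscription-id>'
       '/resourceGroups/<resource-group>/providers/Microsoft.Compute'
       '/virtualMachines?api-version=2017-03-30')
response = azure_rest.http_get(uri)
```
`tenant_id`, `client_id`, and `client_secret` must be filled in `config.py` for the token request to succeed.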

77
app/config.py Normal file

@@ -0,0 +1,77 @@
"""
config.py only contains the configuration class
"""
import os
class Config(object):
"""
This class contains configuration parameters for all applications
"""
def __init__(self):
# Job Status storage account name
self.job_status_storage = ''
# Job Status storage SAS token
self.job_status_sas_token = '
# Azure Subscription
self.subscription_name = ''
self.subscription_id = ''
# Encrypted files storage SAS token
self.encrypted_files_sas_token = ''
self.storage_account_name = ''
self.storage_container_name = ''
# File names
self.encrypted_aes_key_filename = ''
self.encrypted_scheduler_script_filename = ''
self.encrypted_files_folder = ''
self.scheduler_script_filename = ''
self.encrypted_data_filename = ''
# AES key configuration
self.aes_key_length = 32
# Azure keyvault configuration
self.azure_keyvault_url = ''
self.azure_keyvault_client_id = ''
self.azure_keyvault_secret = ''
self.azure_keyvault_tenant_id = ''
self.azure_keyvault_key_name = 'RSAKey'
self.azure_keyvault_key_version = ''
# Azure Active Directory - Directory ID in Azure portal
self.tenant_id = ''
# Your Service Principal Application ID in Azure Portal
self.client_id = ''
# Application Key Value in Azure Portal
self.client_secret = ''
# Your Key Vault URI
self.key_vault_uri = ''
#Key vault API version
self.key_vault_api_version = '2016-10-01'
# Redis Q Host Address
self.redis_host = os.getenv('REDIS_HOST', 'localhost')
# Redis Q Port
self.redis_port = os.getenv('REDIS_PORT', 6379)
#Logger Queue Name
self.logger_queue_name = ''
#Logger Queue Storage Account Name
self.logger_storage_account_name=''
# Logger Queue SAS
self.logger_queue_sas =''
# Metrics configuration
self.vm_resource_group = ''
self.metrics_storage = ''
self.metrics_sas_token = ''

53
app/functions.py Normal file

@@ -0,0 +1,53 @@
"""
Functions that will be executed by the jobs
"""
import os
import logging
from datetime import datetime
from aescipher import AESCipher
from jobstatus import JobStatus
from jobstatus import JobState
from rq import get_current_job
LOGGER = logging.getLogger(__name__)
def init_logging():
"""
Initialize the logger
"""
LOGGER.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)-20s %(levelname)-5s %(message)s')
handler.setFormatter(formatter)
LOGGER.addHandler(handler)
init_logging()
jobstatus = JobStatus(LOGGER)
def multiply_by_two(x):
"""
Simple test function
"""
if x % 5 == 0:
raise Exception('Went wrong')
return x * 2
def _create_aes_cipher():
"""
Get the environment variables set for the AES Key from the job processor
and construct the AESCipher
:return: an AESCipher created from the AES key and IV set by the parent process
"""
return AESCipher(os.environ['AES_SECRET'], os.environ['AES_IV'])
def processing_job(encryptedRecord):
"""
This will decrypt the data and perform some task
:param object encryptedRecord: This is the encrypted record to be processed
:return: returns result of the job
"""
job = get_current_job()
aes_cipher = _create_aes_cipher()
record = int(aes_cipher.decrypt(encryptedRecord))
jobstatus.update_job_status(job.id, JobState.done)
return record * 2
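
For reference, a minimal sketch of how `processing_job` ends up on the queue (this mirrors what `scheduler.py` below does, assuming Redis is running locally; the data file is the sample shipped in `data/`):
```
import redis
from rq import Queue
from functions import processing_job

redis_conn = redis.Redis(host='localhost', port=6379)
queue = Queue(connection=redis_conn)

# each line of the encrypted data file becomes one job
with open('data/data.encrypted', 'r') as data_file:
    for record in data_file:
        queue.enqueue(processing_job, record.rstrip('\n'))
```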

190
app/jobstatus.py Normal file

@@ -0,0 +1,190 @@
import pickle
import redis
from datetime import datetime
from enum import IntEnum
from azure.storage.queue import QueueService, models
from config import Config

class JobState(IntEnum):
    none = 0
    queued = 1
    processing = 2
    processed = 3
    done = 4
    failed = 5

class JobStatusRecord(object):
    """
    Custom object to track a single job's status
    """
    def __init__(self):
        """
        Initializes a new instance of the JobStatusRecord custom object.
        """
        self.job_name = ""
        self.job_id = ""
        self.job_state = JobState.none
        self.created = None
        self.last_updated = None
        self.last_error = None

class JobStatus(object):
    """
    Class for managing job status records.
    """
    def __init__(self, logger):
        """
        Initializes a new instance of the JobStatus class.

        :param logger logger: The logger instance to use for logging.
        """
        self.logger = logger
        self.config = Config()
        if(self.init_storage_services() is False):
            raise Exception("Errors occurred instantiating the job status storage service.")
        if(self.init_storage() is False):
            raise Exception("Errors occurred validating that the job status queue exists.")

    def init_storage_services(self):
        """
        Initializes the storage service clients using values from config.py.

        :return: True on success. False on failure.
        :rtype: boolean
        """
        try:
            # creates an instance of the Redis client to use for job status storage
            pool = redis.ConnectionPool(host=self.config.redis_host, port=self.config.redis_port)
            self.storage_service_cache = redis.Redis(connection_pool=pool)

            # creates an instance of QueueService to use for completed job status storage
            self.storage_service_queue = QueueService(account_name=self.config.job_status_storage,
                                                      sas_token=self.config.job_status_sas_token)

            # set the encode function for objects stored as queue messages to noencode;
            # serialization is handled as a string by pickle
            # http://azure-storage.readthedocs.io/en/latest/ref/azure.storage.queue.queueservice.html
            # http://azure-storage.readthedocs.io/en/latest/_modules/azure/storage/queue/models.html
            self.storage_service_queue.encode_function = models.QueueMessageFormat.noencode
            return True
        except Exception as ex:
            self.log_exception(ex, self.init_storage_services.__name__)
            return False

    def init_storage(self):
        """
        Initializes the storage queue, creating it if it doesn't exist.

        :return: True on success. False on failure.
        :rtype: boolean
        """
        try:
            # will create the job status queue if it doesn't exist
            self.storage_service_queue.create_queue(self.config.job_status_storage)
            return True
        except Exception as ex:
            self.log_exception(ex, self.init_storage.__name__)
            return False

    def log_exception(self, exception, functionName):
        """
        Logs an exception to the logger instance for this class.

        :param Exception exception: The exception thrown.
        :param str functionName: Name of the function where the exception occurred.
        """
        self.logger.debug("Exception occurred in: " + functionName)
        self.logger.debug(type(exception))
        self.logger.debug(exception)

    def add_job_status(self, jobName, jobId, jobState):
        """
        Adds a new job status record.

        :param str jobName: The name of the job; this is used as the partition key.
        :param str jobId: Id for the job.
        :param JobState jobState: Enum for the current job state.
        :return: True on success. False on failure.
        :rtype: boolean
        """
        record = JobStatusRecord()
        record.job_name = jobName
        record.job_id = jobId
        record.created = datetime.utcnow()
        record.job_state = int(jobState)
        try:
            # serialize the JobStatusRecord
            jobStatusSerialized = pickle.dumps(record)

            # write the serialized record out to Redis
            self.storage_service_cache.set(jobId, jobStatusSerialized)
            self.logger.info('queued: ' + jobId)
            return True
        except Exception as ex:
            self.log_exception(ex, self.add_job_status.__name__)
            return False

    def get_job_status(self, jobId):
        """
        Gets a job status record from storage.

        :param str jobId: Id for the job.
        :return: JobStatusRecord record on success. None on failure.
        :rtype: JobStatusRecord or None
        """
        try:
            # get the serialized job status record from Redis
            serializedRecord = self.storage_service_cache.get(jobId)

            # deserialize the record and return the JobStatusRecord object
            return pickle.loads(serializedRecord)
        except Exception as ex:
            self.log_exception(ex, self.get_job_status.__name__)
            return None

    def update_job_status(self, jobId, jobState, error=None):
        """
        Updates a job status record.

        :param str jobId: Id for the job.
        :param JobState jobState: Enum for the current job state.
        :param error: Optional parameter to provide error details for the failed state.
        :type error: str or None
        :return: True on success. False on failure.
        :rtype: boolean
        """
        record = self.get_job_status(jobId)
        record.job_id = jobId
        record.last_updated = datetime.utcnow()
        record.job_state = int(jobState)
        if(error is not None):
            record.last_error = error
        try:
            # serialize the JobStatusRecord
            jobStatusSerialized = pickle.dumps(record)

            # write the job status record out to Redis
            self.storage_service_cache.set(jobId, jobStatusSerialized)

            # if the job is complete or failed, write it out to the queue
            if(jobState is JobState.done or jobState is JobState.failed):
                self.queue_job_status(record)
            return True
        except Exception as ex:
            self.log_exception(ex, self.update_job_status.__name__)
            return False

    def queue_job_status(self, jobStatusRecord):
        """
        Queues a job status record.

        :param object jobStatusRecord: The job status record to store in the queue message.
        :return: True on success. False on failure.
        :rtype: boolean
        """
        try:
            jobStatusRecordSerialized = pickle.dumps(jobStatusRecord)
            self.storage_service_queue.put_message(self.config.job_status_storage, jobStatusRecordSerialized)
            return True
        except Exception as ex:
            self.log_exception(ex, self.queue_job_status.__name__)
            return False
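
A minimal usage sketch of the lifecycle (hypothetical job id, assuming Redis and the status queue are reachable; `scheduler.py` performs the add, the worker-side `functions.py` performs the update):
```
import logging
from jobstatus import JobStatus, JobState

js = JobStatus(logging.getLogger(__name__))
js.add_job_status("TestJob", "job-0001", JobState.queued)
js.update_job_status("job-0001", JobState.done)  # done/failed also pushes the record to the queue
```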

202
app/metricslogger.py Normal file

@@ -0,0 +1,202 @@
import pickle
from enum import Enum
from azure.storage.queue import QueueService, models
from config import Config
from azurerest import AzureRest

class AzureResource(Enum):
    vm = 'vm'
    vm_scale_set = 'vm_scale_set'

class MetricsLogger(object):
    """
    Logs Microsoft Azure resource metrics to storage
    """
    def __init__(self, logger):
        """
        Constructor. Initializes storage, config and other dependencies.

        :param logger logger: The logger instance to use for logging
        """
        self.logger = logger
        self.config = Config()
        self.azure_rest = AzureRest(logger)
        if(self.init_storage_service() is False):
            raise Exception("Errors occurred instantiating the metrics storage service.")
        if(self.init_storage() is False):
            raise Exception("Errors occurred validating that the metrics queue exists.")

    def init_storage_service(self):
        """
        Initializes the storage service client using values from config.py.

        :return: True on success. False on failure.
        :rtype: boolean
        """
        try:
            # creates an instance of QueueService to use for completed metrics storage
            self.storage_service_queue = QueueService(account_name=self.config.metrics_storage,
                                                      sas_token=self.config.metrics_sas_token)

            # set the encode function for objects stored as queue messages to noencode;
            # serialization is handled as a string by pickle
            # http://azure-storage.readthedocs.io/en/latest/ref/azure.storage.queue.queueservice.html
            # http://azure-storage.readthedocs.io/en/latest/_modules/azure/storage/queue/models.html
            self.storage_service_queue.encode_function = models.QueueMessageFormat.noencode
            return True
        except Exception as ex:
            self._log_exception(ex, self.init_storage_service.__name__)
            return False

    def init_storage(self):
        """
        Initializes the storage queue, creating it if it doesn't exist.

        :return: True on success. False on failure.
        :rtype: boolean
        """
        try:
            # will create the metrics storage queue if it doesn't exist
            self.storage_service_queue.create_queue(self.config.metrics_storage)
            return True
        except Exception as ex:
            self._log_exception(ex, self.init_storage.__name__)
            return False

    def _log_exception(self, exception, functionName):
        """
        Logs an exception to the logger instance for this class.

        :param Exception exception: The exception thrown.
        :param str functionName: Name of the function where the exception occurred.
        """
        self.logger.debug("Exception occurred in: " + functionName)
        self.logger.debug(type(exception))
        self.logger.debug(exception)

    def resource_provider_lookup(self, azureResource):
        """
        Returns the Azure Resource Provider string for an enum value.

        :param Enum azureResource: The AzureResource to retrieve the Resource Provider for.
        :return: Azure Resource Provider string
        :rtype: str
        """
        switch = {
            AzureResource.vm: "Microsoft.Compute",
            AzureResource.vm_scale_set: "Microsoft.Compute",
        }
        return switch.get(azureResource, None)

    def resource_type_lookup(self, azureResource):
        """
        Returns the Azure Resource Type string for an enum value.

        :param Enum azureResource: The AzureResource to retrieve the Resource Type for.
        :return: Azure Resource Type string
        :rtype: str
        """
        switch = {
            AzureResource.vm: "virtualMachines",
            AzureResource.vm_scale_set: "virtualMachineScaleSets",
        }
        return switch.get(azureResource, None)

    def get_vms_in_resource_group(self, resourceGroup):
        """
        Calls Azure to get a list of VMs in a resource group.

        :param str resourceGroup: The Azure Resource Group containing the VMs to list
        :return: Array of VM names
        :rtype: Array[str]
        """
        vmListArray = []
        try:
            # get the resource provider and resource type we need for the URI
            resourceProvider = self.resource_provider_lookup(AzureResource.vm)
            resourceType = self.resource_type_lookup(AzureResource.vm)

            # build the full Microsoft Azure REST uri:
            # https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroup}/providers/Microsoft.Compute/virtualmachines?api-version={apiVersion}
            baseUri = self.buildAzureMetricsBaseUri(resourceGroup, resourceProvider, resourceType, None)
            uri = baseUri + "?api-version=2017-03-30"

            # execute the HTTP GET request and capture the response
            vmList = self.azure_rest.http_get(uri)
            if(vmList is not None):
                # iterate through the list of vms and build a string array of vm names
                for item in vmList.viewitems():
                    value = item[0], item[1]
                    vmName = value[1][0]['name']

                    # vmName is unicode encoded, so we need to get the string
                    if(vmName is not None):
                        vmListArray.append(vmName.encode("utf-8"))
        except Exception as ex:
            self._log_exception(ex, self.get_vms_in_resource_group.__name__)
            return False
        return vmListArray

    def buildAzureMetricsBaseUri(self, resourceGroup, resourceProvider, resourceType, resourceName):
        """
        Builds a base Azure Metrics API URI.

        :param str resourceGroup: The Azure Resource Group to use in the URI
        :param str resourceProvider: The Azure Resource Provider to use in the URI
        :param str resourceType: The Azure Resource Type to use in the URI
        :param str resourceName: The Azure Resource name to use in the URI
        :return: Base Azure Metrics API URI
        :rtype: str
        """
        # build the base Microsoft Azure REST uri
        baseUri = ''.join(['https://management.azure.com',
                           '/subscriptions/', self.config.subscription_id,
                           '/resourceGroups/', resourceGroup,
                           '/providers/', resourceProvider,
                           '/', resourceType])

        # add a specific resource name to the URI if we received one
        if(resourceName is not None):
            baseUri = baseUri + '/' + resourceName
        return baseUri

    def get_metrics(self, azureResource, resourceGroup, resourceName):
        """
        Get metrics from the Microsoft Insights Azure metrics service for a resource.

        :param Enum azureResource: The Azure Resource to retrieve metrics for
        :param str resourceGroup: The Azure Resource Group to retrieve metrics for
        :param str resourceName: The Azure Resource name to retrieve metrics for
        :return: Azure Metrics response
        :rtype: Object
        """
        resourceProvider = self.resource_provider_lookup(azureResource)
        resourceType = self.resource_type_lookup(azureResource)
        baseUri = self.buildAzureMetricsBaseUri(resourceGroup, resourceProvider, resourceType, resourceName)
        uri = baseUri + '/providers/microsoft.insights/metrics?api-version=2017-05-01-preview'
        return self.azure_rest.http_get(uri)

    def capture_vm_metrics(self):
        """
        Iterates through all processing VMs and captures current VM metrics to storage.
        """
        # get all VMs in the resource group specified in the config
        vmList = self.get_vms_in_resource_group(self.config.vm_resource_group)

        # iterate through each vm in the list
        for vmname in vmList:
            try:
                # get the metrics from the Azure Metrics service for this vm
                vmmetrics = self.get_metrics(AzureResource.vm, self.config.vm_resource_group, vmname)

                # write the metrics out to the storage queue
                vmmetricsSerialized = pickle.dumps(vmmetrics)
                self.storage_service_queue.put_message(self.config.metrics_storage, vmmetricsSerialized)
            except Exception as ex:
                self._log_exception(ex, self.capture_vm_metrics.__name__)

122
app/processor.py Normal file

@@ -0,0 +1,122 @@
"""
The class contains the Processor that will process each job
module deps:
pip install rq
"""
import argparse
import os
import logging
import redis
import time
from config import Config
from aescipher import AESCipher
from aeskeywrapper import AESKeyWrapper
from rq import Queue, Connection, Worker
# Logger
LOGGER = logging.getLogger(__name__)
class Processor(object):
"""
Processes Redis Q jobs
"""
def __init__(self, logger, redisHost, redisPort, queues, encryptedAESKeyPath):
"""
:param logger logger: the logger
:param str redis_host: Redis host where the Redis Q is running
:param int redis_port: Redis port where the Redis Q is running
:param array queues: the queues the worker will listen on
:param str encryptedAesKeyPath: path to the encrypted AES key file
"""
self.logger = logger
self.queues = queues
self.config = Config()
self.redis_host = redisHost
self.redis_port = redisPort
self.encrypted_aes_key_path = encryptedAESKeyPath
def _get_aes_key(self):
"""
Fetches the AES key using the values from the config
"""
# Decode AES key
self.logger.info('Decrypting AES Key')
wrapper = AESKeyWrapper(vault = self.config.azure_keyvault_url,
client_id = self.config.azure_keyvault_client_id,
secret = self.config.azure_keyvault_secret,
tenant = self.config.azure_keyvault_tenant_id,
key_name = self.config.azure_keyvault_key_name,
key_version = self.config.azure_keyvault_key_version)
with open(self.encrypted_aes_key_path, "rb") as aes_key_file:
wrapped_key = aes_key_file.read()
keys = wrapper.unwrap_aes_key(wrapped_key)
return (keys[:self.config.aes_key_length], keys[self.config.aes_key_length:])
def run(self):
"""
Execute the job processor - Fetch the AES key and start the Redis Q worker
"""
self.logger.info('Using redis host: %s:%s', self.redis_host, self.redis_port)
pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port)
redis_conn = redis.Redis(connection_pool=pool)
aes_key = self._get_aes_key()
# TODO: This is how we are passing the AES key to the fork child processes
# If there is a better way, we should change it.
os.environ['AES_SECRET'] = aes_key[0]
os.environ['AES_IV'] = aes_key[1]
self.logger.info('Starting worker')
# Wait until redis connection can be established
while(True):
try:
redis_conn.ping()
self.logger.info("Redis connection successful.")
break
except redis.exceptions.ConnectionError:
self.logger.info("Redis isn't running, sleep for 5 seconds.")
time.sleep(5)
with Connection(redis_conn):
worker = Worker(self.queues)
worker.work()
def init_logging():
"""
Initialize the logger
"""
LOGGER.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)-20s %(levelname)-5s %(message)s')
handler.setFormatter(formatter)
LOGGER.addHandler(handler)
def parse_args():
"""
Parse command line arguments
"""
config = Config()
parser = argparse.ArgumentParser(description='Process jobs from the Redis Q')
parser.add_argument('aesKeyFilePath', help='path to the encrypted aes key file.')
parser.add_argument('--queues', help='Redis Q queues to listen on', default=['high', 'default', 'low'])
parser.add_argument('--redisHost', help='Redis Q host.', default=config.redis_host)
parser.add_argument('--redisPort', help='Redis Q port.', default=config.redis_port)
return parser.parse_args()
if __name__ == "__main__":
# init logging
init_logging()
ARGS = parse_args()
LOGGER.info('Running Processor Sample')
PROCESSOR = Processor(LOGGER, ARGS.redisHost, ARGS.redisPort, ARGS.queues, ARGS.aesKeyFilePath)
PROCESSOR.run()

app/processor_bootstrap.sh Normal file

@@ -0,0 +1,13 @@
yum install -y wget
wget https://bootstrap.pypa.io/get-pip.py
python get-pip.py
pip install azure-storage
pip install azure-keyvault
pip install cryptography
pip install rq
tar -xzf processorscripts.tar.gz
python processorconfiguration.py
python processor.py data/aes.encrypted --redisHost $1 --redisPort 6379

app/processorconfiguration.py Normal file

@@ -0,0 +1,25 @@
"""
This is an unencrypted script, executed on worker node.
Script prepares worker node to run processor script.
Steps:
1. Download encrypted AES key.
"""
from azure.storage.blob import BlockBlobService
from config import Config
import os
config = Config()
blob_service = BlockBlobService(account_name=config.storage_account_name, sas_token=config.encrypted_files_sas_token)
try:
os.mkdir(config.encrypted_files_folder)
except:
pass
# Download encrypted AES key
blob_service.get_blob_to_path(container_name=config.storage_container_name,
blob_name=config.encrypted_aes_key_filename,
file_path=config.encrypted_files_folder + "/" + config.encrypted_aes_key_filename)
encrypted_aes_key_filename = config.encrypted_files_folder + "/" + config.encrypted_aes_key_filename

61
app/queuelogger.py Normal file

@@ -0,0 +1,61 @@
"""
pip install azure-storage-queue
"""
import sys
import socket
import json
from datetime import datetime
from azure.storage.queue import QueueService
from config import Config
class QueueLogger(object):
"""
This class contains functionality to log stdout to Azure Queue Storage
"""
def __init__(self, batch_size):
"""
Initializes a new instance of the QueueLogger class.
:param int batch_size: The number of messages to write into a single Azure Storage Queue message.
"""
self.terminal = sys.stdout
self.config = Config()
self.batch_size = batch_size
self.queue_service = QueueService(account_name = self.config.logger_storage_account_name,
sas_token = self.config.logger_queue_sas)
self.messages_to_write = []
def flush(self):
"""
Flush the internal buffer to Storage Queue
"""
self.put_message_to_queue()
def write(self, content):
"""
Buffers string content to be written to Storage Queue
:param str content: The content to write/buffer
"""
if(content == '\n'):
return
message = {
"machine": socket.gethostname(),
"content": content,
"time": str(datetime.now())
}
self.messages_to_write.append(message)
if(len(self.messages_to_write) >= self.batch_size):
self.put_message_to_queue()
self.terminal.write(content)
def put_message_to_queue(self):
"""
Adds a new Storage Queue message to the back of the message queue.
"""
json_content = json.dumps(self.messages_to_write,sort_keys=True,indent=4, separators=(',', ': '))
self.queue_service.put_message(self.config.logger_queue_name, content = json_content)
self.messages_to_write.clear()
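
A usage sketch (assuming `logger_queue_name`, `logger_storage_account_name`, and `logger_queue_sas` are filled in `config.py`):
```
import sys
from queuelogger import QueueLogger

sys.stdout = QueueLogger(batch_size=10)
print('worker started')  # buffered; sent once 10 messages accumulate
sys.stdout.flush()       # force any remaining buffered messages out
```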

127
app/scheduler.py Normal file

@@ -0,0 +1,127 @@
"""
Scheduler.py will read a data file, format it, and then enqueue jobs for each record into Redis Q
module deps:
pip install rq
"""
import argparse
import logging
import redis
import time
from datetime import datetime
from config import Config
from functions import processing_job
from rq import Queue, Connection
from aescipher import AESCipher
from aeskeywrapper import AESKeyWrapper
from jobstatus import JobStatus
from jobstatus import JobState
# Logger
LOGGER = logging.getLogger(__name__)
class Scheduler(object):
"""
Scheduler class enqueues jobs to Redis Q
"""
def __init__(self, logger, redis_host, redis_port):
"""
:param logger logger: logger
:param str redis_host: Redis host where the Redis Q is running
:param int redis_port: Redis port where the Redis Q is running
"""
self.config = Config()
self.logger = logger
self.redis_host = redis_host
self.redis_port = redis_port
self.jobstatus = JobStatus(logger)
def format_record(self, record):
"""
TODO: Implement any formatting needed to transform the data before enqueueing
:param object record: an encrypted record from the data file
:return: formatted record
"""
return record.rstrip('\n')
def run(self, data_file_path):
"""
Run the queueing job
:param str ata_file_path: path to the file with the data to be processed
:return: jobs that were queued
"""
self.logger.info('processing data file:, %s', data_file_path)
self.logger.info('Using redis host: %s:%s', self.redis_host, self.redis_port)
# get a redis connection
pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port)
redis_conn = redis.Redis(connection_pool=pool)
# Wait until redis connection can be established
while(True):
try:
redis_conn.ping()
self.logger.info("Redis connection successful.")
break
except redis.exceptions.ConnectionError:
self.logger.info("Redis isn't running, sleep for 5 seconds.")
time.sleep(5)
# read in the file and queue up jobs
count = 0
jobs = []
jobname = str(datetime.now())
with open(data_file_path, 'r') as data_file:
with Connection(redis_conn):
queue = Queue()
for record in data_file:
job = queue.enqueue(processing_job, self.format_record(record))
self.jobstatus.add_job_status(jobname, job.id, JobState.queued)
count += 1
jobs.append(job)
self.logger.info('%d jobs queued', count)
return jobs
def init_logging():
"""
Initialize the logger
"""
LOGGER.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)-20s %(levelname)-5s %(message)s')
handler.setFormatter(formatter)
LOGGER.addHandler(handler)
def parse_args():
"""
Parse command line arguments
"""
config = Config()
parser = argparse.ArgumentParser(description='Enqueue jobs to Redis Q')
parser.add_argument('dataFilePath', help='path to the data file.')
parser.add_argument('--redisHost', help='Redis Q host.', default=config.redis_host)
parser.add_argument('--redisPort', help='Redis Q port.', default=config.redis_port)
return parser.parse_args()
if __name__ == "__main__":
# init logging
init_logging()
ARGS = parse_args()
LOGGER.info('Running Scheduler Sample')
# start program
SCHEDULER = Scheduler(LOGGER, ARGS.redisHost, ARGS.redisPort)
JOBS = SCHEDULER.run(ARGS.dataFilePath)
time.sleep(3)
for job in JOBS:
print 'Job id:', job.id
print 'Job status:', job.status
print 'Job result:', job.result

app/scheduler_bootstrap.sh Normal file

@@ -0,0 +1,13 @@
yum install -y wget
wget https://bootstrap.pypa.io/get-pip.py
python get-pip.py
pip install azure-storage
pip install azure-keyvault
pip install cryptography
pip install rq
tar -xzf schedulerscripts.tar.gz
python schedulerconfiguration.py
python scheduler-unencrypted.py data/data.encrypted --redisHost $1 --redisPort 6379

app/schedulerconfiguration.py Normal file

@@ -0,0 +1,61 @@
"""
This is an unencrypted script, executed on master node.
Script prepares master node to run data processing script.
Steps:
1. Download encrypted AES key.
2. Download encrypted processing script.
3. Decrypt AES key with Azure Keyvault.
4. Decrypt script with AES key.
"""
from azure.storage.blob import BlockBlobService
from aescipher import AESCipher
from aeskeywrapper import AESKeyWrapper
from config import Config
import os
config = Config()
blob_service = BlockBlobService(account_name=config.storage_account_name, sas_token=config.encrypted_files_sas_token)
try:
os.mkdir(config.encrypted_files_folder)
except:
pass
# Download encrypted AES key
blob_service.get_blob_to_path(container_name=config.storage_container_name,
blob_name=config.encrypted_aes_key_filename,
file_path=config.encrypted_files_folder + "/" + config.encrypted_aes_key_filename)
encrypted_aes_key_filename = config.encrypted_files_folder + "/" + config.encrypted_aes_key_filename
# Download encrypted script
blob_service.get_blob_to_path(container_name=config.storage_container_name,
blob_name=config.encrypted_scheduler_script_filename,
file_path=config.encrypted_files_folder + "/" + config.encrypted_scheduler_script_filename)
encrypted_script_filename = config.encrypted_files_folder + "/" + config.encrypted_scheduler_script_filename
decrypted_script_filename = config.scheduler_script_filename
# Download encrypted data file
blob_service.get_blob_to_path(container_name=config.storage_container_name,
blob_name=config.encrypted_data_filename,
file_path=config.encrypted_files_folder + "/" + config.encrypted_data_filename)
# Decode AES key
wrapper = AESKeyWrapper(vault = config.azure_keyvault_url,
client_id = config.azure_keyvault_client_id,
secret = config.azure_keyvault_secret,
tenant = config.azure_keyvault_tenant_id,
key_name = config.azure_keyvault_key_name,
key_version = config.azure_keyvault_key_version)
with open(encrypted_aes_key_filename, "rb") as aes_key_file:
wrapped_key = aes_key_file.read()
keys = wrapper.unwrap_aes_key(wrapped_key)
key = keys[:config.aes_key_length]
iv = keys[config.aes_key_length:]
# Decode script
aes_cipher = AESCipher(key, iv)
aes_cipher.decrypt_file_save_file(encrypted_script_filename, decrypted_script_filename)

36
app/test.py Normal file

@@ -0,0 +1,36 @@
import logging
import time
from jobstatus import JobStatus, JobState
from metricslogger import MetricsLogger

logger = logging.getLogger(__name__)

def initLogging():
    logger.setLevel(logging.DEBUG)

    # setup a console logger by default
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

if __name__ == "__main__":
    initLogging()

    # job status tests
    jobName = "TestJob"
    jobId = "1111"
    js = JobStatus(logger)
    js.add_job_status(jobName, jobId, JobState.none)
    time.sleep(5)
    js.update_job_status(jobId, JobState.queued)
    time.sleep(5)
    js.update_job_status(jobId, JobState.processing)
    time.sleep(5)
    js.update_job_status(jobId, JobState.processed)
    time.sleep(5)
    js.update_job_status(jobId, JobState.failed, 'crazy error')

    # metrics logger test
    ml = MetricsLogger(logger)
    ml.capture_vm_metrics()

581
arm/azuredeploy.json Normal file

@@ -0,0 +1,581 @@
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"storageAccountType": {
"type": "string",
"defaultValue": "Standard_LRS",
"allowedValues": [
"Standard_LRS",
"Standard_GRS",
"Standard_ZRS",
"Premium_LRS"
],
"metadata": {
"description": "Storage Account type"
}
},
"tenantId": {
"type": "string",
"metadata": {
"description": "Tenant Id for the subscription and use assigned access to the vault. Available from the Get-AzureRMSubscription PowerShell cmdlet"
}
},
"accessPolicies": {
"type": "array",
"defaultValue": "{}",
"metadata": {
"description": "Access policies object {\"tenantId\":\"\",\"objectId\":\"\",\"permissions\":{\"keys\":[\"\"],\"secrets\":[\"\"]}}"
}
},
"vaultSku": {
"type": "string",
"defaultValue": "Standard",
"allowedValues": [
"Standard",
"Premium"
],
"metadata": {
"description": "SKU for the vault"
}
},
"enabledForDeployment": {
"type": "bool",
"defaultValue": false,
"metadata": {
"description": "Specifies if the vault is enabled for VM or Service Fabric deployment"
}
},
"enabledForTemplateDeployment": {
"type": "bool",
"defaultValue": false,
"metadata": {
"description": "Specifies if the vault is enabled for ARM template deployment"
}
},
"enableVaultForVolumeEncryption": {
"type": "bool",
"defaultValue": false,
"metadata": {
"description": "Specifies if the vault is enabled for volume encryption"
}
},
"secretsObject": {
"type": "secureObject",
"defaultValue": "{}",
"metadata": {
"description": "all secrets {\"secretName\":\"\",\"secretValue\":\"\"} wrapped in a secure object"
}
},
"vmSku": {
"type": "string",
"defaultValue": "Standard_A1",
"metadata": {
"description": "Size of VMs in the VM Scale Set."
}
},
"centOSVersion": {
"type": "string",
"defaultValue": "7.4",
"allowedValues": [
"6.8",
"7.3",
"7.4"
]
},
"instanceCount": {
"type": "int",
"defaultValue": 2,
"metadata": {
"description": "Number of VM instances (100 or less)."
},
"maxValue": 100
},
"adminUsername": {
"type": "string",
"metadata": {
"description": "Admin username on all VMs."
}
},
"adminPassword": {
"type": "securestring",
"metadata": {
"description": "Admin password on all VMs."
}
},
"storageAccountName": {
"type": "string",
"metadata": {
"description": "Storage account where deployment resources are stored."
}
},
"storageAccountKey": {
"type": "string",
"metadata": {
"description": "Storage account key where deployment resources are stored."
}
}
},
"variables": {
"storageAccountName": "[concat(uniquestring(resourceGroup().id), 'sa')]",
"keyVaultName": "[concat(uniquestring(resourceGroup().id), 'kv')]",
"domainNameLabel": "[concat(uniquestring(resourceGroup().id), 'redisrqprocessing')]",
"vnetID": "[resourceId('Microsoft.Network/virtualNetworks', variables('virtualNetworkName'))]",
"subnetRef": "[concat(variables('vnetID'),'/subnets/', variables('schedulerSubnetName'))]",
"publicIPAddressName": "jumpboxPublicIp",
"jumpBoxName": "jbox",
"jumpBoxSAName": "[concat(uniquestring(resourceGroup().id), 'jbox')]",
"jumpBoxOSDiskName": "[concat(variables('jumpBoxName'), 'osdisk')]",
"jumpBoxVHDContainerName": "[concat(variables('jumpBoxName'), 'vhd')]",
"jumpBoxIPConfigName": "[concat(variables('jumpBoxName'), 'ipconfig')]",
"jumpBoxNicName": "[concat(variables('jumpBoxName'), 'nic')]",
"osType": {
"publisher": "OpenLogic",
"offer": "CentOS",
"sku": "[parameters('centOSVersion')]",
"version": "latest"
},
"imageReference": "[variables('osType')]",
"virtualNetworkName": "vnet",
"addressPrefix": "10.0.0.0/16",
"redisName": "redisvm",
"redisOSDiskName": "[concat(variables('redisName'), 'osdisk')]",
"redisVHDContainerName": "[concat(variables('redisName'), 'vhd')]",
"redisIPConfigName": "[concat(variables('redisName'), 'ipconfig')]",
"redisNicName": "[concat(variables('redisName'), 'nic')]",
"redisSetupScriptName": "install_redis.sh",
"redisSetupScriptUrl": "[concat('https://gist.githubusercontent.com/ali92hm/ff750eb1bee80338e0b05170ff2d4531/raw/95fa19a83b387f82b09826f094916c4d02c55793/',variables('redisSetupScriptName'))]",
"redisSetupScriptCommand": "[concat('bash ', variables('redisSetupScriptName'))]",
"redisNicIpAddress": "10.0.1.40",
"workerSubnetName": "workerSubnet",
"workerSubnetPrefix": "10.0.2.0/24",
"workerVmssName": "workerVmss",
"workerNicName": "[concat(variables('workerVmssName'), 'nic')]",
"workerIpConfigName": "[concat(variables('workerVmssName'), 'ipconfig')]",
"schedulerSubnetName": "schedulerSubnet",
"schedulerSubnetPrefix": "10.0.1.0/24",
"schedulerVmssName": "schedulerVmss",
"schedulerNicName": "[concat(variables('schedulerVmssName'), 'nic')]",
"schedulerIpConfigName": "[concat(variables('schedulerVmssName'), 'ipconfig')]"
},
"resources": [
{
"type": "Microsoft.Storage/storageAccounts",
"name": "[variables('storageAccountName')]",
"apiVersion": "2016-01-01",
"location": "[resourceGroup().location]",
"sku": {
"name": "[parameters('storageAccountType')]"
},
"kind": "Storage",
"properties": {}
},
{
"type": "Microsoft.Storage/storageAccounts",
"name": "[variables('jumpBoxSAName')]",
"apiVersion": "2016-01-01",
"location": "[resourceGroup().location]",
"sku": {
"name": "[parameters('storageAccountType')]"
},
"kind": "Storage",
"properties": {}
},
{
"type": "Microsoft.KeyVault/vaults",
"name": "[variables('keyVaultName')]",
"apiVersion": "2015-06-01",
"location": "[resourceGroup().location]",
"tags": {
"displayName": "KeyVault"
},
"properties": {
"enabledForDeployment": "[parameters('enabledForDeployment')]",
"enabledForTemplateDeployment": "[parameters('enabledForTemplateDeployment')]",
"enabledForVolumeEncryption": "[parameters('enableVaultForVolumeEncryption')]",
"tenantId": "[parameters('tenantId')]",
"accessPolicies": "[parameters('accessPolicies')]",
"sku": {
"name": "[parameters('vaultSku')]",
"family": "A"
}
}
},
{
"type": "Microsoft.KeyVault/vaults/secrets",
"name": "[concat(variables('keyVaultName'), '/', parameters('secretsObject').secrets[copyIndex()].secretName)]",
"apiVersion": "2015-06-01",
"properties": {
"value": "[parameters('secretsObject').secrets[copyIndex()].secretValue]"
},
"dependsOn": [
"[concat('Microsoft.KeyVault/vaults/', variables('keyVaultName'))]"
],
"copy": {
"name": "secretsCopy",
"count": "[length(parameters('secretsObject').secrets)]"
}
},
{
"type": "Microsoft.Network/virtualNetworks",
"name": "[variables('virtualNetworkName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-06-01",
"properties": {
"addressSpace": {
"addressPrefixes": [
"[variables('addressPrefix')]"
]
},
"subnets": [
{
"name": "[variables('schedulerSubnetName')]",
"properties": {
"addressPrefix": "[variables('schedulerSubnetPrefix')]"
}
},
{
"name": "[variables('workerSubnetName')]",
"properties": {
"addressPrefix": "[variables('workerSubnetPrefix')]"
}
}
]
}
},
{
"type": "Microsoft.Network/publicIPAddresses",
"name": "[variables('publicIPAddressName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-04-01",
"properties": {
"publicIPAllocationMethod": "Dynamic",
"dnsSettings": {
"domainNameLabel": "[variables('domainNameLabel')]"
}
}
},
{
"type": "Microsoft.Network/networkInterfaces",
"name": "[variables('jumpBoxNicName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-04-01",
"dependsOn": [
"[concat('Microsoft.Network/publicIPAddresses/', variables('publicIPAddressName'))]",
"[concat('Microsoft.Network/virtualNetworks/', variables('virtualNetworkName'))]"
],
"properties": {
"ipConfigurations": [
{
"name": "[variables('jumpBoxIPConfigName')]",
"properties": {
"privateIPAllocationMethod": "Dynamic",
"publicIPAddress": {
"id": "[resourceId('Microsoft.Network/publicIPAddresses',variables('publicIPAddressName'))]"
},
"subnet": {
"id": "[variables('subnetRef')]"
}
}
}
]
}
},
{
"type": "Microsoft.Network/networkInterfaces",
"name": "[variables('redisNicName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-04-01",
"dependsOn": [
"[concat('Microsoft.Network/virtualNetworks/', variables('virtualNetworkName'))]"
],
"properties": {
"ipConfigurations": [
{
"name": "[variables('redisIPConfigName')]",
"properties": {
"privateIPAllocationMethod": "Static",
"privateIPAddress": "[variables('redisNicIpAddress')]",
"subnet": {
"id": "[variables('subnetRef')]"
}
}
}
]
}
},
{
"type": "Microsoft.Compute/virtualMachines",
"name": "[variables('jumpBoxName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-03-30",
"dependsOn": [
"[concat('Microsoft.Storage/storageAccounts/', variables('jumpBoxSAName'))]",
"[concat('Microsoft.Network/networkInterfaces/', variables('jumpBoxNicName'))]"
],
"properties": {
"hardwareProfile": {
"vmSize": "[parameters('vmSku')]"
},
"osProfile": {
"computerName": "[variables('jumpBoxName')]",
"adminUsername": "[parameters('adminUsername')]",
"adminPassword": "[parameters('adminPassword')]"
},
"storageProfile": {
"imageReference": "[variables('imageReference')]",
"osDisk": {
"name": "[variables('jumpBoxOSDiskName')]",
"vhd": {
"uri": "[concat('http://',variables('jumpBoxSAName'),'.blob.core.windows.net/',variables('jumpBoxVHDContainerName'),'/',variables('jumpBoxOSDiskName'),'.vhd')]"
},
"caching": "ReadWrite",
"createOption": "FromImage"
}
},
"networkProfile": {
"networkInterfaces": [
{
"id": "[resourceId('Microsoft.Network/networkInterfaces',variables('jumpBoxNicName'))]"
}
]
},
"diagnosticsProfile": {
"bootDiagnostics": {
"enabled": "true",
"storageUri": "[concat('http://',variables('jumpBoxSAName'),'.blob.core.windows.net')]"
}
}
}
},
{
"type": "Microsoft.Compute/virtualMachines",
"name": "[variables('redisName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-03-30",
"dependsOn": [
"[concat('Microsoft.Storage/storageAccounts/', variables('jumpBoxSAName'))]",
"[concat('Microsoft.Network/networkInterfaces/', variables('redisNicName'))]"
],
"properties": {
"hardwareProfile": {
"vmSize": "[parameters('vmSku')]"
},
"osProfile": {
"computerName": "[variables('redisName')]",
"adminUsername": "[parameters('adminUsername')]",
"adminPassword": "[parameters('adminPassword')]"
},
"storageProfile": {
"imageReference": "[variables('imageReference')]",
"osDisk": {
"name": "[variables('redisOSDiskName')]",
"vhd": {
"uri": "[concat('http://',variables('jumpBoxSAName'),'.blob.core.windows.net/',variables('redisVHDContainerName'),'/',variables('redisOSDiskName'),'.vhd')]"
},
"caching": "ReadWrite",
"createOption": "FromImage"
}
},
"networkProfile": {
"networkInterfaces": [
{
"id": "[resourceId('Microsoft.Network/networkInterfaces',variables('redisNicName'))]"
}
]
},
"diagnosticsProfile": {
"bootDiagnostics": {
"enabled": "true",
"storageUri": "[concat('http://',variables('jumpBoxSAName'),'.blob.core.windows.net')]"
}
}
},
"resources": [
{
"name": "redissetup",
"type": "extensions",
"location": "[resourceGroup().location]",
"apiVersion": "2015-06-15",
"dependsOn": [
"[concat('Microsoft.Compute/virtualMachines/', variables('redisName'))]"
],
"properties": {
"publisher": "Microsoft.Azure.Extensions",
"type": "CustomScript",
"typeHandlerVersion": "2.0",
"autoUpgradeMinorVersion": true,
"settings": {
"fileUris": [
"[variables('redisSetupScriptUrl')]"
],
"commandToExecute": "[variables('redisSetupScriptCommand')]"
}
}
}
]
},
{
"type": "Microsoft.Compute/virtualMachineScaleSets",
"name": "[variables('schedulerVmssName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-03-30",
"dependsOn": [
"[concat('Microsoft.Network/virtualNetworks/', variables('virtualNetworkName'))]",
"[concat('Microsoft.Compute/virtualMachines/', variables('redisName'))]"
],
"sku": {
"name": "[parameters('vmSku')]",
"tier": "Standard",
"capacity": "1"
},
"properties": {
"overprovision": "false",
"upgradePolicy": {
"mode": "Automatic"
},
"virtualMachineProfile": {
"storageProfile": {
"osDisk": {
"createOption": "FromImage"
},
"imageReference": "[variables('imageReference')]"
},
"osProfile": {
"computerNamePrefix": "[variables('schedulerVmssName')]",
"adminUsername": "[parameters('adminUsername')]",
"adminPassword": "[parameters('adminPassword')]"
},
"networkProfile": {
"networkInterfaceConfigurations": [
{
"name": "[variables('schedulerNicName')]",
"properties": {
"primary": "true",
"ipConfigurations": [
{
"name": "[variables('schedulerIpConfigName')]",
"properties": {
"subnet": {
"id": "[concat('/subscriptions/', subscription().subscriptionId,'/resourceGroups/', resourceGroup().name, '/providers/Microsoft.Network/virtualNetworks/', variables('virtualNetworkName'), '/subnets/', variables('schedulerSubnetName'))]"
}
}
}
]
}
}
]
},
"extensionProfile": {
"extensions": [
{
"name": "CustomScriptExtension",
"properties": {
"publisher": "Microsoft.Azure.Extensions",
"type": "CustomScript",
"typeHandlerVersion": "2.0",
"autoUpgradeMinorVersion": true,
"settings": {
"fileUris": [
"https://config.blob.core.windows.net/scheduler-node-files/scheduler_bootstrap.sh",
"https://config.blob.core.windows.net/scheduler-node-files/schedulerscripts.tar.gz"
]
},
"protectedSettings": {
"commandToExecute": "[concat('./scheduler_bootstrap.sh ', variables('redisNicIpAddress'), ' &')]",
"storageAccountName": "[parameters('storageAccountName')]",
"storageAccountKey": "[parameters('storageAccountKey')]"
}
}
}
]
}
}
}
},
{
"type": "Microsoft.Compute/virtualMachineScaleSets",
"name": "[variables('workerVmssName')]",
"location": "[resourceGroup().location]",
"apiVersion": "2017-03-30",
"dependsOn": [
"[concat('Microsoft.Network/virtualNetworks/', variables('virtualNetworkName'))]",
"[concat('Microsoft.Compute/virtualMachines/', variables('redisName'))]"
],
"sku": {
"name": "[parameters('vmSku')]",
"tier": "Standard",
"capacity": "[parameters('instanceCount')]"
},
"properties": {
"overprovision": "true",
"upgradePolicy": {
"mode": "Automatic"
},
"virtualMachineProfile": {
"storageProfile": {
"osDisk": {
"createOption": "FromImage"
},
"imageReference": "[variables('imageReference')]"
},
"osProfile": {
"computerNamePrefix": "[variables('workerVmssName')]",
"adminUsername": "[parameters('adminUsername')]",
"adminPassword": "[parameters('adminPassword')]"
},
"networkProfile": {
"networkInterfaceConfigurations": [
{
"name": "[variables('workerNicName')]",
"properties": {
"primary": "true",
"ipConfigurations": [
{
"name": "[variables('workerIpConfigName')]",
"properties": {
"subnet": {
"id": "[concat('/subscriptions/', subscription().subscriptionId,'/resourceGroups/', resourceGroup().name, '/providers/Microsoft.Network/virtualNetworks/', variables('virtualNetworkName'), '/subnets/', variables('workerSubnetName'))]"
}
}
}
]
}
}
]
},
"extensionProfile": {
"extensions": [
{
"name": "CustomScriptExtension",
"properties": {
"publisher": "Microsoft.Azure.Extensions",
"type": "CustomScript",
"typeHandlerVersion": "2.0",
"autoUpgradeMinorVersion": true,
"settings": {
"fileUris": [
"https://config.blob.core.windows.net/processor-node-files/processor_bootstrap.sh",
"https://config.blob.core.windows.net/processor-node-files/processorscripts.tar.gz"
]
},
"protectedSettings": {
"commandToExecute": "[concat('./processor_bootstrap.sh ', variables('redisNicIpAddress'), ' &')]",
"storageAccountName": "[parameters('storageAccountName')]",
"storageAccountKey": "[parameters('storageAccountKey')]"
}
}
}
]
}
}
}
}
],
"outputs": {
"storageAccountName": {
"type": "string",
"value": "[variables('storageAccountName')]"
}
}
}

arm/azuredeploy.parameters.json Normal file

@@ -0,0 +1,54 @@
{
"$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"storageAccountType": {
"value": "Standard_GRS"
},
"tenantId": {
"value": ""
},
"accessPolicies": {
"value": [
{
"tenantId": "",
"objectId": "",
"permissions": {
"keys": [
"all"
],
"secrets": [
"all"
]
}
}
]
},
"secretsObject": {
"value": {
"secrets": [
{
"secretName": "adminUsername",
"secretValue": "p@ssw0rd1!"
},
{
"secretName": "adminPassword",
"secretValue": "p@ssw0rd1!"
}
]
}
},
"adminUsername": {
"value" : "admin"
},
"adminPassword": {
"value" : "p@ssw0rd1!"
},
"storageAccountName": {
"value" : "redisrqprocessing"
},
"storageAccountKey": {
"value" : "thekey"
}
}
}
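The values in this parameters file are placeholders; real admin credentials and storage keys should not be committed (note also that Azure rejects reserved admin usernames such as "admin", so the sample adminUsername would need to change for a real deployment). One option is to load the file and inject secrets from the environment at deploy time, as in this sketch (the environment variable names are assumptions):

# Hedged sketch: override placeholder secrets before deploying.
import json
import os

with open('azuredeploy.parameters.json') as params_file:  # hypothetical file name
    parameters = json.load(params_file)['parameters']
parameters['adminPassword']['value'] = os.environ['ADMIN_PASSWORD']
parameters['storageAccountKey']['value'] = os.environ['STORAGE_ACCOUNT_KEY']
# pass `parameters` to deployments.create_or_update as in the previous sketch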


@ -0,0 +1,16 @@
#!/bin/bash
sudo yum -y update
sudo yum -y install epel-release
sudo yum -y update
sudo yum -y install redis
# start redis
sudo systemctl start redis
# start redis on startup
sudo systemctl enable redis
# accept connections from outside the VM (make sure a VNet and NSG restrict access)
echo "bind 0.0.0.0" | sudo tee -a /etc/redis.conf
echo "appendonly yes" | sudo tee -a /etc/redis.conf
sudo systemctl restart redis
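After the script runs, a quick smoke test from another host on the VNet confirms that Redis accepts remote connections. A minimal sketch with redis-py (the private IP is a placeholder):

# Hedged sketch: verify the Redis VM is reachable after bootstrap.
import redis

r = redis.StrictRedis(host='10.0.0.4', port=6379)  # hypothetical private IP
print r.ping()  # True once bind 0.0.0.0 and the restart have taken effect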

4
build.sh Normal file

@ -0,0 +1,4 @@
#!/bin/bash
export PYTHONPATH=.
python scripts/CopyScriptsForArmTemplate.py
read -p "Press any key to exit..."

Binary data
data/aes.encrypted Normal file

Binary file not shown.

10
data/data Normal file

@ -0,0 +1,10 @@
1
2
3
4
5
6
7
8
9
10

11
data/data.encrypted Normal file

@ -0,0 +1,11 @@
(11 lines of AES-encrypted binary records; the ciphertext is not displayable as text)

Binary data
data/job-scheduler.encrypted Normal file

Binary file not shown.

61
docker-compose.yaml Normal file

@ -0,0 +1,61 @@
version: '3'
services:
redis:
container_name: redis
image: redis
ports:
- 6379:6379
scheduler:
container_name: scheduler
build: ./docker-files/python27
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
command: python scheduler.py /usr/src/data/data.encrypted --redisHost redis --redisPort 6379
volumes:
- ./app:/usr/src/app
- ./data:/usr/src/data
processor:
container_name: processor
build: ./docker-files/python27
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
command: python processor.py ../data/aes.encrypted --redisHost redis --redisPort 6379
volumes:
- ./app:/usr/src/app
- ./data:/usr/src/data
depends_on:
- redis
dashboard:
container_name: rq-dashboard
build: ./docker-files/rq-dashboard
ports:
- 8080:9181
environment:
- RQ_DASHBOARD_REDIS_HOST=redis
depends_on:
- redis
schedulernode:
container_name: schedulernode
image: centos:7
working_dir: /usr/src/app
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
command: bash scheduler_bootstrap.sh
volumes:
- ./app:/usr/src/app
processornode:
container_name: processornode
image: centos:7
working_dir: /usr/src/app
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
command: bash processor_bootstrap.sh
volumes:
- ./app:/usr/src/app
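With the services above running, the queue the scheduler fills can be inspected from the host through the published Redis port. A minimal sketch using the same rq library the containers run (that jobs land on rq's default queue is an assumption):

# Hedged sketch: inspect the RQ queue behind the compose setup.
import redis
from rq import Queue

connection = redis.StrictRedis(host='localhost', port=6379)  # port published above
queue = Queue(connection=connection)  # rq's default queue
print '%d jobs waiting' % len(queue)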


@ -0,0 +1,13 @@
FROM centos:7
RUN yum -y update && \
yum -y install epel-release && \
yum -y install python-pip
# Add your pip packages
RUN pip install rq && \
pip install azure-keyvault
WORKDIR /usr/src/app
CMD ["python"]


@ -0,0 +1,4 @@
FROM python:2.7-slim
RUN pip install rq-dashboard
CMD ["rq-dashboard"]


@ -0,0 +1,10 @@
echo "This will create new AES key and update files in Azure and locally."
echo "Do you want to continue?"
read choice
if [[ $choice = y ]]
then
export PYTHONPATH=.
python scripts/GenerateEncryptedFiles.py
read -p "Press any key to exit..."
fi


@ -0,0 +1,109 @@
# cd <root>
# set PYTHONPATH=.
# python scripts/CopyScriptsForArmTemplate.py
from azure.storage.blob import BlockBlobService
from app.config import Config
from app.aescipher import AESCipher
from app.aeskeywrapper import AESKeyWrapper
import tarfile
import os
config = Config()
encrypted_aes_key_filename = "data/aes.encrypted"
unencrypted_script_filename = "app/scheduler.py"
encrypted_script_filename = config.encrypted_files_folder + "/" + config.encrypted_scheduler_script_filename
basescripts = [
'config.py',
'aescipher.py',
'aeskeywrapper.py',
'functions.py',
'jobstatus.py',
]
processorscripts = [
'processorconfiguration.py',
'processor.py',
]
schedulerscripts = [
'schedulerconfiguration.py',
]
os.chdir('app')
print 'Preparing processorscripts.tar.gz...'
with tarfile.open("../processorscripts.tar.gz", "w:gz") as tar:
for script in basescripts + processorscripts:
tar.add(name=script)
print ' +' + script
print 'Preparing schedulerscripts.tar.gz...'
with tarfile.open("../schedulerscripts.tar.gz", "w:gz") as tar:
for script in basescripts + schedulerscripts:
tar.add(name=script)
print ' +' + script
print 'tar.gz files created.'
os.chdir('..')
blob_service = BlockBlobService(account_name=config.storage_account_name, sas_token=config.encrypted_files_sas_token)
blob_service.create_container(container_name = 'scheduler-node-files')
blob_service.create_blob_from_path(container_name='scheduler-node-files',
blob_name='scheduler_bootstrap.sh',
file_path='app/scheduler_bootstrap.sh')
print 'scheduler_bootstrap.sh is uploaded'
blob_service.create_blob_from_path(container_name='scheduler-node-files',
blob_name='schedulerscripts.tar.gz',
file_path='schedulerscripts.tar.gz')
print 'schedulerscripts.tar.gz is uploaded'
blob_service.create_container(container_name = 'processor-node-files')
blob_service.create_blob_from_path(container_name='processor-node-files',
blob_name='processor_bootstrap.sh',
file_path='app/processor_bootstrap.sh')
print 'processor_bootstrap.sh is uploaded'
blob_service.create_blob_from_path(container_name='processor-node-files',
blob_name='processorscripts.tar.gz',
file_path='processorscripts.tar.gz')
print 'processorscripts.tar.gz is uploaded'
blob_service.create_container(container_name = 'configuration-files')
blob_service.create_blob_from_path(container_name='configuration-files',
blob_name='config.py',
file_path='app/config.py')
print 'config.py is uploaded'
# Unwrap the AES key via Key Vault
wrapper = AESKeyWrapper(vault = config.azure_keyvault_url,
client_id = config.azure_keyvault_client_id,
secret = config.azure_keyvault_secret,
tenant = config.azure_keyvault_tenant_id,
key_name = config.azure_keyvault_key_name,
key_version = config.azure_keyvault_key_version)
with open(encrypted_aes_key_filename, "rb") as aes_key_file:
wrapped_key = aes_key_file.read()
keys = wrapper.unwrap_aes_key(wrapped_key)
key = keys[:config.aes_key_length]
iv = keys[config.aes_key_length:]
print 'AES key decrypted'
# Encrypt the scheduler script with the unwrapped key
aes_cipher = AESCipher(key, iv)
aes_cipher.encrypt_file_save_file(unencrypted_script_filename, encrypted_script_filename)
print unencrypted_script_filename + ' is encrypted and saved to ' + encrypted_script_filename
blob_service.create_blob_from_path(container_name=config.storage_container_name,
blob_name=config.encrypted_scheduler_script_filename,
file_path=encrypted_script_filename)
print encrypted_script_filename + " is uploaded to Azure.Storage"


@ -0,0 +1,69 @@
import os
import random
import string
from azure.storage.blob import BlockBlobService
from azure.storage.blob import ContentSettings
from app.aescipher import AESCipher
from app.aeskeywrapper import AESKeyWrapper
from app.config import Config
config = Config()
script_filename = "app/scheduler.py"
unencrypted_records_file = 'data/data'
script_encrypted_filename = config.encrypted_files_folder + "/" + config.encrypted_scheduler_script_filename
aes_key_encrypted_filename = config.encrypted_files_folder + "/" + config.encrypted_aes_key_filename
encrypted_record_file = config.encrypted_files_folder + "/" + config.encrypted_data_filename
wrapper = AESKeyWrapper(vault = config.azure_keyvault_url,
client_id = config.azure_keyvault_client_id,
secret = config.azure_keyvault_secret,
tenant = config.azure_keyvault_tenant_id,
key_name = config.azure_keyvault_key_name,
key_version = config.azure_keyvault_key_version)
# Generate a new AES key/IV pair
aes_key = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
aes_iv = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(16))
keys = aes_key + aes_iv
print 'AES key generated. Key:' + aes_key + " IV:" + aes_iv
# Save encrypted keys
encrypted_keys = wrapper.wrap_aes_key_local(keys, wrapper.get_public_key())
with open(aes_key_encrypted_filename, "wb+") as aes_key_encrypted_file:
aes_key_encrypted_file.write(encrypted_keys)
print 'AES key wrapped and saved to ' + aes_key_encrypted_filename
# Encrypt script using generated keys
cipher = AESCipher(aes_key, aes_iv)
cipher.encrypt_file_save_file(script_filename, script_encrypted_filename)
print script_filename + " encrypted and saved to " + script_encrypted_filename
# Encrypt data
with open(encrypted_record_file, 'wb') as encryptedFile:
with open(unencrypted_records_file, 'r') as dataFile:
for record in dataFile:
encryptedRecord = cipher.encrypt(record)
encryptedFile.write(encryptedRecord + '\n')
print unencrypted_records_file + " is encrypted and saved to " + encrypted_record_file
# Upload generated files to blob
blob_service = BlockBlobService(account_name=config.storage_account_name, sas_token=config.encrypted_files_sas_token)
blob_service.create_container(config.storage_container_name)
blob_service.create_blob_from_path(container_name=config.storage_container_name,
blob_name=config.encrypted_aes_key_filename,
file_path=aes_key_encrypted_filename)
blob_service.create_blob_from_path(container_name=config.storage_container_name,
blob_name=config.encrypted_scheduler_script_filename,
file_path=script_encrypted_filename)
blob_service.create_blob_from_path(container_name=config.storage_container_name,
blob_name=config.encrypted_data_filename,
file_path=encrypted_record_file)
print "Files succesfully uploaded to Azure.Storage"

0
samples/__init__.py Normal file

48
samples/azureconfig.py Normal file

@ -0,0 +1,48 @@
class AzureConfig(object):
"""
Configuration settings for use in sample code. Users wishing to run this sample can either set these
values as environment values or simply update the hard-coded values below
:ivar subscription_name: Azure subscription name
:vartype subscription_name: str
:ivar subscription_id: Azure subscription id
:vartype subscription_id: str
:ivar resource_group: Azure resource group containing the infra
:vartype resource_group: str
:ivar client_id: Azure Active Directory AppID of the Service Principal to run the sample
:vartype client_id: str
:ivar tenant_id: Azure Active Directory tenant id of the user intending to run the sample
:vartype tenant_id: str
:ivar client_secret: Azure Active Directory Application Key to run the sample
:vartype client_secret: str
:ivar key_vault_uri: URI for Azure Key Vault instance
:vartype key_vault_uri: str
"""
def __init__(self):
# Azure Subscription Name
self.subscription_name = ''
# Subscription Id
self.subscription_id = ''
# Azure Resource Group
self.resource_group = ''
# Azure Active Directory - Directory ID in Azure portal
self.tenant_id = ''
# Your Service Principal Application ID in Azure Portal
self.client_id = ''
# Application Key Value in Azure Portal
self.client_secret = ''
# Your Key Vault URI
self.key_vault_uri = ''
#Key vault API version
self.key_vault_api_version = '2016-10-01'

79
samples/azuremetrics.py Normal file

@ -0,0 +1,79 @@
import platform
import json
import requests
import oauth
from azureconfig import AzureConfig
appVersion = '0.1' # version of this python app
config = AzureConfig()
def getUserAgent():
user_agent = "python/{} ({}) requests/{} app/{}".format(
platform.python_version(),
platform.platform(),
requests.__version__,
appVersion)
return user_agent
def getAccessToken():
#context = adal.AuthenticationContext('https://login.microsoftonline.com/' + config.tenant_id)
#token_response = context.acquire_token_with_client_credentials('https://management.core.windows.net/', config.client_id, config.client_secret)
token_response = oauth.acquire_token_with_client_credentials(
authority = 'https://login.microsoftonline.com/' + config.tenant_id,
resource = 'https://management.core.windows.net/',
clientId = config.client_id,
secret = config.client_secret
)
return token_response.get('accessToken')
def httpGet(uri, accessToken):
headers = {"Authorization": 'Bearer ' + accessToken}
headers['User-Agent'] = getUserAgent()
return requests.get(uri, headers=headers).json()
def buildAzureMetricsBaseUri(resourceGroup, resourceProvider, resourceType, resourceName):
baseUri = ''.join(['https://management.azure.com',
'/subscriptions/', config.subscription_id,
'/resourceGroups/', resourceGroup,
'/providers/', resourceProvider,
'/', resourceType,
'/', resourceName,
'/providers/microsoft.insights/'])
return baseUri
def getMetrics(resourceGroup, resourceProvider, resourceType, resourceName):
baseUri = buildAzureMetricsBaseUri(resourceGroup, resourceProvider, resourceType, resourceName)
uri = baseUri + 'metrics?api-version=2017-05-01-preview'
accessToken = getAccessToken()
return httpGet(uri, accessToken)
def getMetricsDefinition(resourceGroup, resourceProvider, resourceType, resourceName):
baseUri = buildAzureMetricsBaseUri(resourceGroup, resourceProvider, resourceType, resourceName)
# baseUri already ends with '/providers/microsoft.insights/', so only the operation is appended
uri = baseUri + 'metricDefinitions?api-version=2017-05-01-preview'
accessToken = getAccessToken()
return httpGet(uri, accessToken)
if __name__ == "__main__":
# Resource Providers and Types https://docs.microsoft.com/en-us/azure/monitoring-and-diagnostics/monitoring-rest-api-walkthrough#retrieve-metric-values
# Virtual machine scale sets | Microsoft.Compute | virtualMachineScaleSets
# VMs | Microsoft.Compute | virtualMachines
# Get the metrics available for VM SS
vmssMetricsDefinitionJson = getMetricsDefinition('resourcegroupname', 'Microsoft.Compute', 'virtualMachineScaleSets', 'vmssname')
print('------------- Azure Metrics Definition Response ------------------')
print(json.dumps(vmssMetricsDefinitionJson, sort_keys=False, indent=2, separators=(',', ': ')))
# Get the actual perf metrics for an example VM SS
vmssMetricsJson = getMetrics('resourcegroupname', 'Microsoft.Compute', 'virtualMachineScaleSets', 'vmssname')
print('------------- Azure Metrics Response ------------------')
print(json.dumps(vmssMetricsJson, sort_keys=False, indent=2, separators=(',', ': ')))
# Get the metrics available for VMs
vmssMetricsDefinitionJson = getMetricsDefinition('resourcegroupname', 'Microsoft.Compute', 'virtualMachines', 'vmname')
print('------------- Azure Metrics Definition Response ------------------')
print(json.dumps(vmssMetricsDefinitionJson, sort_keys=False, indent=2, separators=(',', ': ')))
vmssMetricsJson = getMetrics('resourcegroupname', 'Microsoft.Compute', 'virtualMachines', 'vmname')
print('------------- Azure Metrics Response ------------------')
print(json.dumps(vmssMetricsJson, sort_keys=False, indent=2, separators=(',', ': ')))

17
samples/blobstorage.py Normal file

@ -0,0 +1,17 @@
from azure.storage.blob import BlockBlobService
if __name__ == "__main__":
# instantiate the client
blobService = BlockBlobService(account_name='blobname', account_key='insertkey')
# list all blobs in a container
container = blobService.list_blobs('public')
for blob in container:
print("Blob found: " + blob.name)
# get a blob
blobService.get_blob_to_path('public', 'testimage.PNG', 'out-image.png')
print("Blob downloaded as out-image.png")


@ -0,0 +1,167 @@
import pickle
from datetime import datetime
from enum import IntEnum
from azure.cosmosdb.table import TableService, Entity
from azure.storage.queue import QueueService, models
from config import Config
class JobState(IntEnum):
none = 0
queued = 1
processing = 2
processed = 3
done = 4
failed = 5
class JobStatus(object):
"""
Class for managing job status records.
"""
def __init__(self, logger):
"""
Initializes a new instance of the JobStatus class.
:param logger logger: The logger instance to use for logging.
"""
self.logger = logger
self.config = Config()
if(self.init_storage_services() is False):
raise Exception("Errors occurred instantiating job status storage service.")
if(self.init_storage() is False):
raise Exception("Errors occurred validating job status table exists.")
def init_storage_services(self):
"""
Initializes the storage service clients using values from config.py.
:return: True on success. False on failure.
:rtype: boolean
"""
try:
# creates instance of TableService to use for job status storage
self.storage_service_table = TableService(account_name = self.config.job_status_storage, sas_token = self.config.job_status_sas_token)
# creates instance of QueueService to use for completed job status storage
self.storage_service_queue = QueueService(account_name = self.config.job_status_storage, sas_token = self.config.job_status_sas_token)
# set the encode function for objects stored as queue message to noencode, serialization will be handled as a string by pickle
# http://azure-storage.readthedocs.io/en/latest/ref/azure.storage.queue.queueservice.html
# http://azure-storage.readthedocs.io/en/latest/_modules/azure/storage/queue/models.html
self.storage_service_queue.encode_function = models.QueueMessageFormat.noencode
return True
except Exception as ex:
self.log_exception(ex, self.init_storage_services.__name__)
return False
def init_storage(self):
"""
Initializes the storage table and queue, creating them if they don't exist.
:return: True on success. False on failure.
:rtype: boolean
"""
try:
# will create job status table and queue if either doesn't exist
self.storage_service_table.create_table(self.config.job_status_storage)
self.storage_service_queue.create_queue(self.config.job_status_storage)
return True
except Exception as ex:
self.log_exception(ex, self.init_storage.__name__)
return False
def log_exception(self, exception, functionName):
"""
Logs an exception to the logger instance for this class.
:param Exception exception: The exception thrown.
:param str functionName: Name of the function where the exception occurred.
"""
self.logger.debug("Exception occurred in: " + functionName)
self.logger.debug(type(exception))
self.logger.debug(exception)
def add_job_status(self, jobName, jobId, jobState):
"""
Adds a new job status record.
:param str jobName: The name of the job, this will be used as the partition key for the table.
:param str jobId: Id for the job.
:param JobState jobState: Enum for the current job state.
:return: True on success. False on failure.
:rtype: boolean
"""
record = Entity()
record.PartitionKey = jobName
record.RowKey = jobId
record.Created = datetime.utcnow()
record.State = int(jobState)
try:
self.storage_service_table.insert_entity(self.config.job_status_storage, record)
return True
except Exception as ex:
self.log_exception(ex, self.add_job_status.__name__)
return False
def get_job_status(self, jobName, jobId):
"""
Gets a job status record from storage.
:param str jobName: The name of the job, this will be used as the partition key for the table.
:param str jobId: Id for the job.
:return: Entity record on success. None on failure.
:rtype: Entity or None
"""
try:
record = self.storage_service_table.get_entity(self.config.job_status_storage, jobName, jobId)
return record
except Exception as ex:
self.log_exception(ex, self.get_job_status.__name__)
return None
def update_job_status(self, jobName, jobId, jobState, error = None):
"""
Updates a job status record.
:param str jobName: The name of the job, this will be used as the partition key for the table.
:param str jobId: Id for the job.
:param JobState jobState: Enum for the current job state.
:param error: Optional parameter to provide error details for failed state.
:type error: str or None
:return: True on success. False on failure.
:rtype: boolean
"""
record = self.get_job_status(jobName, jobId)
if(record is None):
return False
record.PartitionKey = jobName
record.RowKey = jobId
record.LastUpdated = datetime.utcnow()
record.State = int(jobState)
if(error is not None):
record.LastError = error
try:
# write the job status record out to table storage
self.storage_service_table.update_entity(self.config.job_status_storage, record)
# if the job is complete or failed, write it out to the queue
if(jobState is JobState.done or jobState is JobState.failed):
self.queue_job_status(record)
return True
except Exception as ex:
self.log_exception(ex, self.update_job_status.__name__)
return False
def queue_job_status(self, jobStatus):
"""
Queues a job status record.
:param object jobStatus: The job status record to store in the queue message.
:return: True on success. False on failure.
:rtype: boolean
"""
try:
jobStatusSerialized = pickle.dumps(jobStatus)
self.storage_service_queue.put_message(self.config.job_status_storage, jobStatusSerialized)
return True
except Exception as ex:
self.log_exception(ex, self.queue_job_status.__name__)
return False
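A short usage sketch for the class above (the job names are made up, and it assumes the values in config.py point at a reachable storage account):

# Hedged sketch: typical JobStatus lifecycle.
import logging
from jobstatus import JobStatus, JobState

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('jobstatus-demo')

status = JobStatus(logger)
if status.add_job_status('nightly-batch', 'job-001', JobState.queued):
    # ... run the job ...
    status.update_job_status('nightly-batch', 'job-001', JobState.done)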


@ -0,0 +1,40 @@
# https://docs.microsoft.com/en-us/python/azure/python-sdk-azure-authenticate?view=azure-python#mgmt-auth-msi
# Enable MSI (Managed Service Identity) on VM
# Add VM Service Principal to Key Vault access via Key Vault Access Policy
# https://docs.microsoft.com/en-us/azure/active-directory/msi-tutorial-linux-vm-access-arm
from msrestazure.azure_active_directory import MSIAuthentication
from azure.keyvault import KeyVaultClient
from azureconfig import AzureConfig
from secrets_manager import SecretsManager
import logging
logger = logging.getLogger(__name__)
def initLogging():
logger.setLevel(logging.DEBUG)
# setup a console logger by default
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
if __name__ == "__main__":
# init logging
initLogging()
# Create MSI Authentication credentials
credentials = MSIAuthentication()
# get the Azure config
config = AzureConfig()
# instantiate the key vault client
client = KeyVaultClient(credentials)
secretsMgr = SecretsManager(client, config.key_vault_uri, logger)
secretName = 'SuperSecret2'
secret = secretsMgr.getSecret(secretName)
logger.info(secret.id + " " + secret.value)


@ -0,0 +1,49 @@
# install python
# install pip https://pip.pypa.io/en/stable/installing/
# install azure keyvault sdk -> sudo pip install --ignore-installed azure-keyvault
# https://docs.microsoft.com/en-us/python/api/overview/azure/key-vault?view=azure-python
# create an Azure Service Principal for resource access
# https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-create-service-principal-portal?view=azure-cli-latest
# add the service principal application to the Key Vault access policies in the Azure Portal
from azure.keyvault import KeyVaultClient, KeyVaultId
from azure.common.credentials import ServicePrincipalCredentials
from azureconfig import AzureConfig
from secrets_manager import SecretsManager
import logging
logger = logging.getLogger(__name__)
def initLogging():
logger.setLevel(logging.DEBUG)
# setup a console logger by default
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
if __name__ == "__main__":
# init logging
initLogging()
# get the azure config and setup the creds
config = AzureConfig()
credentials = ServicePrincipalCredentials(
client_id = config.client_id,
secret = config.client_secret,
tenant = config.tenant_id
)
# instantiate the client
client = KeyVaultClient(credentials)
secretsMgr = SecretsManager(client, config.key_vault_uri, logger)
secretName = 'SuperSecret2'
secret = secretsMgr.getSecret(secretName)
logger.info(secret.id + " " + secret.value)

65
samples/localkeywrap.py Normal file

@ -0,0 +1,65 @@
# Load modules
import base64
from azure.keyvault import KeyVaultClient
from azure.common.credentials import ServicePrincipalCredentials
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.primitives import hashes
# Tests only
import random, string
def randomword(length):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))
def bytes_to_int(bytes):
result = 0
for b in bytes:
result = result * 256 + ord(b)
return result
def wrap_aes_key(aes_key, public_json_key):
int_n = bytes_to_int(public_json_key.n)
int_e = bytes_to_int(public_json_key.e)
public_numbers = RSAPublicNumbers(int_e, int_n)
public_key = public_numbers.public_key(default_backend())
wrapped_key = public_key.encrypt(aes_key, padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None))
return wrapped_key
# Azure setup
aes_key = ''
azure_vault = ''
credentials = ServicePrincipalCredentials(
client_id = '',
secret = '',
tenant = '',
)
client = KeyVaultClient(
credentials
)
key_bundle = client.get_key(azure_vault, '', '')
json_key = key_bundle.key
for i in range(100):
key = randomword(24)
wrapped_key = wrap_aes_key(key, json_key)
restored_aes_key = client.unwrap_key(azure_vault, '', '', 'RSA-OAEP', wrapped_key)
if key != restored_aes_key.result:
print("==========================")
print(key)
print("--------------------------")
print(restored_aes_key.result)
print("")

54
samples/oauth.py Normal file

@ -0,0 +1,54 @@
# install python
# >>> pip install requests
# sample object returned by the acquire_token_with_client_credentials method
#{
# 'tokenType': 'Bearer',
# 'expiresIn': 3600,
# 'expiresOn': '2017-10-30 12:56:39.436417',
# 'resource': '<Resource>',
# 'accessToken': '<Token>',
# 'isMRRT': True,
# '_clientId': '<ClientId>',
# '_authority': 'Authority'
#}
import requests
from azureconfig import AzureConfig
config = AzureConfig()
# all requests are posted through this method; replace this single method if the requests library is not available.
def post_request(url, headers, body):
response = requests.post(url, headers=headers, data=body)
return response
def acquire_token_with_client_credentials(authority, resource, clientId, secret):
url = authority + '/oauth2/token'
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
body = {'grant_type':'client_credentials'}
body['client_id'] = clientId
body['client_secret'] = secret
body['resource'] = resource
response = post_request(url, headers, body)
token = {'tokenType' : response.json()['token_type']}
token['expiresIn'] = response.json()['expires_in']
token['expiresOn'] = response.json()['expires_on']
token['resource'] = response.json()['resource']
token['accessToken'] = response.json()['access_token']
token['isMRRT'] = True #TODO: Replace hard coded value
token['_clientId'] = clientId
token['_authority'] = authority
return token
if __name__ == '__main__':
tenantId = config.tenant_id # Azure Active Directory - Directory ID in Azure portal
clientId = config.client_id # Application ID in Azure Portal
clientSecret = config.client_secret # Application Key Value in Azure Portal
url = 'https://login.microsoftonline.com/' + tenantId
token = acquire_token_with_client_credentials(url, 'https://management.core.windows.net/', clientId, clientSecret)
print(token)


@ -0,0 +1,81 @@
class SecretsManager(object):
def __init__(self, secretsClient, secretStoreUri, logger):
self.client = secretsClient
self.secret_store_uri = secretStoreUri
self.logger = logger
def logException(self, exception, functionName):
self.logger.debug("Exception occurred in: " + functionName)
self.logger.debug(type(exception))
self.logger.debug(exception)
def getKey(self, name):
try:
# Read a key without version
key = self.client.get_key(self.secret_store_uri, name, '')
return key
except Exception as ex:
self.logException(ex, self.getKey.__name__)
def getSecretVersions(self, name):
# Print a list of versions for a secret
versions = self.client.get_secret_versions(self.secret_store_uri, name)
for version in versions:
print(version.id)
def getSecret(self, name):
# Read a secret without version
try:
secret = self.client.get_secret(self.secret_store_uri, name, '')
return secret
except Exception as ex:
self.logException(ex, self.getSecret.__name__)
def setSecret(self, name, value):
# adds a secret to the Azure Key Vault. If the named secret already exists, Azure Key Vault creates a new version of that secret.
self.client.set_secret(self.secret_store_uri, name, value)
def disableSecret(self, name):
# Update a secret without version
self.client.update_secret(self.secret_store_uri, name, '', secret_attributes={'enabled': False})
def enableSecret(self, name):
# Update a secret without version
self.client.update_secret(self.secret_store_uri, name, '', secret_attributes={'enabled': True})
'''
# Create a key
key_bundle = self.client.create_key(self.secret_store_uri, 'FirstKey', 'RSA')
key_id = KeyVaultId.parse_key_id(key_bundle.key.kid)
# Update a key without version
client.update_key(key_id.vault, key_id.name, key_id.version_none, key_attributes={'enabled': False})
# Update a key with version
client.update_key(key_id.vault, key_id.name, key_id.version, key_attributes={'enabled': False})
# Print a list of versions for a key
versions = self.client.get_key_versions(self.secret_store_uri, 'FirstKey')
for version in versions:
print(version.kid) # https://myvault.vault.azure.net/keys/FirstKey/000102030405060708090a0b0c0d0e0f
# Read a key with version
client.get_key(key_id.vault, key_id.name, key_id.version)
# Delete a key
self.client.delete_key(self.secret_store_uri, 'FirstKey')
# Create a secret
secret_bundle = self.client.set_secret(self.secret_store_uri, 'FirstSecret', 'Hush, that is secret!!')
secret_id = KeyVaultId.parse_secret_id(secret_bundle.id)
# Update a secret with version
self.client.update_key(secret_id.vault, secret_id.name, secret_id.version, secret_attributes={'enabled': False})
# Read a secret with version
self.client.get_secret(secret_id.vault, secret_id.name, secret_id.version)
# Delete a secret
self.client.delete_secret(self.secret_store_uri, 'FirstSecret')
'''

73
samples/setupBlobData.py Normal file

@ -0,0 +1,73 @@
# Grab the primary key
# Execute from parent folder of scripts and apps: PYTHONPATH=. python scripts/setupBlobData.py
from azure.storage.blob import BlockBlobService
from app.config import Config
from app.aescipher import AESCipher
from app.aeskeywrapper import AESKeyWrapper
import logging
logger = logging.getLogger(__name__)
BLOB_NAME = 'processingData'
def initLogging():
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)-20s %(levelname)-5s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
if __name__ == "__main__":
# init logging
initLogging()
unencrypted_records_file = 'data/data'
encrypted_record_file = 'data/data.encrypted'
downloaded_encrypted_record_file = 'data/data.dl.encrypted'
config = Config()
# Download encrypted AES key
encrypted_aes_key_filename = "data/aes.encrypted"
# Decode AES key
wrapper = AESKeyWrapper(vault = config.azure_keyvault_url,
client_id = config.azure_keyvault_client_id,
secret = config.azure_keyvault_secret,
tenant = config.azure_keyvault_tenant_id,
key_name = config.azure_keyvault_key_name,
key_version = config.azure_keyvault_key_version)
with open(encrypted_aes_key_filename, "rb") as aes_key_file:
wrapped_key = aes_key_file.read()
keys = wrapper.unwrap_aes_key(wrapped_key)
key = keys[:config.aes_key_length]
iv = keys[config.aes_key_length:]
aes_cipher = AESCipher(key, iv)
# encrypt the data file record by record
with open(encrypted_record_file, 'wb') as encryptedFile:
with open(unencrypted_records_file, 'r') as dataFile:
for record in dataFile:
encryptedRecord = aes_cipher.encrypt(record)
encryptedFile.write(encryptedRecord + '\n')
# instantiate the client
blob_service = BlockBlobService(account_name=config.storage_account_name, account_key=config.storage_account_key)
# create container
blob_service.create_container(config.storage_container_name)
# upload file
blob_service.create_blob_from_path(config.storage_container_name, BLOB_NAME, encrypted_record_file)
# download file
blob_service.get_blob_to_path(container_name=config.storage_container_name, blob_name=BLOB_NAME, file_path=downloaded_encrypted_record_file)
# decrypt file records
with open(downloaded_encrypted_record_file, 'rb') as encryptedFile:
for record in encryptedFile:
print aes_cipher.decrypt(record.rstrip('\n'))