зеркало из
1
0
Форкнуть 0
E2E tests DPS
This commit is contained in:
olivakar 2019-08-26 15:51:26 -07:00 коммит произвёл GitHub
Родитель 6127230188
Коммит dd623089ff
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
50 изменённых файлов: 5758 добавлений и 37 удалений

Просмотреть файл

@ -0,0 +1 @@
__path__ = __import__("pkgutil").extend_path(__path__, __name__)

Просмотреть файл

@ -0,0 +1,22 @@
# # -------------------------------------------------------------------------
# # Copyright (c) Microsoft Corporation. All rights reserved.
# # Licensed under the MIT License. See License.txt in the project root for
# # license information.
# # --------------------------------------------------------------------------
# from azure.iot.device.common.connection_string import ConnectionString
# from azure.iot.device.common.sastoken import SasToken
#
#
# def connection_string_to_sas_token(conn_str):
# """
# parse an IoTHub service connection string and return the host and a shared access
# signature that can be used to connect to the given hub
# """
# conn_str_obj = ConnectionString(conn_str)
# sas_token = SasToken(
# uri=conn_str_obj.get("HostName"),
# key=conn_str_obj.get("SharedAccessKey"),
# key_name=conn_str_obj.get("SharedAccessKeyName"),
# )
#
# return {"host": conn_str_obj.get("HostName"), "sas": str(sas_token)}

Просмотреть файл

@ -0,0 +1,7 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
__path__ = __import__("pkgutil").extend_path(__path__, __name__)

Просмотреть файл

@ -0,0 +1,58 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.service_client import ServiceClient
from msrest import Configuration, Serializer, Deserializer
from .version import VERSION
from msrest.exceptions import HttpOperationError
from .operations.service_operations import ServiceOperations
from . import models
class IotHubGatewayServiceAPIs20180630Configuration(Configuration):
"""Configuration for IotHubGatewayServiceAPIs20180630
Note that all parameters used to create this instance are saved as instance
attributes.
:param str base_url: Service URL
"""
def __init__(self, base_url=None):
if not base_url:
base_url = "https://fully-qualified-iothubname.azure-devices.net"
super(IotHubGatewayServiceAPIs20180630Configuration, self).__init__(base_url)
self.add_user_agent("iothubgatewayserviceapis20180630/{}".format(VERSION))
class IotHubGatewayServiceAPIs20180630(object):
"""IotHubGatewayServiceAPIs20180630
:ivar config: Configuration for client.
:vartype config: IotHubGatewayServiceAPIs20180630Configuration
:ivar service: Service operations
:vartype service: service20180630.operations.ServiceOperations
:param str base_url: Service URL
"""
def __init__(self, base_url=None):
self.config = IotHubGatewayServiceAPIs20180630Configuration(base_url)
self._client = ServiceClient(None, self.config)
client_models = {k: v for k, v in models.__dict__.items() if isinstance(v, type)}
self.api_version = "2018-06-30"
self._serialize = Serializer(client_models)
self._deserialize = Deserializer(client_models)
self.service = ServiceOperations(
self._client, self.config, self._serialize, self._deserialize
)

Просмотреть файл

@ -0,0 +1,68 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from .configuration_metrics import ConfigurationMetrics
from .configuration_content import ConfigurationContent
from .configuration import Configuration
from .configuration_queries_test_input import ConfigurationQueriesTestInput
from .configuration_queries_test_response import ConfigurationQueriesTestResponse
from .registry_statistics import RegistryStatistics
from .service_statistics import ServiceStatistics
from .symmetric_key import SymmetricKey
from .x509_thumbprint import X509Thumbprint
from .authentication_mechanism import AuthenticationMechanism
from .device_capabilities import DeviceCapabilities
from .device import Device
from .property_container import PropertyContainer
from .export_import_device import ExportImportDevice
from .device_registry_operation_error import DeviceRegistryOperationError
from .device_registry_operation_warning import DeviceRegistryOperationWarning
from .bulk_registry_operation_result import BulkRegistryOperationResult
from .query_specification import QuerySpecification
from .query_result import QueryResult
from .job_properties import JobProperties
from .purge_message_queue_result import PurgeMessageQueueResult
from .twin_properties import TwinProperties
from .twin import Twin
from .cloud_to_device_method import CloudToDeviceMethod
from .job_request import JobRequest
from .device_job_statistics import DeviceJobStatistics
from .job_response import JobResponse
from .module import Module
from .cloud_to_device_method_result import CloudToDeviceMethodResult
__all__ = [
"ConfigurationMetrics",
"ConfigurationContent",
"Configuration",
"ConfigurationQueriesTestInput",
"ConfigurationQueriesTestResponse",
"RegistryStatistics",
"ServiceStatistics",
"SymmetricKey",
"X509Thumbprint",
"AuthenticationMechanism",
"DeviceCapabilities",
"Device",
"PropertyContainer",
"ExportImportDevice",
"DeviceRegistryOperationError",
"DeviceRegistryOperationWarning",
"BulkRegistryOperationResult",
"QuerySpecification",
"QueryResult",
"JobProperties",
"PurgeMessageQueueResult",
"TwinProperties",
"Twin",
"CloudToDeviceMethod",
"JobRequest",
"DeviceJobStatistics",
"JobResponse",
"Module",
"CloudToDeviceMethodResult",
]

Просмотреть файл

@ -0,0 +1,33 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class AuthenticationMechanism(Model):
"""AuthenticationMechanism.
:param symmetric_key:
:type symmetric_key: ~service20180630.models.SymmetricKey
:param x509_thumbprint:
:type x509_thumbprint: ~service20180630.models.X509Thumbprint
:param type: Possible values include: 'sas', 'selfSigned',
'certificateAuthority', 'none'
:type type: str or ~service20180630.models.enum
"""
_attribute_map = {
"symmetric_key": {"key": "symmetricKey", "type": "SymmetricKey"},
"x509_thumbprint": {"key": "x509Thumbprint", "type": "X509Thumbprint"},
"type": {"key": "type", "type": "str"},
}
def __init__(self, symmetric_key=None, x509_thumbprint=None, type=None):
super(AuthenticationMechanism, self).__init__()
self.symmetric_key = symmetric_key
self.x509_thumbprint = x509_thumbprint
self.type = type

Просмотреть файл

@ -0,0 +1,35 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class BulkRegistryOperationResult(Model):
"""Encapsulates the result of a bulk registry operation.
:param is_successful: Whether or not the operation was successful.
:type is_successful: bool
:param errors: If the operation was not successful, this contains an array
of DeviceRegistryOperationError objects.
:type errors: list[~service20180630.models.DeviceRegistryOperationError]
:param warnings: If the operation was partially successful, this contains
an array of DeviceRegistryOperationWarning objects.
:type warnings:
list[~service20180630.models.DeviceRegistryOperationWarning]
"""
_attribute_map = {
"is_successful": {"key": "isSuccessful", "type": "bool"},
"errors": {"key": "errors", "type": "[DeviceRegistryOperationError]"},
"warnings": {"key": "warnings", "type": "[DeviceRegistryOperationWarning]"},
}
def __init__(self, is_successful=None, errors=None, warnings=None):
super(BulkRegistryOperationResult, self).__init__()
self.is_successful = is_successful
self.errors = errors
self.warnings = warnings

Просмотреть файл

@ -0,0 +1,43 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class CloudToDeviceMethod(Model):
"""Parameters to execute a direct method on the device.
Variables are only populated by the server, and will be ignored when
sending a request.
:param method_name: Method to run
:type method_name: str
:ivar payload: Payload
:vartype payload: object
:param response_timeout_in_seconds:
:type response_timeout_in_seconds: int
:param connect_timeout_in_seconds:
:type connect_timeout_in_seconds: int
"""
_validation = {"payload": {"readonly": True}}
_attribute_map = {
"method_name": {"key": "methodName", "type": "str"},
"payload": {"key": "payload", "type": "object"},
"response_timeout_in_seconds": {"key": "responseTimeoutInSeconds", "type": "int"},
"connect_timeout_in_seconds": {"key": "connectTimeoutInSeconds", "type": "int"},
}
def __init__(
self, method_name=None, response_timeout_in_seconds=None, connect_timeout_in_seconds=None
):
super(CloudToDeviceMethod, self).__init__()
self.method_name = method_name
self.payload = None
self.response_timeout_in_seconds = response_timeout_in_seconds
self.connect_timeout_in_seconds = connect_timeout_in_seconds

Просмотреть файл

@ -0,0 +1,28 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class CloudToDeviceMethodResult(Model):
"""Represents the Device Method Invocation Results.
:param status: Method invocation result status.
:type status: int
:param payload: Method invocation result payload.
:type payload: object
"""
_attribute_map = {
"status": {"key": "status", "type": "int"},
"payload": {"key": "payload", "type": "object"},
}
def __init__(self, status=None, payload=None):
super(CloudToDeviceMethodResult, self).__init__()
self.status = status
self.payload = payload

Просмотреть файл

@ -0,0 +1,78 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class Configuration(Model):
"""Configuration for IotHub devices and modules.
:param id: Gets Identifier for the configuration
:type id: str
:param schema_version: Gets Schema version for the configuration
:type schema_version: str
:param labels: Gets or sets labels for the configuration
:type labels: dict[str, str]
:param content: Gets or sets Content for the configuration
:type content: ~service20180630.models.ConfigurationContent
:param target_condition: Gets or sets Target Condition for the
configuration
:type target_condition: str
:param created_time_utc: Gets creation time for the configuration
:type created_time_utc: datetime
:param last_updated_time_utc: Gets last update time for the configuration
:type last_updated_time_utc: datetime
:param priority: Gets or sets Priority for the configuration
:type priority: int
:param system_metrics: System Configuration Metrics
:type system_metrics: ~service20180630.models.ConfigurationMetrics
:param metrics: Custom Configuration Metrics
:type metrics: ~service20180630.models.ConfigurationMetrics
:param etag: Gets or sets configuration's ETag
:type etag: str
"""
_attribute_map = {
"id": {"key": "id", "type": "str"},
"schema_version": {"key": "schemaVersion", "type": "str"},
"labels": {"key": "labels", "type": "{str}"},
"content": {"key": "content", "type": "ConfigurationContent"},
"target_condition": {"key": "targetCondition", "type": "str"},
"created_time_utc": {"key": "createdTimeUtc", "type": "iso-8601"},
"last_updated_time_utc": {"key": "lastUpdatedTimeUtc", "type": "iso-8601"},
"priority": {"key": "priority", "type": "int"},
"system_metrics": {"key": "systemMetrics", "type": "ConfigurationMetrics"},
"metrics": {"key": "metrics", "type": "ConfigurationMetrics"},
"etag": {"key": "etag", "type": "str"},
}
def __init__(
self,
id=None,
schema_version=None,
labels=None,
content=None,
target_condition=None,
created_time_utc=None,
last_updated_time_utc=None,
priority=None,
system_metrics=None,
metrics=None,
etag=None,
):
super(Configuration, self).__init__()
self.id = id
self.schema_version = schema_version
self.labels = labels
self.content = content
self.target_condition = target_condition
self.created_time_utc = created_time_utc
self.last_updated_time_utc = last_updated_time_utc
self.priority = priority
self.system_metrics = system_metrics
self.metrics = metrics
self.etag = etag

Просмотреть файл

@ -0,0 +1,28 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class ConfigurationContent(Model):
"""Configuration Content for Devices or Modules on Edge Devices.
:param device_content: Gets or sets device Configurations
:type device_content: dict[str, object]
:param modules_content: Gets or sets Module Configurations
:type modules_content: dict[str, dict[str, object]]
"""
_attribute_map = {
"device_content": {"key": "deviceContent", "type": "{object}"},
"modules_content": {"key": "modulesContent", "type": "{{object}}"},
}
def __init__(self, device_content=None, modules_content=None):
super(ConfigurationContent, self).__init__()
self.device_content = device_content
self.modules_content = modules_content

Просмотреть файл

@ -0,0 +1,28 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class ConfigurationMetrics(Model):
"""Configuration Metrics.
:param results:
:type results: dict[str, long]
:param queries:
:type queries: dict[str, str]
"""
_attribute_map = {
"results": {"key": "results", "type": "{long}"},
"queries": {"key": "queries", "type": "{str}"},
}
def __init__(self, results=None, queries=None):
super(ConfigurationMetrics, self).__init__()
self.results = results
self.queries = queries

Просмотреть файл

@ -0,0 +1,28 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class ConfigurationQueriesTestInput(Model):
"""ConfigurationQueriesTestInput.
:param target_condition:
:type target_condition: str
:param custom_metric_queries:
:type custom_metric_queries: dict[str, str]
"""
_attribute_map = {
"target_condition": {"key": "targetCondition", "type": "str"},
"custom_metric_queries": {"key": "customMetricQueries", "type": "{str}"},
}
def __init__(self, target_condition=None, custom_metric_queries=None):
super(ConfigurationQueriesTestInput, self).__init__()
self.target_condition = target_condition
self.custom_metric_queries = custom_metric_queries

Просмотреть файл

@ -0,0 +1,28 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class ConfigurationQueriesTestResponse(Model):
"""ConfigurationQueriesTestResponse.
:param target_condition_error:
:type target_condition_error: str
:param custom_metric_query_errors:
:type custom_metric_query_errors: dict[str, str]
"""
_attribute_map = {
"target_condition_error": {"key": "targetConditionError", "type": "str"},
"custom_metric_query_errors": {"key": "customMetricQueryErrors", "type": "{str}"},
}
def __init__(self, target_condition_error=None, custom_metric_query_errors=None):
super(ConfigurationQueriesTestResponse, self).__init__()
self.target_condition_error = target_condition_error
self.custom_metric_query_errors = custom_metric_query_errors

Просмотреть файл

@ -0,0 +1,83 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class Device(Model):
"""Device.
:param device_id:
:type device_id: str
:param generation_id:
:type generation_id: str
:param etag:
:type etag: str
:param connection_state: Possible values include: 'Disconnected',
'Connected'
:type connection_state: str or ~service20180630.models.enum
:param status: Possible values include: 'enabled', 'disabled'
:type status: str or ~service20180630.models.enum
:param status_reason:
:type status_reason: str
:param connection_state_updated_time:
:type connection_state_updated_time: datetime
:param status_updated_time:
:type status_updated_time: datetime
:param last_activity_time:
:type last_activity_time: datetime
:param cloud_to_device_message_count:
:type cloud_to_device_message_count: int
:param authentication:
:type authentication: ~service20180630.models.AuthenticationMechanism
:param capabilities:
:type capabilities: ~service20180630.models.DeviceCapabilities
"""
_attribute_map = {
"device_id": {"key": "deviceId", "type": "str"},
"generation_id": {"key": "generationId", "type": "str"},
"etag": {"key": "etag", "type": "str"},
"connection_state": {"key": "connectionState", "type": "str"},
"status": {"key": "status", "type": "str"},
"status_reason": {"key": "statusReason", "type": "str"},
"connection_state_updated_time": {"key": "connectionStateUpdatedTime", "type": "iso-8601"},
"status_updated_time": {"key": "statusUpdatedTime", "type": "iso-8601"},
"last_activity_time": {"key": "lastActivityTime", "type": "iso-8601"},
"cloud_to_device_message_count": {"key": "cloudToDeviceMessageCount", "type": "int"},
"authentication": {"key": "authentication", "type": "AuthenticationMechanism"},
"capabilities": {"key": "capabilities", "type": "DeviceCapabilities"},
}
def __init__(
self,
device_id=None,
generation_id=None,
etag=None,
connection_state=None,
status=None,
status_reason=None,
connection_state_updated_time=None,
status_updated_time=None,
last_activity_time=None,
cloud_to_device_message_count=None,
authentication=None,
capabilities=None,
):
super(Device, self).__init__()
self.device_id = device_id
self.generation_id = generation_id
self.etag = etag
self.connection_state = connection_state
self.status = status
self.status_reason = status_reason
self.connection_state_updated_time = connection_state_updated_time
self.status_updated_time = status_updated_time
self.last_activity_time = last_activity_time
self.cloud_to_device_message_count = cloud_to_device_message_count
self.authentication = authentication
self.capabilities = capabilities

Просмотреть файл

@ -0,0 +1,22 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class DeviceCapabilities(Model):
"""Status of Capabilities enabled on the device.
:param iot_edge:
:type iot_edge: bool
"""
_attribute_map = {"iot_edge": {"key": "iotEdge", "type": "bool"}}
def __init__(self, iot_edge=None):
super(DeviceCapabilities, self).__init__()
self.iot_edge = iot_edge

Просмотреть файл

@ -0,0 +1,47 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class DeviceJobStatistics(Model):
"""The job counts, e.g., number of failed/succeeded devices.
:param device_count: Number of devices in the job
:type device_count: int
:param failed_count: The number of failed jobs
:type failed_count: int
:param succeeded_count: The number of Successed jobs
:type succeeded_count: int
:param running_count: The number of running jobs
:type running_count: int
:param pending_count: The number of pending (scheduled) jobs
:type pending_count: int
"""
_attribute_map = {
"device_count": {"key": "deviceCount", "type": "int"},
"failed_count": {"key": "failedCount", "type": "int"},
"succeeded_count": {"key": "succeededCount", "type": "int"},
"running_count": {"key": "runningCount", "type": "int"},
"pending_count": {"key": "pendingCount", "type": "int"},
}
def __init__(
self,
device_count=None,
failed_count=None,
succeeded_count=None,
running_count=None,
pending_count=None,
):
super(DeviceJobStatistics, self).__init__()
self.device_count = device_count
self.failed_count = failed_count
self.succeeded_count = succeeded_count
self.running_count = running_count
self.pending_count = pending_count

Просмотреть файл

@ -0,0 +1,113 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class DeviceRegistryOperationError(Model):
"""Encapsulates device registry operation error details.
:param device_id: The ID of the device that indicated the error.
:type device_id: str
:param error_code: ErrorCode associated with the error. Possible values
include: 'InvalidErrorCode', 'GenericBadRequest',
'InvalidProtocolVersion', 'DeviceInvalidResultCount', 'InvalidOperation',
'ArgumentInvalid', 'ArgumentNull', 'IotHubFormatError',
'DeviceStorageEntitySerializationError', 'BlobContainerValidationError',
'ImportWarningExistsError', 'InvalidSchemaVersion',
'DeviceDefinedMultipleTimes', 'DeserializationError',
'BulkRegistryOperationFailure', 'DefaultStorageEndpointNotConfigured',
'InvalidFileUploadCorrelationId', 'ExpiredFileUploadCorrelationId',
'InvalidStorageEndpoint', 'InvalidMessagingEndpoint',
'InvalidFileUploadCompletionStatus', 'InvalidStorageEndpointOrBlob',
'RequestCanceled', 'InvalidStorageEndpointProperty', 'EtagDoesNotMatch',
'RequestTimedOut', 'UnsupportedOperationOnReplica', 'NullMessage',
'ConnectionForcefullyClosedOnNewConnection', 'InvalidRouteTestInput',
'InvalidSourceOnRoute', 'RoutingNotEnabled', 'InvalidEndorsementKey',
'InvalidRegistrationId', 'InvalidStorageRootKey',
'InvalidEnrollmentGroupId', 'TooManyEnrollments',
'RegistrationIdDefinedMultipleTimes', 'CannotRegisterModuleToModule',
'TenantHubRoutingNotEnabled', 'InvalidConfigurationTargetCondition',
'InvalidConfigurationContent',
'CannotModifyImmutableConfigurationContent',
'InvalidConfigurationCustomMetricsQuery', 'GenericUnauthorized',
'IotHubNotFound', 'IotHubUnauthorizedAccess', 'IotHubUnauthorized',
'ElasticPoolNotFound', 'SystemModuleModifyUnauthorizedAccess',
'GenericForbidden', 'IotHubSuspended', 'IotHubQuotaExceeded',
'JobQuotaExceeded', 'DeviceMaximumQueueDepthExceeded',
'IotHubMaxCbsTokenExceeded', 'DeviceMaximumActiveFileUploadLimitExceeded',
'DeviceMaximumQueueSizeExceeded', 'RoutingEndpointResponseForbidden',
'InvalidMessageExpiryTime', 'OperationNotAvailableInCurrentTier',
'DeviceModelMaxPropertiesExceeded',
'DeviceModelMaxIndexablePropertiesExceeded', 'GenericNotFound',
'DeviceNotFound', 'JobNotFound', 'QuotaMetricNotFound',
'SystemPropertyNotFound', 'AmqpAddressNotFound',
'RoutingEndpointResponseNotFound', 'CertificateNotFound',
'ElasticPoolTenantHubNotFound', 'ModuleNotFound',
'AzureTableStoreNotFound', 'IotHubFailingOver',
'QueryStoreClusterNotFound', 'DeviceNotOnline',
'DeviceConnectionClosedRemotely', 'EnrollmentNotFound',
'DeviceRegistrationNotFound', 'AsyncOperationNotFound',
'EnrollmentGroupNotFound', 'ConfigurationNotFound', 'GroupNotFound',
'GenericMethodNotAllowed', 'OperationNotAllowedInCurrentState',
'ImportDevicesNotSupported', 'BulkAddDevicesNotSupported',
'GenericConflict', 'DeviceAlreadyExists', 'LinkCreationConflict',
'CallbackSubscriptionConflict', 'ModelAlreadyExists', 'DeviceLocked',
'DeviceJobAlreadyExists', 'JobAlreadyExists', 'EnrollmentConflict',
'EnrollmentGroupConflict', 'RegistrationStatusConflict',
'ModuleAlreadyExistsOnDevice', 'ConfigurationAlreadyExists',
'ApplyConfigurationAlreadyInProgressOnDevice', 'GroupAlreadyExists',
'GenericPreconditionFailed', 'PreconditionFailed',
'DeviceMessageLockLost', 'JobRunPreconditionFailed',
'InflightMessagesInLink', 'GenericRequestEntityTooLarge',
'MessageTooLarge', 'TooManyDevices', 'TooManyModulesOnDevice',
'ConfigurationCountLimitExceeded', 'GroupCountLimitExceeded',
'GenericUnsupportedMediaType', 'IncompatibleDataType',
'GenericTooManyRequests', 'ThrottlingException',
'ThrottleBacklogLimitExceeded', 'ThrottlingBacklogTimeout',
'ThrottlingMaxActiveJobCountExceeded', 'GenericServerError',
'ServerError', 'JobCancelled', 'StatisticsRetrievalError',
'ConnectionForcefullyClosed', 'InvalidBlobState', 'BackupTimedOut',
'AzureStorageTimeout', 'GenericTimeout', 'InvalidThrottleParameter',
'EventHubLinkAlreadyClosed', 'ReliableBlobStoreError',
'RetryAttemptsExhausted', 'AzureTableStoreError',
'CheckpointStoreNotFound', 'DocumentDbInvalidReturnValue',
'ReliableDocDbStoreStoreError', 'ReliableBlobStoreTimeoutError',
'ConfigReadFailed', 'InvalidContainerReceiveLink',
'InvalidPartitionEpoch', 'RestoreTimedOut', 'StreamReservationFailure',
'UnexpectedPropertyValue', 'OrchestrationOperationFailed',
'GenericBadGateway', 'InvalidResponseWhileProxying',
'GenericServiceUnavailable', 'ServiceUnavailable', 'PartitionNotFound',
'IotHubActivationFailed', 'ServerBusy', 'IotHubRestoring',
'ConnectionUnavailable', 'DeviceUnavailable', 'ConfigurationNotAvailable',
'GroupNotAvailable', 'GenericGatewayTimeout', 'GatewayTimeout'
:type error_code: str or ~service20180630.models.enum
:param error_status: Additional details associated with the error.
:type error_status: str
:param module_id:
:type module_id: str
:param operation:
:type operation: str
"""
_attribute_map = {
"device_id": {"key": "deviceId", "type": "str"},
"error_code": {"key": "errorCode", "type": "str"},
"error_status": {"key": "errorStatus", "type": "str"},
"module_id": {"key": "moduleId", "type": "str"},
"operation": {"key": "operation", "type": "str"},
}
def __init__(
self, device_id=None, error_code=None, error_status=None, module_id=None, operation=None
):
super(DeviceRegistryOperationError, self).__init__()
self.device_id = device_id
self.error_code = error_code
self.error_status = error_status
self.module_id = module_id
self.operation = operation

Просмотреть файл

@ -0,0 +1,33 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class DeviceRegistryOperationWarning(Model):
"""Encapsulates device registry operation error details.
:param device_id: The ID of the device that indicated the warning.
:type device_id: str
:param warning_code: Possible values include:
'DeviceRegisteredWithoutTwin'
:type warning_code: str or ~service20180630.models.enum
:param warning_status: Additional details associated with the warning.
:type warning_status: str
"""
_attribute_map = {
"device_id": {"key": "deviceId", "type": "str"},
"warning_code": {"key": "warningCode", "type": "str"},
"warning_status": {"key": "warningStatus", "type": "str"},
}
def __init__(self, device_id=None, warning_code=None, warning_status=None):
super(DeviceRegistryOperationWarning, self).__init__()
self.device_id = device_id
self.warning_code = warning_code
self.warning_status = warning_status

Просмотреть файл

@ -0,0 +1,79 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class ExportImportDevice(Model):
"""ExportImportDevice.
:param id:
:type id: str
:param module_id:
:type module_id: str
:param e_tag:
:type e_tag: str
:param import_mode: Possible values include: 'createOrUpdate', 'create',
'update', 'updateIfMatchETag', 'createOrUpdateIfMatchETag', 'delete',
'deleteIfMatchETag', 'updateTwin', 'updateTwinIfMatchETag'
:type import_mode: str or ~service20180630.models.enum
:param status: Possible values include: 'enabled', 'disabled'
:type status: str or ~service20180630.models.enum
:param status_reason:
:type status_reason: str
:param authentication:
:type authentication: ~service20180630.models.AuthenticationMechanism
:param twin_etag:
:type twin_etag: str
:param tags:
:type tags: dict[str, object]
:param properties:
:type properties: ~service20180630.models.PropertyContainer
:param capabilities:
:type capabilities: ~service20180630.models.DeviceCapabilities
"""
_attribute_map = {
"id": {"key": "id", "type": "str"},
"module_id": {"key": "moduleId", "type": "str"},
"e_tag": {"key": "eTag", "type": "str"},
"import_mode": {"key": "importMode", "type": "str"},
"status": {"key": "status", "type": "str"},
"status_reason": {"key": "statusReason", "type": "str"},
"authentication": {"key": "authentication", "type": "AuthenticationMechanism"},
"twin_etag": {"key": "twinETag", "type": "str"},
"tags": {"key": "tags", "type": "{object}"},
"properties": {"key": "properties", "type": "PropertyContainer"},
"capabilities": {"key": "capabilities", "type": "DeviceCapabilities"},
}
def __init__(
self,
id=None,
module_id=None,
e_tag=None,
import_mode=None,
status=None,
status_reason=None,
authentication=None,
twin_etag=None,
tags=None,
properties=None,
capabilities=None,
):
super(ExportImportDevice, self).__init__()
self.id = id
self.module_id = module_id
self.e_tag = e_tag
self.import_mode = import_mode
self.status = status
self.status_reason = status_reason
self.authentication = authentication
self.twin_etag = twin_etag
self.tags = tags
self.properties = properties
self.capabilities = capabilities

Просмотреть файл

@ -0,0 +1,99 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class JobProperties(Model):
"""JobProperties.
:param job_id: System generated. Ignored at creation.
:type job_id: str
:param start_time_utc: System generated. Ignored at creation.
:type start_time_utc: datetime
:param end_time_utc: System generated. Ignored at creation.
Represents the time the job stopped processing.
:type end_time_utc: datetime
:param type: Required.
The type of job to execute. Possible values include: 'unknown', 'export',
'import', 'backup', 'readDeviceProperties', 'writeDeviceProperties',
'updateDeviceConfiguration', 'rebootDevice', 'factoryResetDevice',
'firmwareUpdate', 'scheduleDeviceMethod', 'scheduleUpdateTwin',
'restoreFromBackup', 'failoverDataCopy'
:type type: str or ~service20180630.models.enum
:param status: System generated. Ignored at creation. Possible values
include: 'unknown', 'enqueued', 'running', 'completed', 'failed',
'cancelled', 'scheduled', 'queued'
:type status: str or ~service20180630.models.enum
:param progress: System generated. Ignored at creation.
Represents the percentage of completion.
:type progress: int
:param input_blob_container_uri: URI containing SAS token to a blob
container that contains registry data to sync.
:type input_blob_container_uri: str
:param input_blob_name: The blob name to be used when importing from the
provided input blob container.
:type input_blob_name: str
:param output_blob_container_uri: URI containing SAS token to a blob
container. This is used to output the status of the job and the results.
:type output_blob_container_uri: str
:param output_blob_name: The name of the blob that will be created in the
provided output blob container. This blob will contain
the exported device registry information for the IoT Hub.
:type output_blob_name: str
:param exclude_keys_in_export: Optional for export jobs; ignored for other
jobs. Default: false. If false, authorization keys are included
in export output. Keys are exported as null otherwise.
:type exclude_keys_in_export: bool
:param failure_reason: System genereated. Ignored at creation.
If status == failure, this represents a string containing the reason.
:type failure_reason: str
"""
_attribute_map = {
"job_id": {"key": "jobId", "type": "str"},
"start_time_utc": {"key": "startTimeUtc", "type": "iso-8601"},
"end_time_utc": {"key": "endTimeUtc", "type": "iso-8601"},
"type": {"key": "type", "type": "str"},
"status": {"key": "status", "type": "str"},
"progress": {"key": "progress", "type": "int"},
"input_blob_container_uri": {"key": "inputBlobContainerUri", "type": "str"},
"input_blob_name": {"key": "inputBlobName", "type": "str"},
"output_blob_container_uri": {"key": "outputBlobContainerUri", "type": "str"},
"output_blob_name": {"key": "outputBlobName", "type": "str"},
"exclude_keys_in_export": {"key": "excludeKeysInExport", "type": "bool"},
"failure_reason": {"key": "failureReason", "type": "str"},
}
def __init__(
self,
job_id=None,
start_time_utc=None,
end_time_utc=None,
type=None,
status=None,
progress=None,
input_blob_container_uri=None,
input_blob_name=None,
output_blob_container_uri=None,
output_blob_name=None,
exclude_keys_in_export=None,
failure_reason=None,
):
super(JobProperties, self).__init__()
self.job_id = job_id
self.start_time_utc = start_time_utc
self.end_time_utc = end_time_utc
self.type = type
self.status = status
self.progress = progress
self.input_blob_container_uri = input_blob_container_uri
self.input_blob_name = input_blob_name
self.output_blob_container_uri = output_blob_container_uri
self.output_blob_name = output_blob_name
self.exclude_keys_in_export = exclude_keys_in_export
self.failure_reason = failure_reason

Просмотреть файл

@ -0,0 +1,66 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class JobRequest(Model):
"""JobRequest.
:param job_id: Job identifier
:type job_id: str
:param type: Required.
The type of job to execute. Possible values include: 'unknown', 'export',
'import', 'backup', 'readDeviceProperties', 'writeDeviceProperties',
'updateDeviceConfiguration', 'rebootDevice', 'factoryResetDevice',
'firmwareUpdate', 'scheduleDeviceMethod', 'scheduleUpdateTwin',
'restoreFromBackup', 'failoverDataCopy'
:type type: str or ~service20180630.models.enum
:param cloud_to_device_method: Required if jobType is cloudToDeviceMethod.
The method type and parameters.
:type cloud_to_device_method: ~service20180630.models.CloudToDeviceMethod
:param update_twin:
:type update_twin: ~service20180630.models.Twin
:param query_condition: Required if jobType is updateTwin or
cloudToDeviceMethod.
Condition for device query to get devices to execute the job on
:type query_condition: str
:param start_time: ISO 8601 date time to start the job
:type start_time: datetime
:param max_execution_time_in_seconds: Max execution time in secounds (ttl
duration)
:type max_execution_time_in_seconds: long
"""
_attribute_map = {
"job_id": {"key": "jobId", "type": "str"},
"type": {"key": "type", "type": "str"},
"cloud_to_device_method": {"key": "cloudToDeviceMethod", "type": "CloudToDeviceMethod"},
"update_twin": {"key": "updateTwin", "type": "Twin"},
"query_condition": {"key": "queryCondition", "type": "str"},
"start_time": {"key": "startTime", "type": "iso-8601"},
"max_execution_time_in_seconds": {"key": "maxExecutionTimeInSeconds", "type": "long"},
}
def __init__(
self,
job_id=None,
type=None,
cloud_to_device_method=None,
update_twin=None,
query_condition=None,
start_time=None,
max_execution_time_in_seconds=None,
):
super(JobRequest, self).__init__()
self.job_id = job_id
self.type = type
self.cloud_to_device_method = cloud_to_device_method
self.update_twin = update_twin
self.query_condition = query_condition
self.start_time = start_time
self.max_execution_time_in_seconds = max_execution_time_in_seconds

Просмотреть файл

@ -0,0 +1,98 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class JobResponse(Model):
"""JobResponse.
:param job_id: System generated. Ignored at creation.
:type job_id: str
:param query_condition: Device query condition.
:type query_condition: str
:param created_time: System generated. Ignored at creation.
:type created_time: datetime
:param start_time: Scheduled job start time in UTC.
:type start_time: datetime
:param end_time: System generated. Ignored at creation.
Represents the time the job stopped processing.
:type end_time: datetime
:param max_execution_time_in_seconds: Max execution time in secounds (ttl
duration)
:type max_execution_time_in_seconds: long
:param type: Required.
The type of job to execute. Possible values include: 'unknown', 'export',
'import', 'backup', 'readDeviceProperties', 'writeDeviceProperties',
'updateDeviceConfiguration', 'rebootDevice', 'factoryResetDevice',
'firmwareUpdate', 'scheduleDeviceMethod', 'scheduleUpdateTwin',
'restoreFromBackup', 'failoverDataCopy'
:type type: str or ~service20180630.models.enum
:param cloud_to_device_method: Required if jobType is cloudToDeviceMethod.
The method type and parameters.
:type cloud_to_device_method: ~service20180630.models.CloudToDeviceMethod
:param update_twin:
:type update_twin: ~service20180630.models.Twin
:param status: System generated. Ignored at creation. Possible values
include: 'unknown', 'enqueued', 'running', 'completed', 'failed',
'cancelled', 'scheduled', 'queued'
:type status: str or ~service20180630.models.enum
:param failure_reason: System generated. Ignored at creation.
If status == failure, this represents a string containing the reason.
:type failure_reason: str
:param status_message: Status message for the job
:type status_message: str
:param device_job_statistics: Job details
:type device_job_statistics: ~service20180630.models.DeviceJobStatistics
"""
_attribute_map = {
"job_id": {"key": "jobId", "type": "str"},
"query_condition": {"key": "queryCondition", "type": "str"},
"created_time": {"key": "createdTime", "type": "iso-8601"},
"start_time": {"key": "startTime", "type": "iso-8601"},
"end_time": {"key": "endTime", "type": "iso-8601"},
"max_execution_time_in_seconds": {"key": "maxExecutionTimeInSeconds", "type": "long"},
"type": {"key": "type", "type": "str"},
"cloud_to_device_method": {"key": "cloudToDeviceMethod", "type": "CloudToDeviceMethod"},
"update_twin": {"key": "updateTwin", "type": "Twin"},
"status": {"key": "status", "type": "str"},
"failure_reason": {"key": "failureReason", "type": "str"},
"status_message": {"key": "statusMessage", "type": "str"},
"device_job_statistics": {"key": "deviceJobStatistics", "type": "DeviceJobStatistics"},
}
def __init__(
self,
job_id=None,
query_condition=None,
created_time=None,
start_time=None,
end_time=None,
max_execution_time_in_seconds=None,
type=None,
cloud_to_device_method=None,
update_twin=None,
status=None,
failure_reason=None,
status_message=None,
device_job_statistics=None,
):
super(JobResponse, self).__init__()
self.job_id = job_id
self.query_condition = query_condition
self.created_time = created_time
self.start_time = start_time
self.end_time = end_time
self.max_execution_time_in_seconds = max_execution_time_in_seconds
self.type = type
self.cloud_to_device_method = cloud_to_device_method
self.update_twin = update_twin
self.status = status
self.failure_reason = failure_reason
self.status_message = status_message
self.device_job_statistics = device_job_statistics

Просмотреть файл

@ -0,0 +1,73 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class Module(Model):
"""Module identity on a device.
:param module_id:
:type module_id: str
:param managed_by:
:type managed_by: str
:param device_id:
:type device_id: str
:param generation_id:
:type generation_id: str
:param etag:
:type etag: str
:param connection_state: Possible values include: 'Disconnected',
'Connected'
:type connection_state: str or ~service20180630.models.enum
:param connection_state_updated_time:
:type connection_state_updated_time: datetime
:param last_activity_time:
:type last_activity_time: datetime
:param cloud_to_device_message_count:
:type cloud_to_device_message_count: int
:param authentication:
:type authentication: ~service20180630.models.AuthenticationMechanism
"""
_attribute_map = {
"module_id": {"key": "moduleId", "type": "str"},
"managed_by": {"key": "managedBy", "type": "str"},
"device_id": {"key": "deviceId", "type": "str"},
"generation_id": {"key": "generationId", "type": "str"},
"etag": {"key": "etag", "type": "str"},
"connection_state": {"key": "connectionState", "type": "str"},
"connection_state_updated_time": {"key": "connectionStateUpdatedTime", "type": "iso-8601"},
"last_activity_time": {"key": "lastActivityTime", "type": "iso-8601"},
"cloud_to_device_message_count": {"key": "cloudToDeviceMessageCount", "type": "int"},
"authentication": {"key": "authentication", "type": "AuthenticationMechanism"},
}
def __init__(
self,
module_id=None,
managed_by=None,
device_id=None,
generation_id=None,
etag=None,
connection_state=None,
connection_state_updated_time=None,
last_activity_time=None,
cloud_to_device_message_count=None,
authentication=None,
):
super(Module, self).__init__()
self.module_id = module_id
self.managed_by = managed_by
self.device_id = device_id
self.generation_id = generation_id
self.etag = etag
self.connection_state = connection_state
self.connection_state_updated_time = connection_state_updated_time
self.last_activity_time = last_activity_time
self.cloud_to_device_message_count = cloud_to_device_message_count
self.authentication = authentication

Просмотреть файл

@ -0,0 +1,35 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class PropertyContainer(Model):
"""Represents Twin properties.
:param desired: Used in conjunction with reported properties to
synchronize device configuration or condition. Desired properties can only
be set by the solution back end and can be read by the device app. The
device app can also be notified in real time of changes on the desired
properties.
:type desired: dict[str, object]
:param reported: Used in conjunction with desired properties to
synchronize device configuration or condition. Reported properties can
only be set by the device app and can be read and queried by the solution
back end.
:type reported: dict[str, object]
"""
_attribute_map = {
"desired": {"key": "desired", "type": "{object}"},
"reported": {"key": "reported", "type": "{object}"},
}
def __init__(self, desired=None, reported=None):
super(PropertyContainer, self).__init__()
self.desired = desired
self.reported = reported

Просмотреть файл

@ -0,0 +1,32 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class PurgeMessageQueueResult(Model):
"""Result of a device message queue purge operation.
:param total_messages_purged:
:type total_messages_purged: int
:param device_id: The ID of the device whose messages are being purged.
:type device_id: str
:param module_id: The ID of the device whose messages are being purged.
:type module_id: str
"""
_attribute_map = {
"total_messages_purged": {"key": "totalMessagesPurged", "type": "int"},
"device_id": {"key": "deviceId", "type": "str"},
"module_id": {"key": "moduleId", "type": "str"},
}
def __init__(self, total_messages_purged=None, device_id=None, module_id=None):
super(PurgeMessageQueueResult, self).__init__()
self.total_messages_purged = total_messages_purged
self.device_id = device_id
self.module_id = module_id

Просмотреть файл

@ -0,0 +1,34 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class QueryResult(Model):
"""The query result.
:param type: The query result type. Possible values include: 'unknown',
'twin', 'deviceJob', 'jobResponse', 'raw', 'enrollment',
'enrollmentGroup', 'deviceRegistration'
:type type: str or ~service20180630.models.enum
:param items: The query result items, as a collection.
:type items: list[object]
:param continuation_token: Request continuation token.
:type continuation_token: str
"""
_attribute_map = {
"type": {"key": "type", "type": "str"},
"items": {"key": "items", "type": "[object]"},
"continuation_token": {"key": "continuationToken", "type": "str"},
}
def __init__(self, type=None, items=None, continuation_token=None):
super(QueryResult, self).__init__()
self.type = type
self.items = items
self.continuation_token = continuation_token

Просмотреть файл

@ -0,0 +1,22 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class QuerySpecification(Model):
"""A Json query request.
:param query: The query.
:type query: str
"""
_attribute_map = {"query": {"key": "query", "type": "str"}}
def __init__(self, query=None):
super(QuerySpecification, self).__init__()
self.query = query

Просмотреть файл

@ -0,0 +1,34 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class RegistryStatistics(Model):
"""RegistryStatistics.
:param total_device_count:
:type total_device_count: long
:param enabled_device_count:
:type enabled_device_count: long
:param disabled_device_count:
:type disabled_device_count: long
"""
_attribute_map = {
"total_device_count": {"key": "totalDeviceCount", "type": "long"},
"enabled_device_count": {"key": "enabledDeviceCount", "type": "long"},
"disabled_device_count": {"key": "disabledDeviceCount", "type": "long"},
}
def __init__(
self, total_device_count=None, enabled_device_count=None, disabled_device_count=None
):
super(RegistryStatistics, self).__init__()
self.total_device_count = total_device_count
self.enabled_device_count = enabled_device_count
self.disabled_device_count = disabled_device_count

Просмотреть файл

@ -0,0 +1,22 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class ServiceStatistics(Model):
"""ServiceStatistics.
:param connected_device_count:
:type connected_device_count: long
"""
_attribute_map = {"connected_device_count": {"key": "connectedDeviceCount", "type": "long"}}
def __init__(self, connected_device_count=None):
super(ServiceStatistics, self).__init__()
self.connected_device_count = connected_device_count

Просмотреть файл

@ -0,0 +1,28 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class SymmetricKey(Model):
"""SymmetricKey.
:param primary_key:
:type primary_key: str
:param secondary_key:
:type secondary_key: str
"""
_attribute_map = {
"primary_key": {"key": "primaryKey", "type": "str"},
"secondary_key": {"key": "secondaryKey", "type": "str"},
}
def __init__(self, primary_key=None, secondary_key=None):
super(SymmetricKey, self).__init__()
self.primary_key = primary_key
self.secondary_key = secondary_key

Просмотреть файл

@ -0,0 +1,112 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class Twin(Model):
"""Twin Representation.
:param device_id: The deviceId uniquely identifies the device in the IoT
hub's identity registry. A case-sensitive string (up to 128 char long) of
ASCII 7-bit alphanumeric chars + {'-', ':', '.', '+', '%', '_', '#', '*',
'?', '!', '(', ')', ',', '=', '@', ';', '$', '''}.
:type device_id: str
:param module_id: Gets and sets the Module Id.
:type module_id: str
:param tags: A JSON document read and written by the solution back end.
Tags are not visible to device apps.
:type tags: dict[str, object]
:param properties: Gets and sets the Twin properties.
:type properties: ~service20180630.models.TwinProperties
:param etag: Twin's ETag
:type etag: str
:param version: Version for device twin, including tags and desired
properties
:type version: long
:param device_etag: Device's ETag
:type device_etag: str
:param status: Gets the corresponding Device's Status. Possible values
include: 'enabled', 'disabled'
:type status: str or ~service20180630.models.enum
:param status_reason: Reason, if any, for the corresponding Device to be
in specified Status
:type status_reason: str
:param status_update_time: Time when the corresponding Device's Status was
last updated
:type status_update_time: datetime
:param connection_state: Corresponding Device's ConnectionState. Possible
values include: 'Disconnected', 'Connected'
:type connection_state: str or ~service20180630.models.enum
:param last_activity_time: The last time the device connected, received or
sent a message. In ISO8601 datetime format in UTC, for example,
2015-01-28T16:24:48.789Z. This does not update if the device uses the
HTTP/1 protocol to perform messaging operations.
:type last_activity_time: datetime
:param cloud_to_device_message_count: Number of messages sent to the
corresponding Device from the Cloud
:type cloud_to_device_message_count: int
:param authentication_type: Corresponding Device's authentication type.
Possible values include: 'sas', 'selfSigned', 'certificateAuthority',
'none'
:type authentication_type: str or ~service20180630.models.enum
:param x509_thumbprint: Corresponding Device's X509 thumbprint
:type x509_thumbprint: ~service20180630.models.X509Thumbprint
"""
_attribute_map = {
"device_id": {"key": "deviceId", "type": "str"},
"module_id": {"key": "moduleId", "type": "str"},
"tags": {"key": "tags", "type": "{object}"},
"properties": {"key": "properties", "type": "TwinProperties"},
"etag": {"key": "etag", "type": "str"},
"version": {"key": "version", "type": "long"},
"device_etag": {"key": "deviceEtag", "type": "str"},
"status": {"key": "status", "type": "str"},
"status_reason": {"key": "statusReason", "type": "str"},
"status_update_time": {"key": "statusUpdateTime", "type": "iso-8601"},
"connection_state": {"key": "connectionState", "type": "str"},
"last_activity_time": {"key": "lastActivityTime", "type": "iso-8601"},
"cloud_to_device_message_count": {"key": "cloudToDeviceMessageCount", "type": "int"},
"authentication_type": {"key": "authenticationType", "type": "str"},
"x509_thumbprint": {"key": "x509Thumbprint", "type": "X509Thumbprint"},
}
def __init__(
self,
device_id=None,
module_id=None,
tags=None,
properties=None,
etag=None,
version=None,
device_etag=None,
status=None,
status_reason=None,
status_update_time=None,
connection_state=None,
last_activity_time=None,
cloud_to_device_message_count=None,
authentication_type=None,
x509_thumbprint=None,
):
super(Twin, self).__init__()
self.device_id = device_id
self.module_id = module_id
self.tags = tags
self.properties = properties
self.etag = etag
self.version = version
self.device_etag = device_etag
self.status = status
self.status_reason = status_reason
self.status_update_time = status_update_time
self.connection_state = connection_state
self.last_activity_time = last_activity_time
self.cloud_to_device_message_count = cloud_to_device_message_count
self.authentication_type = authentication_type
self.x509_thumbprint = x509_thumbprint

Просмотреть файл

@ -0,0 +1,35 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class TwinProperties(Model):
"""Represents Twin properties.
:param desired: Used in conjunction with reported properties to
synchronize device configuration or condition. Desired properties can only
be set by the solution back end and can be read by the device app. The
device app can also be notified in real time of changes on the desired
properties.
:type desired: dict[str, object]
:param reported: Used in conjunction with desired properties to
synchronize device configuration or condition. Reported properties can
only be set by the device app and can be read and queried by the solution
back end.
:type reported: dict[str, object]
"""
_attribute_map = {
"desired": {"key": "desired", "type": "{object}"},
"reported": {"key": "reported", "type": "{object}"},
}
def __init__(self, desired=None, reported=None):
super(TwinProperties, self).__init__()
self.desired = desired
self.reported = reported

Просмотреть файл

@ -0,0 +1,28 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from msrest.serialization import Model
class X509Thumbprint(Model):
"""X509Thumbprint.
:param primary_thumbprint:
:type primary_thumbprint: str
:param secondary_thumbprint:
:type secondary_thumbprint: str
"""
_attribute_map = {
"primary_thumbprint": {"key": "primaryThumbprint", "type": "str"},
"secondary_thumbprint": {"key": "secondaryThumbprint", "type": "str"},
}
def __init__(self, primary_thumbprint=None, secondary_thumbprint=None):
super(X509Thumbprint, self).__init__()
self.primary_thumbprint = primary_thumbprint
self.secondary_thumbprint = secondary_thumbprint

Просмотреть файл

@ -0,0 +1,10 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
from .service_operations import ServiceOperations
__all__ = ["ServiceOperations"]

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,8 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
VERSION = "2018-06-30"

Просмотреть файл

@ -0,0 +1,147 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from azure_provisioning_e2e.iothubservice20180630.iot_hub_gateway_service_ap_is20180630 import (
IotHubGatewayServiceAPIs20180630,
)
from msrest.exceptions import HttpOperationError
from azure.iot.device.common.connection_string import ConnectionString
from azure.iot.device.common.sastoken import SasToken
import uuid
import time
import random
max_failure_count = 5
initial_backoff = 10
def connection_string_to_sas_token(conn_str):
"""
parse an IoTHub service connection string and return the host and a shared access
signature that can be used to connect to the given hub
"""
conn_str_obj = ConnectionString(conn_str)
sas_token = SasToken(
uri=conn_str_obj.get("HostName"),
key=conn_str_obj.get("SharedAccessKey"),
key_name=conn_str_obj.get("SharedAccessKeyName"),
)
return {"host": conn_str_obj.get("HostName"), "sas": str(sas_token)}
def connection_string_to_hostname(conn_str):
"""
Retrieves only the hostname from connection string.
This will eventually give us the Linked IoT Hub
"""
conn_str_obj = ConnectionString(conn_str)
return conn_str_obj.get("HostName")
def run_with_retry(fun, args, kwargs):
failures_left = max_failure_count
retry = True
backoff = initial_backoff + random.randint(1, 10)
while retry:
try:
return fun(*args, **kwargs)
except HttpOperationError as e:
resp = e.response.json()
retry = False
if "Message" in resp:
if resp["Message"].startswith("ErrorCode:ThrottlingBacklogTimeout"):
retry = True
if retry and failures_left:
failures_left = failures_left - 1
print("{} failures left before giving up".format(failures_left))
print("sleeping for {} seconds".format(backoff))
time.sleep(backoff)
backoff = backoff * 2
else:
raise e
class Helper:
def __init__(self, service_connection_string):
self.cn = connection_string_to_sas_token(service_connection_string)
self.service = IotHubGatewayServiceAPIs20180630("https://" + self.cn["host"]).service
def headers(self):
return {
"Authorization": self.cn["sas"],
"Request-Id": str(uuid.uuid4()),
"User-Agent": "azure-iot-device-provisioning-e2e",
}
def get_device(self, device_id):
device = run_with_retry(
self.service.get_device, (device_id,), {"custom_headers": self.headers()}
)
return device
def get_module(self, device_id, module_id):
module = run_with_retry(
self.service.get_module, (device_id, module_id), {"custom_headers": self.headers()}
)
return module
def get_device_connection_string(self, device_id):
device = run_with_retry(
self.service.get_device, (device_id,), {"custom_headers": self.headers()}
)
primary_key = device.authentication.symmetric_key.primary_key
return (
"HostName="
+ self.cn["host"]
+ ";DeviceId="
+ device_id
+ ";SharedAccessKey="
+ primary_key
)
def get_module_connection_string(self, device_id, module_id):
module = run_with_retry(
self.service.get_module, (device_id, module_id), {"custom_headers": self.headers()}
)
primary_key = module.authentication.symmetric_key.primary_key
return (
"HostName="
+ self.cn["host"]
+ ";DeviceId="
+ device_id
+ ";ModuleId="
+ module_id
+ ";SharedAccessKey="
+ primary_key
)
def try_delete_device(self, device_id):
try:
run_with_retry(
self.service.delete_device,
(device_id,),
{"if_match": "*", "custom_headers": self.headers()},
)
return True
except HttpOperationError:
return False
def try_delete_module(self, device_id, module_id):
try:
run_with_retry(
self.service.delete_module,
(device_id, module_id),
{"if_match": "*", "custom_headers": self.headers()},
)
return True
except HttpOperationError:
return False

Просмотреть файл

Просмотреть файл

@ -0,0 +1,350 @@
#
# OpenSSL example configuration file.
# This is mostly being used for generation of certificate requests.
#
# This definition stops the following lines choking if HOME isn't
# defined.
HOME = .
RANDFILE = $ENV::HOME/.rnd
# Extra OBJECT IDENTIFIER info:
#oid_file = $ENV::HOME/.oid
oid_section = new_oids
# To use this configuration file with the "-extfile" option of the
# "openssl x509" utility, name here the section containing the
# X.509v3 extensions to use:
# extensions =
# (Alternatively, use a configuration file that has only
# X.509v3 extensions in its main [= default] section.)
[ new_oids ]
# We can add new OIDs in here for use by 'ca', 'req' and 'ts'.
# Add a simple OID like this:
# testoid1=1.2.3.4
# Or use config file substitution like this:
# testoid2=${testoid1}.5.6
# Policies used by the TSA examples.
tsa_policy1 = 1.2.3.4.1
tsa_policy2 = 1.2.3.4.5.6
tsa_policy3 = 1.2.3.4.5.7
####################################################################
[ ca ]
default_ca = CA_default # The default ca section
####################################################################
[ CA_default ]
dir = ./demoCA # Where everything is kept
certs = $dir/certs # Where the issued certs are kept
crl_dir = $dir/crl # Where the issued crl are kept
database = $dir/index.txt # database index file.
#unique_subject = no # Set to 'no' to allow creation of
# several ctificates with same subject.
new_certs_dir = $dir/newcerts # default place for new certs.
certificate = $dir/cacert.pem # The CA certificate
serial = $dir/serial # The current serial number
crlnumber = $dir/crlnumber # the current crl number
# must be commented out to leave a V1 CRL
crl = $dir/crl.pem # The current CRL
private_key = $dir/private/cakey.pem# The private key
RANDFILE = $dir/private/.rand # private random number file
x509_extensions = usr_cert # The extentions to add to the cert
# Comment out the following two lines for the "traditional"
# (and highly broken) format.
name_opt = ca_default # Subject Name options
cert_opt = ca_default # Certificate field options
# Extension copying option: use with caution.
# copy_extensions = copy
# Extensions to add to a CRL. Note: Netscape communicator chokes on V2 CRLs
# so this is commented out by default to leave a V1 CRL.
# crlnumber must also be commented out to leave a V1 CRL.
# crl_extensions = crl_ext
default_days = 365 # how long to certify for
default_crl_days= 30 # how long before next CRL
default_md = default # use public key default MD
preserve = no # keep passed DN ordering
# A few difference way of specifying how similar the request should look
# For type CA, the listed attributes must be the same, and the optional
# and supplied fields are just that :-)
policy = policy_anything
# For the CA policy
[ policy_match ]
countryName = match
stateOrProvinceName = match
organizationName = match
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
# For the 'anything' policy
# At this point in time, you must list all acceptable 'object'
# types.
[ policy_anything ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
####################################################################
[ req ]
default_bits = 2048
default_keyfile = privkey.pem
distinguished_name = req_distinguished_name
attributes = req_attributes
x509_extensions = v3_ca # The extentions to add to the self signed cert
# Passwords for private keys if not present they will be prompted for
# input_password = secret
# output_password = secret
# This sets a mask for permitted string types. There are several options.
# default: PrintableString, T61String, BMPString.
# pkix : PrintableString, BMPString (PKIX recommendation before 2004)
# utf8only: only UTF8Strings (PKIX recommendation after 2004).
# nombstr : PrintableString, T61String (no BMPStrings or UTF8Strings).
# MASK:XXXX a literal mask value.
# WARNING: ancient versions of Netscape crash on BMPStrings or UTF8Strings.
string_mask = utf8only
# req_extensions = v3_req # The extensions to add to a certificate request
[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = AU
countryName_min = 2
countryName_max = 2
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = Some-State
localityName = Locality Name (eg, city)
0.organizationName = Organization Name (eg, company)
0.organizationName_default = Internet Widgits Pty Ltd
# we can do this but it is not needed normally :-)
#1.organizationName = Second Organization Name (eg, company)
#1.organizationName_default = World Wide Web Pty Ltd
organizationalUnitName = Organizational Unit Name (eg, section)
#organizationalUnitName_default =
commonName = Common Name (e.g. server FQDN or YOUR name)
commonName_max = 64
emailAddress = Email Address
emailAddress_max = 64
# SET-ex3 = SET extension number 3
[ req_attributes ]
challengePassword = A challenge password
challengePassword_min = 4
challengePassword_max = 20
unstructuredName = An optional company name
[ usr_cert ]
# These extensions are added when 'ca' signs a request.
# This goes against PKIX guidelines but some CAs do it and some software
# requires this to avoid interpreting an end user certificate as a CA.
basicConstraints=CA:FALSE
# Here are some examples of the usage of nsCertType. If it is omitted
# the certificate can be used for anything *except* object signing.
# This is OK for an SSL server.
# nsCertType = server
# For an object signing certificate this would be used.
# nsCertType = objsign
# For normal client use this is typical
# nsCertType = client, email
# and for everything including object signing:
# nsCertType = client, email, objsign
# This is typical in keyUsage for a client certificate.
# keyUsage = nonRepudiation, digitalSignature, keyEncipherment
# This will be displayed in Netscape's comment listbox.
nsComment = "OpenSSL Generated Certificate"
# PKIX recommendations harmless if included in all certificates.
subjectKeyIdentifier=hash
authorityKeyIdentifier=keyid,issuer
# This stuff is for subjectAltName and issuerAltname.
# Import the email address.
# subjectAltName=email:copy
# An alternative to produce certificates that aren't
# deprecated according to PKIX.
# subjectAltName=email:move
# Copy subject details
# issuerAltName=issuer:copy
#nsCaRevocationUrl = http://www.domain.dom/ca-crl.pem
#nsBaseUrl
#nsRevocationUrl
#nsRenewalUrl
#nsCaPolicyUrl
#nsSslServerName
# This is required for TSA certificates.
# extendedKeyUsage = critical,timeStamping
[ v3_req ]
# Extensions to add to a certificate request
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
[ v3_ca ]
# Extensions for a typical CA
# PKIX recommendation.
subjectKeyIdentifier=hash
authorityKeyIdentifier=keyid:always,issuer
# This is what PKIX recommends but some broken software chokes on critical
# extensions.
#basicConstraints = critical,CA:true
# So we do this instead.
basicConstraints = CA:true
# Key usage: this is typical for a CA certificate. However since it will
# prevent it being used as an test self-signed certificate it is best
# left out by default.
# keyUsage = cRLSign, keyCertSign
# Some might want this also
# nsCertType = sslCA, emailCA
# Include email address in subject alt name: another PKIX recommendation
# subjectAltName=email:copy
# Copy issuer details
# issuerAltName=issuer:copy
# DER hex encoding of an extension: beware experts only!
# obj=DER:02:03
# Where 'obj' is a standard or added object
# You can even override a supported extension:
# basicConstraints= critical, DER:30:03:01:01:FF
[ crl_ext ]
# CRL extensions.
# Only issuerAltName and authorityKeyIdentifier make any sense in a CRL.
# issuerAltName=issuer:copy
authorityKeyIdentifier=keyid:always
[ proxy_cert_ext ]
# These extensions should be added when creating a proxy certificate
# This goes against PKIX guidelines but some CAs do it and some software
# requires this to avoid interpreting an end user certificate as a CA.
basicConstraints=CA:FALSE
# Here are some examples of the usage of nsCertType. If it is omitted
# the certificate can be used for anything *except* object signing.
# This is OK for an SSL server.
# nsCertType = server
# For an object signing certificate this would be used.
# nsCertType = objsign
# For normal client use this is typical
# nsCertType = client, email
# and for everything including object signing:
# nsCertType = client, email, objsign
# This is typical in keyUsage for a client certificate.
# keyUsage = nonRepudiation, digitalSignature, keyEncipherment
# This will be displayed in Netscape's comment listbox.
nsComment = "OpenSSL Generated Certificate"
# PKIX recommendations harmless if included in all certificates.
subjectKeyIdentifier=hash
authorityKeyIdentifier=keyid,issuer
# This stuff is for subjectAltName and issuerAltname.
# Import the email address.
# subjectAltName=email:copy
# An alternative to produce certificates that aren't
# deprecated according to PKIX.
# subjectAltName=email:move
# Copy subject details
# issuerAltName=issuer:copy
#nsCaRevocationUrl = http://www.domain.dom/ca-crl.pem
#nsBaseUrl
#nsRevocationUrl
#nsRenewalUrl
#nsCaPolicyUrl
#nsSslServerName
# This really needs to be in place for it to be a proxy certificate.
proxyCertInfo=critical,language:id-ppl-anyLanguage,pathlen:3,policy:foo
####################################################################
[ tsa ]
default_tsa = tsa_config1 # the default TSA section
[ tsa_config1 ]
# These are used by the TSA reply generation only.
dir = ./demoCA # TSA root directory
serial = $dir/tsaserial # The current serial number (mandatory)
crypto_device = builtin # OpenSSL engine to use for signing
signer_cert = $dir/tsacert.pem # The TSA signing certificate
# (optional)
certs = $dir/cacert.pem # Certificate chain to include in reply
# (optional)
signer_key = $dir/private/tsakey.pem # The TSA private key (optional)
default_policy = tsa_policy1 # Policy if request did not specify it
# (optional)
other_policies = tsa_policy2, tsa_policy3 # acceptable policies (optional)
digests = md5, sha1 # Acceptable message digests (mandatory)
accuracy = secs:1, millisecs:500, microsecs:100 # (optional)
clock_precision_digits = 0 # number of digits after dot. (optional)
ordering = yes # Is ordering defined for timestamps?
# (optional, default: no)
tsa_name = yes # Must the TSA name be included in the reply?
# (optional, default: no)
ess_cert_id_chain = no # Must the ESS cert id chain be included?
# (optional, default: no)

Просмотреть файл

@ -0,0 +1,289 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from azure_provisioning_e2e.service_helper import Helper, connection_string_to_hostname
from azure.iot.device.aio import ProvisioningDeviceClient
from azure.iot.device.common import X509
from provisioningserviceclient import (
ProvisioningServiceClient,
IndividualEnrollment,
EnrollmentGroup,
)
from provisioningserviceclient.protocol.models import AttestationMechanism, ReprovisionPolicy
import pytest
import logging
import os
from scripts.create_x509_chain_pipeline import (
call_intermediate_cert_creation_from_pipeline,
call_device_cert_creation_from_pipeline,
delete_directories_certs_created_from_pipeline,
)
pytestmark = pytest.mark.asyncio
logging.basicConfig(level=logging.DEBUG)
intermediate_common_name = "e2edpshomenum"
intermediate_password = "revelio"
device_common_name = "e2edpslocomotor"
device_password = "mortis"
service_client = ProvisioningServiceClient.create_from_connection_string(
os.getenv("PROVISIONING_SERVICE_CONNECTION_STRING")
)
device_registry_helper = Helper(os.getenv("IOTHUB_CONNECTION_STRING"))
linked_iot_hub = connection_string_to_hostname(os.getenv("IOTHUB_CONNECTION_STRING"))
PROVISIONING_HOST = os.getenv("PROVISIONING_DEVICE_ENDPOINT")
ID_SCOPE = os.getenv("PROVISIONING_DEVICE_IDSCOPE")
certificate_count = 8
type_to_device_indices = {
"individual_with_device_id": [1],
"individual_no_device_id": [2],
"group_intermediate": [3, 4, 5],
"group_ca": [6, 7, 8],
}
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
logging.info("set up certificates before cert related tests")
call_intermediate_cert_creation_from_pipeline(
common_name=intermediate_common_name,
ca_password=os.getenv("PROVISIONING_ROOT_PASSWORD"),
intermediate_password=intermediate_password,
)
call_device_cert_creation_from_pipeline(
common_name=device_common_name,
intermediate_password=intermediate_password,
device_password=device_password,
device_count=8,
)
def after_module():
logging.info("tear down certificates after cert related tests")
delete_directories_certs_created_from_pipeline()
request.addfinalizer(after_module)
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with the user supplied device_id different from the registration_id of the individual enrollment that has been created with a selfsigned X509 authentication"
)
async def test_device_register_with_device_id_for_a_x509_individual_enrollment():
device_id = "e2edpsthunderbolt"
device_index = type_to_device_indices.get("individual_with_device_id")[0]
try:
individual_enrollment_record = create_individual_enrollment_with_x509_client_certs(
device_index=device_index, device_id=device_id
)
registration_id = individual_enrollment_record.registration_id
device_cert_file = "demoCA/newcerts/device_cert" + str(device_index) + ".pem"
device_key_file = "demoCA/private/device_key" + str(device_index) + ".pem"
registration_result = await result_from_register(
registration_id, device_cert_file, device_key_file
)
assert device_id != registration_id
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
finally:
service_client.delete_individual_enrollment_by_param(registration_id)
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with device_id equal to the registration_id of the individual enrollment that has been created with a selfsigned X509 authentication"
)
async def test_device_register_with_no_device_id_for_a_x509_individual_enrollment():
device_index = type_to_device_indices.get("individual_no_device_id")[0]
try:
individual_enrollment_record = create_individual_enrollment_with_x509_client_certs(
device_index=device_index
)
registration_id = individual_enrollment_record.registration_id
device_cert_file = "demoCA/newcerts/device_cert" + str(device_index) + ".pem"
device_key_file = "demoCA/private/device_key" + str(device_index) + ".pem"
registration_result = await result_from_register(
registration_id, device_cert_file, device_key_file
)
assert_device_provisioned(
device_id=registration_id, registration_result=registration_result
)
device_registry_helper.try_delete_device(registration_id)
finally:
service_client.delete_individual_enrollment_by_param(registration_id)
@pytest.mark.it(
"A group of devices get provisioned to the linked IoTHub with device_ids equal to the individual registration_ids inside a group enrollment that has been created with intermediate X509 authentication"
)
async def test_group_of_devices_register_with_no_device_id_for_a_x509_intermediate_authentication_group_enrollment():
group_id = "e2e-intermediate-durmstrang"
common_device_id = device_common_name
devices_indices = type_to_device_indices.get("group_intermediate")
device_count_in_group = len(devices_indices)
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
try:
intermediate_cert_filename = "demoCA/newcerts/intermediate_cert.pem"
with open(intermediate_cert_filename, "r") as intermediate_pem:
intermediate_cert_content = intermediate_pem.read()
attestation_mechanism = AttestationMechanism.create_with_x509_signing_certs(
intermediate_cert_content
)
enrollment_group_provisioning_model = EnrollmentGroup.create(
group_id, attestation=attestation_mechanism, reprovision_policy=reprovision_policy
)
service_client.create_or_update(enrollment_group_provisioning_model)
count = 0
common_device_key_input_file = "demoCA/private/device_key"
common_device_cert_input_file = "demoCA/newcerts/device_cert"
common_device_inter_cert_chain_file = "demoCA/newcerts/out_inter_device_chain_cert"
for index in devices_indices:
count = count + 1
device_id = common_device_id + str(index)
device_key_input_file = common_device_key_input_file + str(index) + ".pem"
device_cert_input_file = common_device_cert_input_file + str(index) + ".pem"
device_inter_cert_chain_file = common_device_inter_cert_chain_file + str(index) + ".pem"
filenames = [device_cert_input_file, intermediate_cert_filename]
with open(device_inter_cert_chain_file, "w") as outfile:
for fname in filenames:
with open(fname) as infile:
outfile.write(infile.read())
registration_result = await result_from_register(
registration_id=device_id,
device_cert_file=device_inter_cert_chain_file,
device_key_file=device_key_input_file,
)
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
# Make sure space is okay. The following line must be outside for loop.
assert count == device_count_in_group
finally:
service_client.delete_enrollment_group_by_param(group_id)
@pytest.mark.skip(
reason="The enrollment is never properly created on the pipeline and it is always created without any CA reference and eventually the registration fails"
)
@pytest.mark.it(
"A group of devices get provisioned to the linked IoTHub with device_ids equal to the individual registration_ids inside a group enrollment that has been created with an already uploaded ca cert X509 authentication"
)
async def test_group_of_devices_register_with_no_device_id_for_a_x509_ca_authentication_group_enrollment():
group_id = "e2e-ca-ilvermorny"
common_device_id = device_common_name
devices_indices = type_to_device_indices.get("group_ca")
device_count_in_group = len(devices_indices)
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
try:
DPS_GROUP_CA_CERT = os.getenv("PROVISIONING_ROOT_CERT")
attestation_mechanism = AttestationMechanism.create_with_x509_ca_refs(
ref1=DPS_GROUP_CA_CERT
)
enrollment_group_provisioning_model = EnrollmentGroup.create(
group_id, attestation=attestation_mechanism, reprovision_policy=reprovision_policy
)
service_client.create_or_update(enrollment_group_provisioning_model)
count = 0
intermediate_cert_filename = "demoCA/newcerts/intermediate_cert.pem"
common_device_key_input_file = "demoCA/private/device_key"
common_device_cert_input_file = "demoCA/newcerts/device_cert"
common_device_inter_cert_chain_file = "demoCA/newcerts/out_inter_device_chain_cert"
for index in devices_indices:
count = count + 1
device_id = common_device_id + str(index)
device_key_input_file = common_device_key_input_file + str(index) + ".pem"
device_cert_input_file = common_device_cert_input_file + str(index) + ".pem"
device_inter_cert_chain_file = common_device_inter_cert_chain_file + str(index) + ".pem"
filenames = [device_cert_input_file, intermediate_cert_filename]
with open(device_inter_cert_chain_file, "w") as outfile:
for fname in filenames:
with open(fname) as infile:
logging.debug("Filename is {}".format(fname))
content = infile.read()
logging.debug(content)
outfile.write(content)
registration_result = await result_from_register(
registration_id=device_id,
device_cert_file=device_inter_cert_chain_file,
device_key_file=device_key_input_file,
)
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
# Make sure space is okay. The following line must be outside for loop.
assert count == device_count_in_group
finally:
service_client.delete_enrollment_group_by_param(group_id)
def assert_device_provisioned(device_id, registration_result):
"""
Assert that the device has been provisioned correctly to iothub from the registration result as well as from the device registry
:param device_id: The device id
:param registration_result: The registration result
"""
assert registration_result.status == "assigned"
assert registration_result.registration_state.device_id == device_id
assert registration_result.registration_state.assigned_hub == linked_iot_hub
device = device_registry_helper.get_device(device_id)
assert device is not None
assert device.authentication.type == "selfSigned"
assert device.device_id == device_id
def create_individual_enrollment_with_x509_client_certs(device_index, device_id=None):
registration_id = device_common_name + str(device_index)
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
device_cert_input_file = "demoCA/newcerts/device_cert" + str(device_index) + ".pem"
with open(device_cert_input_file, "r") as in_device_cert:
device_cert_content = in_device_cert.read()
attestation_mechanism = AttestationMechanism.create_with_x509_client_certs(device_cert_content)
individual_provisioning_model = IndividualEnrollment.create(
attestation=attestation_mechanism,
registration_id=registration_id,
reprovision_policy=reprovision_policy,
device_id=device_id,
)
return service_client.create_or_update(individual_provisioning_model)
async def result_from_register(registration_id, device_cert_file, device_key_file):
x509 = X509(cert_file=device_cert_file, key_file=device_key_file, pass_phrase=device_password)
provisioning_device_client = ProvisioningDeviceClient.create_from_x509_certificate(
provisioning_host=PROVISIONING_HOST,
registration_id=registration_id,
id_scope=ID_SCOPE,
x509=x509,
)
return await provisioning_device_client.register()

Просмотреть файл

@ -0,0 +1,120 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from azure_provisioning_e2e.service_helper import Helper, connection_string_to_hostname
from azure.iot.device.aio import ProvisioningDeviceClient
from provisioningserviceclient import ProvisioningServiceClient, IndividualEnrollment
from provisioningserviceclient.protocol.models import AttestationMechanism, ReprovisionPolicy
import pytest
import logging
import os
pytestmark = pytest.mark.asyncio
logging.basicConfig(level=logging.DEBUG)
PROVISIONING_HOST = os.getenv("PROVISIONING_DEVICE_ENDPOINT")
ID_SCOPE = os.getenv("PROVISIONING_DEVICE_IDSCOPE")
conn_str = os.getenv("PROVISIONING_SERVICE_CONNECTION_STRING")
service_client = ProvisioningServiceClient.create_from_connection_string(
os.getenv("PROVISIONING_SERVICE_CONNECTION_STRING")
)
service_client = ProvisioningServiceClient.create_from_connection_string(conn_str)
device_registry_helper = Helper(os.getenv("IOTHUB_CONNECTION_STRING"))
linked_iot_hub = connection_string_to_hostname(os.getenv("IOTHUB_CONNECTION_STRING"))
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with the device_id equal to the registration_id of the individual enrollment that has been created with a symmetric key authentication"
)
async def test_device_register_with_no_device_id_for_a_symmetric_key_individual_enrollment():
try:
individual_enrollment_record = create_individual_enrollment("e2e-dps-legilimens")
registration_id = individual_enrollment_record.registration_id
symmetric_key = individual_enrollment_record.attestation.symmetric_key.primary_key
registration_result = await result_from_register(registration_id, symmetric_key)
assert_device_provisioned(
device_id=registration_id, registration_result=registration_result
)
device_registry_helper.try_delete_device(registration_id)
finally:
service_client.delete_individual_enrollment_by_param(registration_id)
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with the user supplied device_id different from the registration_id of the individual enrollment that has been created with a symmetric key authentication"
)
async def test_device_register_with_device_id_for_a_symmetric_key_individual_enrollment():
device_id = "e2edpsgoldensnitch"
try:
individual_enrollment_record = create_individual_enrollment(
registration_id="e2e-dps-levicorpus", device_id=device_id
)
registration_id = individual_enrollment_record.registration_id
symmetric_key = individual_enrollment_record.attestation.symmetric_key.primary_key
registration_result = await result_from_register(registration_id, symmetric_key)
assert device_id != registration_id
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
finally:
pass
service_client.delete_individual_enrollment_by_param(registration_id)
def create_individual_enrollment(registration_id, device_id=None):
"""
Create an individual enrollment record using the service client
:param registration_id: The registration id of the enrollment
:param device_id: Optional device id
:return: And individual enrollment record
"""
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
attestation_mechanism = AttestationMechanism(type="symmetricKey")
individual_provisioning_model = IndividualEnrollment.create(
attestation=attestation_mechanism,
registration_id=registration_id,
device_id=device_id,
reprovision_policy=reprovision_policy,
)
return service_client.create_or_update(individual_provisioning_model)
def assert_device_provisioned(device_id, registration_result):
"""
Assert that the device has been provisioned correctly to iothub from the registration result as well as from the device registry
:param device_id: The device id
:param registration_result: The registration result
"""
assert registration_result.status == "assigned"
assert registration_result.registration_state.device_id == device_id
assert registration_result.registration_state.assigned_hub == linked_iot_hub
device = device_registry_helper.get_device(device_id)
assert device is not None
assert device.authentication.type == "sas"
assert device.device_id == device_id
# TODO Eventually should return result after the APi changes
async def result_from_register(registration_id, symmetric_key):
provisioning_device_client = ProvisioningDeviceClient.create_from_symmetric_key(
provisioning_host=PROVISIONING_HOST,
registration_id=registration_id,
id_scope=ID_SCOPE,
symmetric_key=symmetric_key,
)
return await provisioning_device_client.register()

Просмотреть файл

@ -0,0 +1,288 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from azure_provisioning_e2e.service_helper import Helper, connection_string_to_hostname
from azure.iot.device import ProvisioningDeviceClient
from azure.iot.device.common import X509
from provisioningserviceclient import (
ProvisioningServiceClient,
IndividualEnrollment,
EnrollmentGroup,
)
from provisioningserviceclient.protocol.models import AttestationMechanism, ReprovisionPolicy
import pytest
import logging
import os
from scripts.create_x509_chain_pipeline import (
call_intermediate_cert_creation_from_pipeline,
call_device_cert_creation_from_pipeline,
delete_directories_certs_created_from_pipeline,
)
logging.basicConfig(level=logging.DEBUG)
intermediate_common_name = "e2edpswingardium"
intermediate_password = "leviosa"
device_common_name = "e2edpsexpecto"
device_password = "patronum"
service_client = ProvisioningServiceClient.create_from_connection_string(
os.getenv("PROVISIONING_SERVICE_CONNECTION_STRING")
)
device_registry_helper = Helper(os.getenv("IOTHUB_CONNECTION_STRING"))
linked_iot_hub = connection_string_to_hostname(os.getenv("IOTHUB_CONNECTION_STRING"))
PROVISIONING_HOST = os.getenv("PROVISIONING_DEVICE_ENDPOINT")
ID_SCOPE = os.getenv("PROVISIONING_DEVICE_IDSCOPE")
certificate_count = 8
type_to_device_indices = {
"individual_with_device_id": [1],
"individual_no_device_id": [2],
"group_intermediate": [3, 4, 5],
"group_ca": [6, 7, 8],
}
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
logging.info("set up certificates before cert related tests")
call_intermediate_cert_creation_from_pipeline(
common_name=intermediate_common_name,
ca_password=os.getenv("PROVISIONING_ROOT_PASSWORD"),
intermediate_password=intermediate_password,
)
call_device_cert_creation_from_pipeline(
common_name=device_common_name,
intermediate_password=intermediate_password,
device_password=device_password,
device_count=certificate_count,
)
def after_module():
logging.info("tear down certificates after cert related tests")
delete_directories_certs_created_from_pipeline()
request.addfinalizer(after_module)
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with the user supplied device_id different from the registration_id of the individual enrollment that has been created with a selfsigned X509 authentication"
)
def test_device_register_with_device_id_for_a_x509_individual_enrollment():
device_id = "e2edpsflyingfeather"
device_index = type_to_device_indices.get("individual_with_device_id")[0]
try:
individual_enrollment_record = create_individual_enrollment_with_x509_client_certs(
device_index=device_index, device_id=device_id
)
registration_id = individual_enrollment_record.registration_id
device_cert_file = "demoCA/newcerts/device_cert" + str(device_index) + ".pem"
device_key_file = "demoCA/private/device_key" + str(device_index) + ".pem"
registration_result = result_from_register(
registration_id, device_cert_file, device_key_file
)
assert device_id != registration_id
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
finally:
service_client.delete_individual_enrollment_by_param(registration_id)
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with device_id equal to the registration_id of the individual enrollment that has been created with a selfsigned X509 authentication"
)
def test_device_register_with_no_device_id_for_a_x509_individual_enrollment():
device_index = type_to_device_indices.get("individual_no_device_id")[0]
try:
individual_enrollment_record = create_individual_enrollment_with_x509_client_certs(
device_index=device_index
)
registration_id = individual_enrollment_record.registration_id
device_cert_file = "demoCA/newcerts/device_cert" + str(device_index) + ".pem"
device_key_file = "demoCA/private/device_key" + str(device_index) + ".pem"
registration_result = result_from_register(
registration_id, device_cert_file, device_key_file
)
assert_device_provisioned(
device_id=registration_id, registration_result=registration_result
)
device_registry_helper.try_delete_device(registration_id)
finally:
service_client.delete_individual_enrollment_by_param(registration_id)
@pytest.mark.it(
"A group of devices get provisioned to the linked IoTHub with device_ids equal to the individual registration_ids inside a group enrollment that has been created with intermediate X509 authentication"
)
def test_group_of_devices_register_with_no_device_id_for_a_x509_intermediate_authentication_group_enrollment():
group_id = "e2e-intermediate-hogwarts"
common_device_id = device_common_name
devices_indices = type_to_device_indices.get("group_intermediate")
device_count_in_group = len(devices_indices)
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
try:
intermediate_cert_filename = "demoCA/newcerts/intermediate_cert.pem"
with open(intermediate_cert_filename, "r") as intermediate_pem:
intermediate_cert_content = intermediate_pem.read()
attestation_mechanism = AttestationMechanism.create_with_x509_signing_certs(
intermediate_cert_content
)
enrollment_group_provisioning_model = EnrollmentGroup.create(
group_id, attestation=attestation_mechanism, reprovision_policy=reprovision_policy
)
service_client.create_or_update(enrollment_group_provisioning_model)
count = 0
common_device_key_input_file = "demoCA/private/device_key"
common_device_cert_input_file = "demoCA/newcerts/device_cert"
common_device_inter_cert_chain_file = "demoCA/newcerts/out_inter_device_chain_cert"
for index in devices_indices:
count = count + 1
device_id = common_device_id + str(index)
device_key_input_file = common_device_key_input_file + str(index) + ".pem"
device_cert_input_file = common_device_cert_input_file + str(index) + ".pem"
device_inter_cert_chain_file = common_device_inter_cert_chain_file + str(index) + ".pem"
filenames = [device_cert_input_file, intermediate_cert_filename]
with open(device_inter_cert_chain_file, "w") as outfile:
for fname in filenames:
with open(fname) as infile:
outfile.write(infile.read())
registration_result = result_from_register(
registration_id=device_id,
device_cert_file=device_inter_cert_chain_file,
device_key_file=device_key_input_file,
)
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
# Make sure space is okay. The following line must be outside for loop.
assert count == device_count_in_group
finally:
service_client.delete_enrollment_group_by_param(group_id)
@pytest.mark.skip(
reason="The enrollment is never properly created on the pipeline and it is always created without any CA reference and eventually the registration fails"
)
@pytest.mark.it(
"A group of devices get provisioned to the linked IoTHub with device_ids equal to the individual registration_ids inside a group enrollment that has been created with an already uploaded ca cert X509 authentication"
)
def test_group_of_devices_register_with_no_device_id_for_a_x509_ca_authentication_group_enrollment():
group_id = "e2e-ca-beauxbatons"
common_device_id = device_common_name
devices_indices = type_to_device_indices.get("group_ca")
device_count_in_group = len(devices_indices)
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
try:
DPS_GROUP_CA_CERT = os.getenv("PROVISIONING_ROOT_CERT")
attestation_mechanism = AttestationMechanism.create_with_x509_ca_refs(
ref1=DPS_GROUP_CA_CERT
)
enrollment_group_provisioning_model = EnrollmentGroup.create(
group_id, attestation=attestation_mechanism, reprovision_policy=reprovision_policy
)
service_client.create_or_update(enrollment_group_provisioning_model)
count = 0
intermediate_cert_filename = "demoCA/newcerts/intermediate_cert.pem"
common_device_key_input_file = "demoCA/private/device_key"
common_device_cert_input_file = "demoCA/newcerts/device_cert"
common_device_inter_cert_chain_file = "demoCA/newcerts/out_inter_device_chain_cert"
for index in devices_indices:
count = count + 1
device_id = common_device_id + str(index)
device_key_input_file = common_device_key_input_file + str(index) + ".pem"
device_cert_input_file = common_device_cert_input_file + str(index) + ".pem"
device_inter_cert_chain_file = common_device_inter_cert_chain_file + str(index) + ".pem"
filenames = [device_cert_input_file, intermediate_cert_filename]
with open(device_inter_cert_chain_file, "w") as outfile:
for fname in filenames:
with open(fname) as infile:
logging.debug("Filename is {}".format(fname))
content = infile.read()
logging.debug(content)
outfile.write(content)
registration_result = result_from_register(
registration_id=device_id,
device_cert_file=device_inter_cert_chain_file,
device_key_file=device_key_input_file,
)
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
# Make sure space is okay. The following line must be outside for loop.
assert count == device_count_in_group
finally:
service_client.delete_enrollment_group_by_param(group_id)
def assert_device_provisioned(device_id, registration_result):
"""
Assert that the device has been provisioned correctly to iothub from the registration result as well as from the device registry
:param device_id: The device id
:param registration_result: The registration result
"""
assert registration_result.status == "assigned"
assert registration_result.registration_state.device_id == device_id
assert registration_result.registration_state.assigned_hub == linked_iot_hub
device = device_registry_helper.get_device(device_id)
assert device is not None
assert device.authentication.type == "selfSigned"
assert device.device_id == device_id
def create_individual_enrollment_with_x509_client_certs(device_index, device_id=None):
registration_id = device_common_name + str(device_index)
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
device_cert_input_file = "demoCA/newcerts/device_cert" + str(device_index) + ".pem"
with open(device_cert_input_file, "r") as in_device_cert:
device_cert_content = in_device_cert.read()
attestation_mechanism = AttestationMechanism.create_with_x509_client_certs(device_cert_content)
individual_provisioning_model = IndividualEnrollment.create(
attestation=attestation_mechanism,
registration_id=registration_id,
reprovision_policy=reprovision_policy,
device_id=device_id,
)
return service_client.create_or_update(individual_provisioning_model)
def result_from_register(registration_id, device_cert_file, device_key_file):
x509 = X509(cert_file=device_cert_file, key_file=device_key_file, pass_phrase=device_password)
provisioning_device_client = ProvisioningDeviceClient.create_from_x509_certificate(
provisioning_host=PROVISIONING_HOST,
registration_id=registration_id,
id_scope=ID_SCOPE,
x509=x509,
)
return provisioning_device_client.register()

Просмотреть файл

@ -0,0 +1,115 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from azure_provisioning_e2e.service_helper import Helper, connection_string_to_hostname
from azure.iot.device import ProvisioningDeviceClient
from provisioningserviceclient import ProvisioningServiceClient, IndividualEnrollment
from provisioningserviceclient.protocol.models import AttestationMechanism, ReprovisionPolicy
import pytest
import logging
import os
logging.basicConfig(level=logging.DEBUG)
PROVISIONING_HOST = os.getenv("PROVISIONING_DEVICE_ENDPOINT")
ID_SCOPE = os.getenv("PROVISIONING_DEVICE_IDSCOPE")
service_client = ProvisioningServiceClient.create_from_connection_string(
os.getenv("PROVISIONING_SERVICE_CONNECTION_STRING")
)
device_registry_helper = Helper(os.getenv("IOTHUB_CONNECTION_STRING"))
linked_iot_hub = connection_string_to_hostname(os.getenv("IOTHUB_CONNECTION_STRING"))
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with the device_id equal to the registration_id of the individual enrollment that has been created with a symmetric key authentication"
)
def test_device_register_with_no_device_id_for_a_symmetric_key_individual_enrollment():
try:
individual_enrollment_record = create_individual_enrollment(
"e2e-dps-underthewhompingwillow"
)
registration_id = individual_enrollment_record.registration_id
symmetric_key = individual_enrollment_record.attestation.symmetric_key.primary_key
registration_result = result_from_register(registration_id, symmetric_key)
assert_device_provisioned(
device_id=registration_id, registration_result=registration_result
)
device_registry_helper.try_delete_device(registration_id)
finally:
service_client.delete_individual_enrollment_by_param(registration_id)
@pytest.mark.it(
"A device gets provisioned to the linked IoTHub with the user supplied device_id different from the registration_id of the individual enrollment that has been created with a symmetric key authentication"
)
def test_device_register_with_device_id_for_a_symmetric_key_individual_enrollment():
device_id = "e2edpstommarvoloriddle"
try:
individual_enrollment_record = create_individual_enrollment(
registration_id="e2e-dps-prioriincantatem", device_id=device_id
)
registration_id = individual_enrollment_record.registration_id
symmetric_key = individual_enrollment_record.attestation.symmetric_key.primary_key
registration_result = result_from_register(registration_id, symmetric_key)
assert device_id != registration_id
assert_device_provisioned(device_id=device_id, registration_result=registration_result)
device_registry_helper.try_delete_device(device_id)
finally:
service_client.delete_individual_enrollment_by_param(registration_id)
def create_individual_enrollment(registration_id, device_id=None):
"""
Create an individual enrollment record using the service client
:param registration_id: The registration id of the enrollment
:param device_id: Optional device id
:return: And individual enrollment record
"""
reprovision_policy = ReprovisionPolicy(migrate_device_data=True)
attestation_mechanism = AttestationMechanism(type="symmetricKey")
individual_provisioning_model = IndividualEnrollment.create(
attestation=attestation_mechanism,
registration_id=registration_id,
device_id=device_id,
reprovision_policy=reprovision_policy,
)
return service_client.create_or_update(individual_provisioning_model)
def assert_device_provisioned(device_id, registration_result):
"""
Assert that the device has been provisioned correctly to iothub from the registration result as well as from the device registry
:param device_id: The device id
:param registration_result: The registration result
"""
assert registration_result.status == "assigned"
assert registration_result.registration_state.device_id == device_id
assert registration_result.registration_state.assigned_hub == linked_iot_hub
device = device_registry_helper.get_device(device_id)
assert device is not None
assert device.authentication.type == "sas"
assert device.device_id == device_id
def result_from_register(registration_id, symmetric_key):
provisioning_device_client = ProvisioningDeviceClient.create_from_symmetric_key(
provisioning_host=PROVISIONING_HOST,
registration_id=registration_id,
id_scope=ID_SCOPE,
symmetric_key=symmetric_key,
)
return provisioning_device_client.register()

Просмотреть файл

@ -5,3 +5,4 @@ pytest-testdox>=1.1.1
pytest-cov
mock #remove this as soon as no references to it remain in the code
flake8
azure-iothub-provisioningserviceclient >= 1.2.0 # Only needed for end to end tests for DPS

1
scripts/__init__.py Normal file
Просмотреть файл

@ -0,0 +1 @@
__path__ = __import__("pkgutil").extend_path(__path__, __name__)

Просмотреть файл

@ -12,7 +12,7 @@ def create_custom_config():
# Best options is to have the location of openssl config file in an env variable
# The openssl config file extension could be "cfg" or "cnf"
config_path = os.getenv("OPENSSLCONFIG")
config_path = os.getenv("OPENSSL_CONF")
with open(config_path, "r") as openssl_config:
config = openssl_config.read()
lines = config.splitlines()
@ -53,28 +53,56 @@ def create_custom_config():
local_file.write("\n".join(list_of_lines) + "\n")
def create_verification_cert(nonce):
os.system("openssl genrsa -out demoCA/private/verification_key.pem" + " " + str(key_size))
def create_verification_cert(nonce, root_verify):
print("Done generating verification key")
subject = "//C=US/CN=" + nonce
os.system(
"openssl req -key demoCA/private/verification_key.pem"
+ " "
+ "-new -out demoCA/newcerts/verification_csr.pem -subj "
+ subject
)
print("Done generating verification CSR")
if not root_verify:
os.system(
"openssl genrsa -out demoCA/private/verification_inter_key.pem" + " " + str(key_size)
)
os.system(
"openssl req -key demoCA/private/verification_inter_key.pem"
+ " "
+ "-new -out demoCA/newcerts/verification_inter_csr.pem -subj "
+ subject
)
print("Done generating verification CSR for intermediate")
os.system(
"openssl x509 -req -in demoCA/newcerts/verification_csr.pem"
+ " "
+ "-CA demoCA/newcerts/ca_cert.pem -CAkey demoCA/private/ca_key.pem -passin pass:"
+ ca_password
+ " "
+ "-CAcreateserial -out demoCA/newcerts/verification_cert.pem -days 300 -sha256"
)
print("Done generating verification certificate. Upload to IoT Hub to verify")
os.system(
"openssl x509 -req -in demoCA/newcerts/verification_inter_csr.pem"
+ " "
+ "-CA demoCA/newcerts/intermediate_cert.pem -CAkey demoCA/private/intermediate_key.pem -passin pass:"
+ intermediate_password
+ " "
+ "-CAcreateserial -out demoCA/newcerts/verification_inter_cert.pem -days 300 -sha256"
)
print(
"Done generating verification certificate for intermediate. Upload to IoT Hub to verify"
)
else:
os.system(
"openssl genrsa -out demoCA/private/verification_root_key.pem" + " " + str(key_size)
)
os.system(
"openssl req -key demoCA/private/verification_root_key.pem"
+ " "
+ "-new -out demoCA/newcerts/verification_root_csr.pem -subj "
+ subject
)
print("Done generating verification CSR")
os.system(
"openssl x509 -req -in demoCA/newcerts/verification_root_csr.pem"
+ " "
+ "-CA demoCA/newcerts/ca_cert.pem -CAkey demoCA/private/ca_key.pem -passin pass:"
+ ca_password
+ " "
+ "-CAcreateserial -out demoCA/newcerts/verification_root_cert.pem -days 300 -sha256"
)
print("Done generating verification certificate. Upload to IoT Hub to verify")
def create_directories():
@ -86,7 +114,9 @@ def create_directories():
os.mkdir("demoCA/newcerts")
def create_certificate_chain(common_name, ca_password, intermediate_password, device_password):
def create_certificate_chain(
common_name, ca_password, intermediate_password, device_password, device_count
):
os.system(
"openssl genrsa -aes256 -out demoCA/private/ca_key.pem -passout pass:"
+ ca_password
@ -103,7 +133,9 @@ def create_certificate_chain(common_name, ca_password, intermediate_password, de
"openssl req -config demoCA/openssl.cnf -key demoCA/private/ca_key.pem -passin pass:"
+ ca_password
+ " "
+ "-new -x509 -days 300 -sha256 -extensions v3_ca -out demoCA/newcerts/ca_cert.pem -subj "
+ "-new -x509 -days "
+ str(days)
+ " -sha256 -extensions v3_ca -out demoCA/newcerts/ca_cert.pem -subj "
+ subject
)
print("Done generating root certificate")
@ -128,31 +160,57 @@ def create_certificate_chain(common_name, ca_password, intermediate_password, de
"openssl ca -config demoCA/openssl.cnf -in demoCA/newcerts/intermediate_csr.pem -out demoCA/newcerts/intermediate_cert.pem -keyfile demoCA/private/ca_key.pem -cert demoCA/newcerts/ca_cert.pem -passin pass:"
+ ca_password
+ " "
+ "-extensions v3_ca -days 30 -notext -md sha256 -batch"
+ "-extensions v3_ca -days "
+ str(days)
+ " -notext -md sha256 -batch"
)
print("Done generating intermediate certificate")
for index in range(0, device_count):
index = index + 1
print("creating device certificate for " + str(index))
create_leaf_certificates(index, device_password)
def create_leaf_certificates(index, device_password):
key_file_name = "device_key" + str(index) + ".pem"
csr_file_name = "device_csr" + str(index) + ".pem"
cert_file_name = "device_cert" + str(index) + ".pem"
os.system(
"openssl genrsa -aes256 -out demoCA/private/device_key.pem -passout pass:"
"openssl genrsa -aes256 -out demoCA/private/"
+ key_file_name
+ " -passout pass:"
+ device_password
+ " "
+ str(key_size)
)
print("Done generating device key")
subject = "//C=US/CN=device" + common_name
subject = "//C=US/CN=device" + common_name + str(index)
os.system(
"openssl req -config demoCA/openssl.cnf -new -sha256 -key demoCA/private/device_key.pem -passin pass:"
"openssl req -config demoCA/openssl.cnf -new -sha256 -key demoCA/private/"
+ key_file_name
+ " -passin pass:"
+ device_password
+ " "
+ "-out demoCA/newcerts/device_csr.pem -subj "
+ "-out demoCA/newcerts/"
+ csr_file_name
+ " -subj "
+ subject
)
print("Done generating device CSR")
os.system(
"openssl ca -config demoCA/openssl.cnf -in demoCA/newcerts/device_csr.pem -out demoCA/newcerts/device_cert.pem -keyfile demoCA/private/intermediate_key.pem -cert demoCA/newcerts/intermediate_cert.pem -passin pass:"
"openssl ca -config demoCA/openssl.cnf -in demoCA/newcerts/"
+ csr_file_name
+ " -out demoCA/newcerts/"
+ cert_file_name
+ " -keyfile demoCA/private/intermediate_key.pem -cert demoCA/newcerts/intermediate_cert.pem -passin pass:"
+ intermediate_password
+ " "
+ "-extensions usr_cert -days 3 -notext -md sha256 -batch"
+ "-extensions usr_cert -days "
+ str(days)
+ " -notext -md sha256 -batch"
)
print("Done generating device certificate")
@ -182,6 +240,11 @@ if __name__ == "__main__":
parser.add_argument(
"--device-password", type=str, help="device key password. If omitted it will be prompted."
)
parser.add_argument(
"--device-count", type=str, help="Number of devices that present in a group. Default is 1."
)
parser.add_argument(
"--mode",
type=str,
@ -192,6 +255,11 @@ if __name__ == "__main__":
type=str,
help="thumprint generated from iot hub certificates. During verification mode if omitted it will be prompted.",
)
parser.add_argument(
"--root-verify",
type=str,
help="The boolean value to enter in case it is the root or intermediate verification. By default it is True meaning root verifictaion. If veriication of intermediate certification is needed please enter False ",
)
args = parser.parse_args()
if args.key_size:
key_size = args.key_size
@ -207,11 +275,6 @@ if __name__ == "__main__":
else:
common_name = "random"
if args.ca_password:
ca_password = args.ca_password
else:
ca_password = getpass.getpass("Enter pass phrase for root key: ")
if args.mode:
if args.mode == "verification":
mode = "verification"
@ -228,6 +291,10 @@ if __name__ == "__main__":
create_custom_config()
if mode == "non-verification":
if args.ca_password:
ca_password = args.ca_password
else:
ca_password = getpass.getpass("Enter pass phrase for root key: ")
if args.intermediate_password:
intermediate_password = args.intermediate_password
else:
@ -236,14 +303,41 @@ if __name__ == "__main__":
device_password = args.device_password
else:
device_password = getpass.getpass("Enter pass phrase for device key: ")
if args.device_count:
device_count = args.device_count
else:
device_count = 1
else:
if args.nonce:
nonce = args.nonce
else:
nonce = getpass.getpass("Enter nonce for verification mode")
if args.root_verify:
lower_root_verify = args.root_verify.lower()
if lower_root_verify == "false":
root_verify = False
if args.intermediate_password:
intermediate_password = args.intermediate_password
else:
intermediate_password = getpass.getpass(
"Enter pass phrase for intermediate key: "
)
else:
root_verify = True
if args.ca_password:
ca_password = args.ca_password
else:
ca_password = getpass.getpass("Enter pass phrase for root key: ")
else:
root_verify = True
if args.ca_password:
ca_password = args.ca_password
else:
ca_password = getpass.getpass("Enter pass phrase for root key: ")
if mode == "verification":
create_verification_cert(nonce)
create_verification_cert(nonce, root_verify)
else:
create_directories()
create_certificate_chain(
@ -251,4 +345,5 @@ if __name__ == "__main__":
ca_password=ca_password,
intermediate_password=intermediate_password,
device_password=device_password,
device_count=int(device_count),
)

Просмотреть файл

@ -0,0 +1,436 @@
import os
import re
import base64
import logging
import shutil
import subprocess
# TODO : Do we change all print statements to logging ?
logging.basicConfig(level=logging.DEBUG)
def create_custom_config():
# The paths from different OS is different.
# For example OS X path is "/usr/local/etc/openssl/openssl.cnf"
# Windows path is "C:/Openssl/bin//openssl.cnf" etc
# Best options is to have the location of openssl config file in an env variable
# The openssl config file extension could be "cfg" or "cnf"
config_path = os.getenv("OPENSSL_CONF")
with open(config_path, "r") as openssl_config:
config = openssl_config.read()
lines = config.splitlines()
policy_loose_found = False
policy_any_found = False
policy_any_regex = re.compile(r"\s*\[\s*policy_anything\s*\]\s*")
# First, try to find policy_anything in the openssl config file
for number, line in enumerate(lines):
if re.search(policy_any_regex, line):
policy_any_found = True
break
# Of not found the try the search with policy_loose
if not policy_any_found:
policy_loose_regex = re.compile(r"\s*\[\s*policy_loose\s*\]\s*")
for number, line in enumerate(lines):
if re.search(policy_loose_regex, line):
policy_loose_found = True
break
list_of_lines = list()
ca_default_regex = re.compile(r"\s*\[\s*CA_default\s*\]\s*")
ca_default_section_found = False
policy_regex = re.compile(r"\s*\s*(policy\s*=\s*policy_strict|policy\s*=\s*policy_match)")
with open(config_path, "r") as change_openssl_config:
for line in change_openssl_config:
if not ca_default_section_found and re.search(ca_default_regex, line):
ca_default_section_found = True
if ca_default_section_found and re.search(policy_regex, line):
if policy_loose_found:
line = policy_regex.sub("policy = policy_loose", line)
if policy_any_found:
line = policy_regex.sub("policy = policy_anything", line)
list_of_lines.append(line.strip())
with open("demoCA/openssl.cnf", "w") as local_file:
local_file.write("\n".join(list_of_lines) + "\n")
def create_verification_cert(
nonce, root_verify, ca_password="hogwarts", intermediate_password="hogwartsi", key_size=4096
):
print("Done generating verification key")
subject = "//C=US/CN=" + nonce
if not root_verify:
os.system(
"openssl genrsa -out demoCA/private/verification_inter_key.pem" + " " + str(key_size)
)
os.system(
"openssl req -key demoCA/private/verification_inter_key.pem"
+ " "
+ "-new -out demoCA/newcerts/verification_inter_csr.pem -subj "
+ subject
)
print("Done generating verification CSR for intermediate")
os.system(
"openssl x509 -req -in demoCA/newcerts/verification_inter_csr.pem"
+ " "
+ "-CA demoCA/newcerts/intermediate_cert.pem -CAkey demoCA/private/intermediate_key.pem -passin pass:"
+ intermediate_password
+ " "
+ "-CAcreateserial -out demoCA/newcerts/verification_inter_cert.pem -days 300 -sha256"
)
print(
"Done generating verification certificate for intermediate. Upload to IoT Hub to verify"
)
else:
os.system(
"openssl genrsa -out demoCA/private/verification_root_key.pem" + " " + str(key_size)
)
os.system(
"openssl req -key demoCA/private/verification_root_key.pem"
+ " "
+ "-new -out demoCA/newcerts/verification_root_csr.pem -subj "
+ subject
)
print("Done generating verification CSR")
os.system(
"openssl x509 -req -in demoCA/newcerts/verification_root_csr.pem"
+ " "
+ "-CA demoCA/newcerts/ca_cert.pem -CAkey demoCA/private/ca_key.pem -passin pass:"
+ ca_password
+ " "
+ "-CAcreateserial -out demoCA/newcerts/verification_root_cert.pem -days 300 -sha256"
)
print("Done generating verification certificate. Upload to IoT Hub to verify")
def create_directories_and_prereq_files():
# os.system("type nul > demoCA/index.txt")
# os.system("type nul > demoCA/index.txt.attr")
os.system("touch demoCA/index.txt")
# os.system("touch demoCA/index.txt.attr")
os.system("echo 1000 > demoCA/serial")
# Create this folder as configuration file makes new keys go here
os.mkdir("demoCA/private")
# Create this folder as configuration file makes new certificates go here
os.mkdir("demoCA/newcerts")
def create_root(common_name, ca_password="hogwarts", key_size=4096, days=3650):
os.system(
"openssl genrsa -aes256 -out demoCA/private/ca_key.pem -passout pass:"
+ ca_password
+ " "
+ str(key_size)
)
print("Done generating root key")
# We need another argument like country as there is always error regarding the first argument
# Subject Attribute /C has no known NID, skipped
# So if the first arg is common name the error comes due to common name nad common name is not taken
subject = "//C=US/CN=" + common_name
os.system(
"openssl req -config demoCA/openssl.cnf -key demoCA/private/ca_key.pem -passin pass:"
+ ca_password
+ " "
+ "-new -x509 -days "
+ str(days)
+ " -sha256 -extensions v3_ca -out demoCA/newcerts/ca_cert.pem -subj "
+ subject
)
print("Done generating root certificate")
def create_intermediate(
common_name,
pipeline,
ca_password="hogwarts",
intermediate_password="hogwartsi",
key_size=4096,
days=365,
):
if pipeline:
ca_cert = os.getenv("PROVISIONING_ROOT_CERT")
ca_key = os.getenv("PROVISIONING_ROOT_CERT_KEY")
in_cert_file_path = "ca_cert.pem"
in_key_file_path = "ca_key.pem"
with open(in_cert_file_path, "w") as out_ca_pem:
cert = str(base64.b64decode(ca_cert), "ascii")
out_ca_pem.write(cert)
if os.path.exists(in_cert_file_path):
print("root cert decoded and created")
else:
print("root cert NOT decoded and created")
with open(in_key_file_path, "w") as out_ca_key:
key = str(base64.b64decode(ca_key), "ascii")
out_ca_key.write(key)
if os.path.exists(in_key_file_path):
print("root key decoded and created")
else:
print("root key NOT decoded and created")
else:
in_cert_file_path = "demoCA/newcerts/ca_cert.pem"
in_key_file_path = "demoCA/private/ca_key.pem"
os.system(
"openssl genrsa -aes256 -out demoCA/private/intermediate_key.pem -passout pass:"
+ intermediate_password
+ " "
+ str(key_size)
)
if os.path.exists("demoCA/private/intermediate_key.pem"):
print("Done generating intermediate key")
else:
print("intermediate key NOT generated")
subject = "/CN=" + common_name
os.system(
"openssl req -config demoCA/openssl.cnf -key demoCA/private/intermediate_key.pem -passin pass:"
+ intermediate_password
+ " "
+ "-new -sha256 -out demoCA/newcerts/intermediate_csr.pem -subj "
+ subject
)
if os.path.exists("demoCA/newcerts/intermediate_csr.pem"):
print("Done generating intermediate CSR")
else:
print("intermediate csr NOT generated")
command = [
"openssl",
"ca",
"-config",
"demoCA/openssl.cnf",
"-in",
"demoCA/newcerts/intermediate_csr.pem",
"-out",
"demoCA/newcerts/intermediate_cert.pem",
"-keyfile",
in_key_file_path,
"-cert",
in_cert_file_path,
"-passin",
"pass:" + ca_password,
"-extensions",
"v3_ca",
"-days",
str(days),
"-notext",
"-md",
"sha256",
"-batch",
]
cp = subprocess.run(
command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
print(cp.stdout)
print(cp.stderr)
print(cp.returncode)
if os.path.exists("demoCA/newcerts/intermediate_cert.pem"):
print("Done generating intermediate certificate")
else:
print("intermediate cert NOT generated")
def create_certificate_chain(
common_name,
ca_password,
intermediate_password="hogwartsi",
device_password="hogwartsd",
device_count=1,
pipeline=False,
key_size=4096,
days=365,
):
common_name_for_root = "root" + common_name
create_root(common_name_for_root, ca_password=ca_password, key_size=key_size, days=days * 10)
common_name_for_intermediate = "root" + common_name
create_intermediate(
common_name_for_intermediate,
pipeline=False,
ca_password=ca_password,
intermediate_password=intermediate_password,
key_size=key_size,
days=days,
)
for index in range(0, device_count):
index = index + 1
print("creating device certificate for " + str(index))
common_name_for_all_device = "device" + common_name
create_leaf_certificates(
index,
common_name_for_all_device,
intermediate_password=intermediate_password,
device_password=device_password,
key_size=key_size,
days=days,
)
def create_leaf_certificates(
index,
common_name_for_all_device,
intermediate_password="hogwartsi",
device_password="hogwartsd",
key_size=4096,
days=365,
):
key_file_name = "device_key" + str(index) + ".pem"
csr_file_name = "device_csr" + str(index) + ".pem"
cert_file_name = "device_cert" + str(index) + ".pem"
os.system(
"openssl genrsa -aes256 -out demoCA/private/"
+ key_file_name
+ " -passout pass:"
+ device_password
+ " "
+ str(key_size)
)
if os.path.exists("demoCA/private/" + key_file_name):
print("Done generating device key with filename {filename}".format(filename=key_file_name))
logging.debug(
"Done generating device key with filename {filename}".format(filename=key_file_name)
)
else:
print("device key NOT generated")
subject = "//C=US/CN=" + common_name_for_all_device + str(index)
os.system(
"openssl req -config demoCA/openssl.cnf -new -sha256 -key demoCA/private/"
+ key_file_name
+ " -passin pass:"
+ device_password
+ " "
+ "-out demoCA/newcerts/"
+ csr_file_name
+ " -subj "
+ subject
)
if os.path.exists("demoCA/newcerts/" + csr_file_name):
print("Done generating device CSR with filename {filename}".format(filename=csr_file_name))
logging.debug(
"Done generating device CSR with filename {filename}".format(filename=csr_file_name)
)
else:
print("device CSR NOT generated")
os.system(
"openssl ca -config demoCA/openssl.cnf -in demoCA/newcerts/"
+ csr_file_name
+ " -out demoCA/newcerts/"
+ cert_file_name
+ " -keyfile demoCA/private/intermediate_key.pem -cert demoCA/newcerts/intermediate_cert.pem -passin pass:"
+ intermediate_password
+ " "
+ "-extensions usr_cert -days "
+ str(days)
+ " -notext -md sha256 -batch"
)
if os.path.exists("demoCA/newcerts/" + cert_file_name):
print(
"Done generating device cert with filename {filename}".format(filename=cert_file_name)
)
logging.debug(
"Done generating device cert with filename {filename}".format(filename=cert_file_name)
)
else:
print("device cert NOT generated")
def call_intermediate_cert_creation_from_pipeline(
common_name, ca_password, intermediate_password, key_size=4096, days=30
):
os.system("mkdir demoCA")
create_directories_and_prereq_files()
shutil.copy("config/openssl.cnf", "demoCA/openssl.cnf")
if os.path.exists("demoCA/openssl.cnf"):
print("Configuration file have been copied")
else:
print("Configuration file have NOT been copied")
print("ca_password={ca_password}".format(ca_password=ca_password))
print(
"intermediate_password={intermediate_password}".format(
intermediate_password=intermediate_password
)
)
create_intermediate(
common_name=common_name,
pipeline=True,
ca_password=ca_password,
intermediate_password=intermediate_password,
key_size=key_size,
days=days,
)
def delete_directories_certs_created_from_pipeline():
dirPath = "demoCA"
try:
shutil.rmtree(dirPath)
except Exception:
print("Error while deleting directory")
if os.path.exists("out_ca_cert.pem"):
os.remove("out_ca_cert.pem")
else:
print("The file does not exist")
if os.path.exists("out_ca_key.pem"):
os.remove("out_ca_key.pem")
else:
print("The file does not exist")
if os.path.exists(".rnd"):
os.remove(".rnd")
else:
print("The file does not exist")
def call_device_cert_creation_from_pipeline(
common_name, intermediate_password, device_password, key_size=4096, days=30, device_count=1
):
"""
Creates device certificates from an already created intermediate certificate which exists in
the demoCA/newcerts directory. Assumption that intermediate has already been created with the
name 'intermediate_cert.pem' and the key file is 'intermediate_key.pem'
Hence the intermediate password is known to whoever using this function.
:param common_name: The common name for all device certificates. This will be appended by the
index of the specific device for which the cert is being created.
:param intermediate_password: The intermediate password which should be already known.
:param device_password: The device cert password.
:param key_size: Expected key size. Default is 4096.
:param days: Expected days. Default is 30
:param device_count: The count of devices for which certs needs to be created. Default is 1 device.
"""
for index in range(0, device_count):
index = index + 1
print("creating device certificate for " + str(index))
create_leaf_certificates(
index,
common_name_for_all_device=common_name,
intermediate_password=intermediate_password,
device_password=device_password,
key_size=key_size,
days=days,
)

Просмотреть файл

@ -13,9 +13,6 @@ jobs:
versionSpec: '3.x'
architecture: 'x64'
- script: 'python -m pip install --upgrade pip'
displayName: 'Update pip'
- script: 'python env_setup.py --no_dev'
displayName: 'Prepare environment (install packages + dependencies + tools)'

50
vsts/dps-e2e.yaml Normal file
Просмотреть файл

@ -0,0 +1,50 @@
resources:
- repo: self
#Multi-configuration and multi-agent job options are not exported to YAML. Configure these options using documentation guidance: https://docs.microsoft.com/vsts/pipelines/process/phases
jobs:
- job: 'Test'
pool:
vmImage: 'Ubuntu 16.04'
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.7'
architecture: 'x64'
- script: 'python env_setup.py --no_dev'
displayName: 'Prepare environment (install packages + dev dependencies + test dependencies + tools)'
- script: |
cd $(Agent.WorkFolder)
cd ..
touch .rnd
displayName: 'create RANDFILE file (needed to store seed data) separately due to openssl version issues in the pipeline'
- script: |
cd $(Build.SourcesDirectory)/azure_provisioning_e2e/tests
pytest test_*.py --junitxml=junit/dps-e2e-test-results.xml
displayName: 'Run Specified E2E Test with env variables'
env:
IOTHUB_CONNECTION_STRING: $(PYTHONPREVIEW-LINUX-IOTHUB-CONNECTION-STRING)
IOTHUB_EVENTHUB_CONNECTION_STRING: $(PYTHONPREVIEW-LINUX-IOTHUB-EVENTHUB-CONNECTION-STRING)
IOTHUB_CA_ROOT_CERT: $(PYTHONPREVIEW-LINUX-IOTHUB-CA-ROOT-CERT)
IOTHUB_CA_ROOT_CERT_KEY: $(PYTHONPREVIEW-LINUX-IOTHUB-CA-ROOT-CERT-KEY)
STORAGE_CONNECTION_STRING: $(PYTHONPREVIEW-LINUX-STORAGE-CONNECTION-STRING)
PROVISIONING_DEVICE_ENDPOINT: $(PYTHONPREVIEW-LINUX-DPS-DEVICE-ENDPOINT)
PROVISIONING_SERVICE_CONNECTION_STRING: $(PYTHONPREVIEW-LINUX-DPS-CONNECTION-STRING)
PROVISIONING_DEVICE_IDSCOPE: $(PYTHONPREVIEW-LINUX-DPS-ID-SCOPE)
PROVISIONING_ROOT_CERT: $(PYTHONPREVIEW-LINUX-IOT-PROVISIONING-ROOT-CERT)
PROVISIONING_ROOT_CERT_KEY: $(PYTHONPREVIEW-LINUX-IOT-PROVISIONING-ROOT-CERT-KEY)
PROVISIONING_ROOT_PASSWORD: $(PYTHONPREVIEW-LINUX-ROOT-CERT-PASSWORD)
- task: PublishTestResults@2
displayName: 'Publish Test Results'
condition: always()
inputs:
testResultsFiles: '**/dps-e2e-test-*.xml'
testRunTitle: 'Publish test results for Python $(python.version)'