This commit is contained in:
Yuchao Yan 2024-08-22 07:43:25 +08:00
Родитель 414977934f
Коммит 0e9cbdda89
34 изменённых файлов: 4831 добавлений и 3 удалений

Просмотреть файл

@ -59,7 +59,7 @@ class BlackScriptPlugin(Plugin): # pylint: disable=abstract-method
pass
except: # pylint: disable=bare-except
_LOGGER.error("Error: failed to format %s", file)
raise
# raise
else:
self.write_file(file, file_content)

Просмотреть файл

@ -142,7 +142,7 @@ interface RegenerateFlags {
const SpecialFlags: Record<string, Record<string, any>> = {
azure: {
"generate-test": true,
// "generate-test": true,
"generate-sample": true,
},
};
@ -166,7 +166,7 @@ async function getSubdirectories(baseDir: string, flags: RegenerateFlags): Promi
if (mainTspRelativePath.includes("xml")) return;
// after fix test generation for nested operation group, remove this check
if (mainTspRelativePath.includes("client-operation-group")) return;
// if (mainTspRelativePath.includes("client-operation-group")) return;
const hasMainTsp = await promises
.access(mainTspPath)

Просмотреть файл

@ -0,0 +1,5 @@
# Release History
## 1.0.0b1 (1970-01-01)
- Initial version

Просмотреть файл

@ -0,0 +1,21 @@
Copyright (c) Microsoft Corporation.
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Просмотреть файл

@ -0,0 +1,7 @@
include *.md
include LICENSE
include client/structure/service/py.typed
recursive-include tests *.py
recursive-include samples *.py *.md
include client/__init__.py
include client/structure/__init__.py

Просмотреть файл

@ -0,0 +1,45 @@
# Client Structure Service client library for Python
<!-- write necessary description of service -->
## Getting started
### Install the package
```bash
python -m pip install client-structure-service
```
#### Prequisites
- Python 3.8 or later is required to use this package.
- You need an [Azure subscription][azure_sub] to use this package.
- An existing Client Structure Service instance.
## Contributing
This project welcomes contributions and suggestions. Most contributions require
you to agree to a Contributor License Agreement (CLA) declaring that you have
the right to, and actually do, grant us the rights to use your contribution.
For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether
you need to provide a CLA and decorate the PR appropriately (e.g., label,
comment). Simply follow the instructions provided by the bot. You will only
need to do this once across all repos using our CLA.
This project has adopted the
[Microsoft Open Source Code of Conduct][code_of_conduct]. For more information,
see the Code of Conduct FAQ or contact opencode@microsoft.com with any
additional questions or comments.
<!-- LINKS -->
[code_of_conduct]: https://opensource.microsoft.com/codeofconduct/
[authenticate_with_token]: https://docs.microsoft.com/azure/cognitive-services/authentication?tabs=powershell#authenticate-with-an-authentication-token
[azure_identity_credentials]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/identity/azure-identity#credentials
[azure_identity_pip]: https://pypi.org/project/azure-identity/
[default_azure_credential]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/identity/azure-identity#defaultazurecredential
[pip]: https://pypi.org/project/pip/
[azure_sub]: https://azure.microsoft.com/free/

Просмотреть файл

@ -0,0 +1,12 @@
{
"CrossLanguagePackageId": "Client.Structure.Service",
"CrossLanguageDefinitionId": {
"client.structure.service.models.ClientType": "Client.Structure.Service.ClientType",
"client.structure.service.FirstClient.group3.two": "Client.Structure.ClientOperationGroup.Group3.two",
"client.structure.service.FirstClient.group3.three": "Client.Structure.ClientOperationGroup.Group3.three",
"client.structure.service.FirstClient.group4.four": "Client.Structure.ClientOperationGroup.Group4.four",
"client.structure.service.FirstClient.one": "Client.Structure.ClientOperationGroup.one",
"client.structure.service.SubNamespace.SecondClient.group5.six": "Client.Structure.AnotherClientOperationGroup.Group5.six",
"client.structure.service.SubNamespace.SecondClient.five": "Client.Structure.AnotherClientOperationGroup.five"
}
}

Просмотреть файл

@ -0,0 +1 @@
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore

Просмотреть файл

@ -0,0 +1 @@
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore

Просмотреть файл

@ -0,0 +1,27 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from ._client import FirstClient
from ._client import SubNamespace.SecondClient
from ._version import VERSION
__version__ = VERSION
try:
from ._patch import __all__ as _patch_all
from ._patch import * # pylint: disable=unused-wildcard-import
except ImportError:
_patch_all = []
from ._patch import patch_sdk as _patch_sdk
__all__ = [
'FirstClient',
'SubNamespace.SecondClient',
]
__all__.extend([p for p in _patch_all if p not in __all__])
_patch_sdk()

Просмотреть файл

@ -0,0 +1,176 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from copy import deepcopy
from typing import Any, Union
from typing_extensions import Self
from azure.core import PipelineClient
from azure.core.pipeline import policies
from azure.core.rest import HttpRequest, HttpResponse
from . import models as _models
from ._configuration import FirstClientConfiguration, SubNamespace.SecondClientConfiguration
from ._serialization import Deserializer, Serializer
from .operations import FirstClientOperationsMixin, Group3Operations, Group4Operations, Group5Operations, SubNamespace.SecondClientOperationsMixin
class FirstClient(FirstClientOperationsMixin): # pylint: disable=client-accepts-api-version-keyword
"""FirstClient.
:ivar group3: Group3Operations operations
:vartype group3: client.structure.service.operations.Group3Operations
:ivar group4: Group4Operations operations
:vartype group4: client.structure.service.operations.Group4Operations
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__( # pylint: disable=missing-client-constructor-parameter-credential
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
_endpoint = '{endpoint}/client/structure/{client}'
self._config = FirstClientConfiguration(endpoint=endpoint, client=client, **kwargs)
_policies = kwargs.pop('policies', None)
if _policies is None:
_policies = [policies.RequestIdPolicy(**kwargs),self._config.headers_policy,self._config.user_agent_policy,self._config.proxy_policy,policies.ContentDecodePolicy(**kwargs),self._config.redirect_policy,self._config.retry_policy,self._config.authentication_policy,self._config.custom_hook_policy,self._config.logging_policy,policies.DistributedTracingPolicy(**kwargs),policies.SensitiveHeaderCleanupPolicy(**kwargs) if self._config.redirect_policy else None,self._config.http_logging_policy]
self._client: PipelineClient = PipelineClient(base_url=_endpoint, policies=_policies, **kwargs)
self._serialize = Serializer()
self._deserialize = Deserializer()
self._serialize.client_side_validation = False
self.group3 = Group3Operations(
self._client, self._config, self._serialize, self._deserialize
)
self.group4 = Group4Operations(
self._client, self._config, self._serialize, self._deserialize
)
def send_request(
self,
request: HttpRequest, *, stream: bool = False,
**kwargs: Any
) -> HttpResponse:
"""Runs the network request through the client's chained policies.
>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest("GET", "https://www.example.org/")
<HttpRequest [GET], url: 'https://www.example.org/'>
>>> response = client.send_request(request)
<HttpResponse: 200 OK>
For more information on this code flow, see https://aka.ms/azsdk/dpcodegen/python/send_request
:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.HttpResponse
"""
request_copy = deepcopy(request)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
return self._client.send_request(request_copy, stream=stream, **kwargs) # type: ignore
def close(self) -> None:
self._client.close()
def __enter__(self) -> Self:
self._client.__enter__()
return self
def __exit__(self, *exc_details: Any) -> None:
self._client.__exit__(*exc_details)
class SubNamespace.SecondClient(SubNamespace.SecondClientOperationsMixin): # pylint: disable=client-accepts-api-version-keyword
"""SubNamespace.SecondClient.
:ivar group5: Group5Operations operations
:vartype group5: client.structure.service.operations.Group5Operations
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__( # pylint: disable=missing-client-constructor-parameter-credential
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
_endpoint = '{endpoint}/client/structure/{client}'
self._config = SubNamespace.SecondClientConfiguration(endpoint=endpoint, client=client, **kwargs)
_policies = kwargs.pop('policies', None)
if _policies is None:
_policies = [policies.RequestIdPolicy(**kwargs),self._config.headers_policy,self._config.user_agent_policy,self._config.proxy_policy,policies.ContentDecodePolicy(**kwargs),self._config.redirect_policy,self._config.retry_policy,self._config.authentication_policy,self._config.custom_hook_policy,self._config.logging_policy,policies.DistributedTracingPolicy(**kwargs),policies.SensitiveHeaderCleanupPolicy(**kwargs) if self._config.redirect_policy else None,self._config.http_logging_policy]
self._client: PipelineClient = PipelineClient(base_url=_endpoint, policies=_policies, **kwargs)
self._serialize = Serializer()
self._deserialize = Deserializer()
self._serialize.client_side_validation = False
self.group5 = Group5Operations(
self._client, self._config, self._serialize, self._deserialize
)
def send_request(
self,
request: HttpRequest, *, stream: bool = False,
**kwargs: Any
) -> HttpResponse:
"""Runs the network request through the client's chained policies.
>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest("GET", "https://www.example.org/")
<HttpRequest [GET], url: 'https://www.example.org/'>
>>> response = client.send_request(request)
<HttpResponse: 200 OK>
For more information on this code flow, see https://aka.ms/azsdk/dpcodegen/python/send_request
:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.HttpResponse
"""
request_copy = deepcopy(request)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
return self._client.send_request(request_copy, stream=stream, **kwargs) # type: ignore
def close(self) -> None:
self._client.close()
def __enter__(self) -> Self:
self._client.__enter__()
return self
def __exit__(self, *exc_details: Any) -> None:
self._client.__exit__(*exc_details)

Просмотреть файл

@ -0,0 +1,106 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, Union
from azure.core.pipeline import policies
from . import models as _models
from ._version import VERSION
class FirstClientConfiguration: # pylint: disable=too-many-instance-attributes
"""Configuration for FirstClient.
Note that all parameters used to create this instance are saved as instance
attributes.
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__(
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
if endpoint is None:
raise ValueError("Parameter 'endpoint' must not be None.")
if client is None:
raise ValueError("Parameter 'client' must not be None.")
self.endpoint = endpoint
self.client = client
kwargs.setdefault('sdk_moniker', 'client-structure-service/{}'.format(VERSION))
self.polling_interval = kwargs.get("polling_interval", 30)
self._configure(**kwargs)
def _configure(
self,
**kwargs: Any
) -> None:
self.user_agent_policy = kwargs.get('user_agent_policy') or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get('headers_policy') or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get('proxy_policy') or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get('logging_policy') or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get('http_logging_policy') or policies.HttpLoggingPolicy(**kwargs)
self.custom_hook_policy = kwargs.get('custom_hook_policy') or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get('redirect_policy') or policies.RedirectPolicy(**kwargs)
self.retry_policy = kwargs.get('retry_policy') or policies.RetryPolicy(**kwargs)
self.authentication_policy = kwargs.get('authentication_policy')
class SubNamespace.SecondClientConfiguration: # pylint: disable=too-many-instance-attributes,name-too-long
"""Configuration for SubNamespace.SecondClient.
Note that all parameters used to create this instance are saved as instance
attributes.
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__(
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
if endpoint is None:
raise ValueError("Parameter 'endpoint' must not be None.")
if client is None:
raise ValueError("Parameter 'client' must not be None.")
self.endpoint = endpoint
self.client = client
kwargs.setdefault('sdk_moniker', 'client-structure-service/{}'.format(VERSION))
self.polling_interval = kwargs.get("polling_interval", 30)
self._configure(**kwargs)
def _configure(
self,
**kwargs: Any
) -> None:
self.user_agent_policy = kwargs.get('user_agent_policy') or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get('headers_policy') or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get('proxy_policy') or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get('logging_policy') or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get('http_logging_policy') or policies.HttpLoggingPolicy(**kwargs)
self.custom_hook_policy = kwargs.get('custom_hook_policy') or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get('redirect_policy') or policies.RedirectPolicy(**kwargs)
self.retry_policy = kwargs.get('retry_policy') or policies.RetryPolicy(**kwargs)
self.authentication_policy = kwargs.get('authentication_policy')

Просмотреть файл

@ -0,0 +1,895 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=protected-access, arguments-differ, signature-differs, broad-except
import copy
import calendar
import decimal
import functools
import sys
import logging
import base64
import re
import typing
import enum
import email.utils
from datetime import datetime, date, time, timedelta, timezone
from json import JSONEncoder
from typing_extensions import Self
import isodate
from azure.core.exceptions import DeserializationError
from azure.core import CaseInsensitiveEnumMeta
from azure.core.pipeline import PipelineResponse
from azure.core.serialization import _Null
if sys.version_info >= (3, 9):
from collections.abc import MutableMapping
else:
from typing import MutableMapping
_LOGGER = logging.getLogger(__name__)
__all__ = ["SdkJSONEncoder", "Model", "rest_field", "rest_discriminator"]
TZ_UTC = timezone.utc
_T = typing.TypeVar("_T")
def _timedelta_as_isostr(td: timedelta) -> str:
"""Converts a datetime.timedelta object into an ISO 8601 formatted string, e.g. 'P4DT12H30M05S'
Function adapted from the Tin Can Python project: https://github.com/RusticiSoftware/TinCanPython
:param timedelta td: The timedelta to convert
:rtype: str
:return: ISO8601 version of this timedelta
"""
# Split seconds to larger units
seconds = td.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
days, hours, minutes = list(map(int, (days, hours, minutes)))
seconds = round(seconds, 6)
# Build date
date_str = ""
if days:
date_str = "%sD" % days
if hours or minutes or seconds:
# Build time
time_str = "T"
# Hours
bigger_exists = date_str or hours
if bigger_exists:
time_str += "{:02}H".format(hours)
# Minutes
bigger_exists = bigger_exists or minutes
if bigger_exists:
time_str += "{:02}M".format(minutes)
# Seconds
try:
if seconds.is_integer():
seconds_string = "{:02}".format(int(seconds))
else:
# 9 chars long w/ leading 0, 6 digits after decimal
seconds_string = "%09.6f" % seconds
# Remove trailing zeros
seconds_string = seconds_string.rstrip("0")
except AttributeError: # int.is_integer() raises
seconds_string = "{:02}".format(seconds)
time_str += "{}S".format(seconds_string)
else:
time_str = ""
return "P" + date_str + time_str
def _serialize_bytes(o, format: typing.Optional[str] = None) -> str:
encoded = base64.b64encode(o).decode()
if format == "base64url":
return encoded.strip("=").replace("+", "-").replace("/", "_")
return encoded
def _serialize_datetime(o, format: typing.Optional[str] = None):
if hasattr(o, "year") and hasattr(o, "hour"):
if format == "rfc7231":
return email.utils.format_datetime(o, usegmt=True)
if format == "unix-timestamp":
return int(calendar.timegm(o.utctimetuple()))
# astimezone() fails for naive times in Python 2.7, so make make sure o is aware (tzinfo is set)
if not o.tzinfo:
iso_formatted = o.replace(tzinfo=TZ_UTC).isoformat()
else:
iso_formatted = o.astimezone(TZ_UTC).isoformat()
# Replace the trailing "+00:00" UTC offset with "Z" (RFC 3339: https://www.ietf.org/rfc/rfc3339.txt)
return iso_formatted.replace("+00:00", "Z")
# Next try datetime.date or datetime.time
return o.isoformat()
def _is_readonly(p):
try:
return p._visibility == ["read"] # pylint: disable=protected-access
except AttributeError:
return False
class SdkJSONEncoder(JSONEncoder):
"""A JSON encoder that's capable of serializing datetime objects and bytes."""
def __init__(self, *args, exclude_readonly: bool = False, format: typing.Optional[str] = None, **kwargs):
super().__init__(*args, **kwargs)
self.exclude_readonly = exclude_readonly
self.format = format
def default(self, o): # pylint: disable=too-many-return-statements
if _is_model(o):
if self.exclude_readonly:
readonly_props = [p._rest_name for p in o._attr_to_rest_field.values() if _is_readonly(p)]
return {k: v for k, v in o.items() if k not in readonly_props}
return dict(o.items())
try:
return super(SdkJSONEncoder, self).default(o)
except TypeError:
if isinstance(o, _Null):
return None
if isinstance(o, decimal.Decimal):
return float(o)
if isinstance(o, (bytes, bytearray)):
return _serialize_bytes(o, self.format)
try:
# First try datetime.datetime
return _serialize_datetime(o, self.format)
except AttributeError:
pass
# Last, try datetime.timedelta
try:
return _timedelta_as_isostr(o)
except AttributeError:
# This will be raised when it hits value.total_seconds in the method above
pass
return super(SdkJSONEncoder, self).default(o)
_VALID_DATE = re.compile(r"\d{4}[-]\d{2}[-]\d{2}T\d{2}:\d{2}:\d{2}" + r"\.?\d*Z?[-+]?[\d{2}]?:?[\d{2}]?")
_VALID_RFC7231 = re.compile(
r"(Mon|Tue|Wed|Thu|Fri|Sat|Sun),\s\d{2}\s"
r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s\d{4}\s\d{2}:\d{2}:\d{2}\sGMT"
)
def _deserialize_datetime(attr: typing.Union[str, datetime]) -> datetime:
"""Deserialize ISO-8601 formatted string into Datetime object.
:param str attr: response string to be deserialized.
:rtype: ~datetime.datetime
:returns: The datetime object from that input
"""
if isinstance(attr, datetime):
# i'm already deserialized
return attr
attr = attr.upper()
match = _VALID_DATE.match(attr)
if not match:
raise ValueError("Invalid datetime string: " + attr)
check_decimal = attr.split(".")
if len(check_decimal) > 1:
decimal_str = ""
for digit in check_decimal[1]:
if digit.isdigit():
decimal_str += digit
else:
break
if len(decimal_str) > 6:
attr = attr.replace(decimal_str, decimal_str[0:6])
date_obj = isodate.parse_datetime(attr)
test_utc = date_obj.utctimetuple()
if test_utc.tm_year > 9999 or test_utc.tm_year < 1:
raise OverflowError("Hit max or min date")
return date_obj
def _deserialize_datetime_rfc7231(attr: typing.Union[str, datetime]) -> datetime:
"""Deserialize RFC7231 formatted string into Datetime object.
:param str attr: response string to be deserialized.
:rtype: ~datetime.datetime
:returns: The datetime object from that input
"""
if isinstance(attr, datetime):
# i'm already deserialized
return attr
match = _VALID_RFC7231.match(attr)
if not match:
raise ValueError("Invalid datetime string: " + attr)
return email.utils.parsedate_to_datetime(attr)
def _deserialize_datetime_unix_timestamp(attr: typing.Union[float, datetime]) -> datetime:
"""Deserialize unix timestamp into Datetime object.
:param str attr: response string to be deserialized.
:rtype: ~datetime.datetime
:returns: The datetime object from that input
"""
if isinstance(attr, datetime):
# i'm already deserialized
return attr
return datetime.fromtimestamp(attr, TZ_UTC)
def _deserialize_date(attr: typing.Union[str, date]) -> date:
"""Deserialize ISO-8601 formatted string into Date object.
:param str attr: response string to be deserialized.
:rtype: date
:returns: The date object from that input
"""
# This must NOT use defaultmonth/defaultday. Using None ensure this raises an exception.
if isinstance(attr, date):
return attr
return isodate.parse_date(attr, defaultmonth=None, defaultday=None) # type: ignore
def _deserialize_time(attr: typing.Union[str, time]) -> time:
"""Deserialize ISO-8601 formatted string into time object.
:param str attr: response string to be deserialized.
:rtype: datetime.time
:returns: The time object from that input
"""
if isinstance(attr, time):
return attr
return isodate.parse_time(attr)
def _deserialize_bytes(attr):
if isinstance(attr, (bytes, bytearray)):
return attr
return bytes(base64.b64decode(attr))
def _deserialize_bytes_base64(attr):
if isinstance(attr, (bytes, bytearray)):
return attr
padding = "=" * (3 - (len(attr) + 3) % 4) # type: ignore
attr = attr + padding # type: ignore
encoded = attr.replace("-", "+").replace("_", "/")
return bytes(base64.b64decode(encoded))
def _deserialize_duration(attr):
if isinstance(attr, timedelta):
return attr
return isodate.parse_duration(attr)
def _deserialize_decimal(attr):
if isinstance(attr, decimal.Decimal):
return attr
return decimal.Decimal(str(attr))
_DESERIALIZE_MAPPING = {
datetime: _deserialize_datetime,
date: _deserialize_date,
time: _deserialize_time,
bytes: _deserialize_bytes,
bytearray: _deserialize_bytes,
timedelta: _deserialize_duration,
typing.Any: lambda x: x,
decimal.Decimal: _deserialize_decimal,
}
_DESERIALIZE_MAPPING_WITHFORMAT = {
"rfc3339": _deserialize_datetime,
"rfc7231": _deserialize_datetime_rfc7231,
"unix-timestamp": _deserialize_datetime_unix_timestamp,
"base64": _deserialize_bytes,
"base64url": _deserialize_bytes_base64,
}
def get_deserializer(annotation: typing.Any, rf: typing.Optional["_RestField"] = None):
if rf and rf._format:
return _DESERIALIZE_MAPPING_WITHFORMAT.get(rf._format)
return _DESERIALIZE_MAPPING.get(annotation)
def _get_type_alias_type(module_name: str, alias_name: str):
types = {
k: v
for k, v in sys.modules[module_name].__dict__.items()
if isinstance(v, typing._GenericAlias) # type: ignore
}
if alias_name not in types:
return alias_name
return types[alias_name]
def _get_model(module_name: str, model_name: str):
models = {k: v for k, v in sys.modules[module_name].__dict__.items() if isinstance(v, type)}
module_end = module_name.rsplit(".", 1)[0]
models.update({k: v for k, v in sys.modules[module_end].__dict__.items() if isinstance(v, type)})
if isinstance(model_name, str):
model_name = model_name.split(".")[-1]
if model_name not in models:
return model_name
return models[model_name]
_UNSET = object()
class _MyMutableMapping(MutableMapping[str, typing.Any]): # pylint: disable=unsubscriptable-object
def __init__(self, data: typing.Dict[str, typing.Any]) -> None:
self._data = data
def __contains__(self, key: typing.Any) -> bool:
return key in self._data
def __getitem__(self, key: str) -> typing.Any:
return self._data.__getitem__(key)
def __setitem__(self, key: str, value: typing.Any) -> None:
self._data.__setitem__(key, value)
def __delitem__(self, key: str) -> None:
self._data.__delitem__(key)
def __iter__(self) -> typing.Iterator[typing.Any]:
return self._data.__iter__()
def __len__(self) -> int:
return self._data.__len__()
def __ne__(self, other: typing.Any) -> bool:
return not self.__eq__(other)
def keys(self) -> typing.KeysView[str]:
return self._data.keys()
def values(self) -> typing.ValuesView[typing.Any]:
return self._data.values()
def items(self) -> typing.ItemsView[str, typing.Any]:
return self._data.items()
def get(self, key: str, default: typing.Any = None) -> typing.Any:
try:
return self[key]
except KeyError:
return default
@typing.overload
def pop(self, key: str) -> typing.Any: ...
@typing.overload
def pop(self, key: str, default: _T) -> _T: ...
@typing.overload
def pop(self, key: str, default: typing.Any) -> typing.Any: ...
def pop(self, key: str, default: typing.Any = _UNSET) -> typing.Any:
if default is _UNSET:
return self._data.pop(key)
return self._data.pop(key, default)
def popitem(self) -> typing.Tuple[str, typing.Any]:
return self._data.popitem()
def clear(self) -> None:
self._data.clear()
def update(self, *args: typing.Any, **kwargs: typing.Any) -> None:
self._data.update(*args, **kwargs)
@typing.overload
def setdefault(self, key: str, default: None = None) -> None: ...
@typing.overload
def setdefault(self, key: str, default: typing.Any) -> typing.Any: ...
def setdefault(self, key: str, default: typing.Any = _UNSET) -> typing.Any:
if default is _UNSET:
return self._data.setdefault(key)
return self._data.setdefault(key, default)
def __eq__(self, other: typing.Any) -> bool:
try:
other_model = self.__class__(other)
except Exception:
return False
return self._data == other_model._data
def __repr__(self) -> str:
return str(self._data)
def _is_model(obj: typing.Any) -> bool:
return getattr(obj, "_is_model", False)
def _serialize(o, format: typing.Optional[str] = None): # pylint: disable=too-many-return-statements
if isinstance(o, list):
return [_serialize(x, format) for x in o]
if isinstance(o, dict):
return {k: _serialize(v, format) for k, v in o.items()}
if isinstance(o, set):
return {_serialize(x, format) for x in o}
if isinstance(o, tuple):
return tuple(_serialize(x, format) for x in o)
if isinstance(o, (bytes, bytearray)):
return _serialize_bytes(o, format)
if isinstance(o, decimal.Decimal):
return float(o)
if isinstance(o, enum.Enum):
return o.value
try:
# First try datetime.datetime
return _serialize_datetime(o, format)
except AttributeError:
pass
# Last, try datetime.timedelta
try:
return _timedelta_as_isostr(o)
except AttributeError:
# This will be raised when it hits value.total_seconds in the method above
pass
return o
def _get_rest_field(
attr_to_rest_field: typing.Dict[str, "_RestField"], rest_name: str
) -> typing.Optional["_RestField"]:
try:
return next(rf for rf in attr_to_rest_field.values() if rf._rest_name == rest_name)
except StopIteration:
return None
def _create_value(rf: typing.Optional["_RestField"], value: typing.Any) -> typing.Any:
if not rf:
return _serialize(value, None)
if rf._is_multipart_file_input:
return value
if rf._is_model:
return _deserialize(rf._type, value)
return _serialize(value, rf._format)
class Model(_MyMutableMapping):
_is_model = True
# label whether current class's _attr_to_rest_field has been calculated
# could not see _attr_to_rest_field directly because subclass inherits it from parent class
_calculated: typing.Set[str] = set()
def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
class_name = self.__class__.__name__
if len(args) > 1:
raise TypeError(f"{class_name}.__init__() takes 2 positional arguments but {len(args) + 1} were given")
dict_to_pass = {
rest_field._rest_name: rest_field._default
for rest_field in self._attr_to_rest_field.values()
if rest_field._default is not _UNSET
}
if args:
dict_to_pass.update(
{k: _create_value(_get_rest_field(self._attr_to_rest_field, k), v) for k, v in args[0].items()}
)
else:
non_attr_kwargs = [k for k in kwargs if k not in self._attr_to_rest_field]
if non_attr_kwargs:
# actual type errors only throw the first wrong keyword arg they see, so following that.
raise TypeError(f"{class_name}.__init__() got an unexpected keyword argument '{non_attr_kwargs[0]}'")
dict_to_pass.update(
{
self._attr_to_rest_field[k]._rest_name: _create_value(self._attr_to_rest_field[k], v)
for k, v in kwargs.items()
if v is not None
}
)
super().__init__(dict_to_pass)
def copy(self) -> "Model":
return Model(self.__dict__)
def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> Self: # pylint: disable=unused-argument
if f"{cls.__module__}.{cls.__qualname__}" not in cls._calculated:
# we know the last nine classes in mro are going to be 'Model', '_MyMutableMapping', 'MutableMapping',
# 'Mapping', 'Collection', 'Sized', 'Iterable', 'Container' and 'object'
mros = cls.__mro__[:-9][::-1] # ignore parents, and reverse the mro order
attr_to_rest_field: typing.Dict[str, _RestField] = { # map attribute name to rest_field property
k: v for mro_class in mros for k, v in mro_class.__dict__.items() if k[0] != "_" and hasattr(v, "_type")
}
annotations = {
k: v
for mro_class in mros
if hasattr(mro_class, "__annotations__") # pylint: disable=no-member
for k, v in mro_class.__annotations__.items() # pylint: disable=no-member
}
for attr, rf in attr_to_rest_field.items():
rf._module = cls.__module__
if not rf._type:
rf._type = rf._get_deserialize_callable_from_annotation(annotations.get(attr, None))
if not rf._rest_name_input:
rf._rest_name_input = attr
cls._attr_to_rest_field: typing.Dict[str, _RestField] = dict(attr_to_rest_field.items())
cls._calculated.add(f"{cls.__module__}.{cls.__qualname__}")
return super().__new__(cls) # pylint: disable=no-value-for-parameter
def __init_subclass__(cls, discriminator: typing.Optional[str] = None) -> None:
for base in cls.__bases__:
if hasattr(base, "__mapping__"): # pylint: disable=no-member
base.__mapping__[discriminator or cls.__name__] = cls # type: ignore # pylint: disable=no-member
@classmethod
def _get_discriminator(cls, exist_discriminators) -> typing.Optional[str]:
for v in cls.__dict__.values():
if (
isinstance(v, _RestField) and v._is_discriminator and v._rest_name not in exist_discriminators
): # pylint: disable=protected-access
return v._rest_name # pylint: disable=protected-access
return None
@classmethod
def _deserialize(cls, data, exist_discriminators):
if not hasattr(cls, "__mapping__"): # pylint: disable=no-member
return cls(data)
discriminator = cls._get_discriminator(exist_discriminators)
exist_discriminators.append(discriminator)
mapped_cls = cls.__mapping__.get(data.get(discriminator), cls) # pyright: ignore # pylint: disable=no-member
if mapped_cls == cls:
return cls(data)
return mapped_cls._deserialize(data, exist_discriminators) # pylint: disable=protected-access
def as_dict(self, *, exclude_readonly: bool = False) -> typing.Dict[str, typing.Any]:
"""Return a dict that can be JSONify using json.dump.
:keyword bool exclude_readonly: Whether to remove the readonly properties.
:returns: A dict JSON compatible object
:rtype: dict
"""
result = {}
readonly_props = []
if exclude_readonly:
readonly_props = [p._rest_name for p in self._attr_to_rest_field.values() if _is_readonly(p)]
for k, v in self.items():
if exclude_readonly and k in readonly_props: # pyright: ignore
continue
is_multipart_file_input = False
try:
is_multipart_file_input = next(
rf for rf in self._attr_to_rest_field.values() if rf._rest_name == k
)._is_multipart_file_input
except StopIteration:
pass
result[k] = v if is_multipart_file_input else Model._as_dict_value(v, exclude_readonly=exclude_readonly)
return result
@staticmethod
def _as_dict_value(v: typing.Any, exclude_readonly: bool = False) -> typing.Any:
if v is None or isinstance(v, _Null):
return None
if isinstance(v, (list, tuple, set)):
return type(v)(Model._as_dict_value(x, exclude_readonly=exclude_readonly) for x in v)
if isinstance(v, dict):
return {dk: Model._as_dict_value(dv, exclude_readonly=exclude_readonly) for dk, dv in v.items()}
return v.as_dict(exclude_readonly=exclude_readonly) if hasattr(v, "as_dict") else v
def _deserialize_model(model_deserializer: typing.Optional[typing.Callable], obj):
if _is_model(obj):
return obj
return _deserialize(model_deserializer, obj)
def _deserialize_with_optional(if_obj_deserializer: typing.Optional[typing.Callable], obj):
if obj is None:
return obj
return _deserialize_with_callable(if_obj_deserializer, obj)
def _deserialize_with_union(deserializers, obj):
for deserializer in deserializers:
try:
return _deserialize(deserializer, obj)
except DeserializationError:
pass
raise DeserializationError()
def _deserialize_dict(
value_deserializer: typing.Optional[typing.Callable],
module: typing.Optional[str],
obj: typing.Dict[typing.Any, typing.Any],
):
if obj is None:
return obj
return {k: _deserialize(value_deserializer, v, module) for k, v in obj.items()}
def _deserialize_multiple_sequence(
entry_deserializers: typing.List[typing.Optional[typing.Callable]],
module: typing.Optional[str],
obj,
):
if obj is None:
return obj
return type(obj)(_deserialize(deserializer, entry, module) for entry, deserializer in zip(obj, entry_deserializers))
def _deserialize_sequence(
deserializer: typing.Optional[typing.Callable],
module: typing.Optional[str],
obj,
):
if obj is None:
return obj
return type(obj)(_deserialize(deserializer, entry, module) for entry in obj)
def _sorted_annotations(types: typing.List[typing.Any]) -> typing.List[typing.Any]:
return sorted(
types,
key=lambda x: hasattr(x, "__name__") and x.__name__.lower() in ("str", "float", "int", "bool"),
)
def _get_deserialize_callable_from_annotation( # pylint: disable=R0911, R0915, R0912
annotation: typing.Any,
module: typing.Optional[str],
rf: typing.Optional["_RestField"] = None,
) -> typing.Optional[typing.Callable[[typing.Any], typing.Any]]:
if not annotation or annotation in [int, float]:
return None
# is it a type alias?
if isinstance(annotation, str):
if module is not None:
annotation = _get_type_alias_type(module, annotation)
# is it a forward ref / in quotes?
if isinstance(annotation, (str, typing.ForwardRef)):
try:
model_name = annotation.__forward_arg__ # type: ignore
except AttributeError:
model_name = annotation
if module is not None:
annotation = _get_model(module, model_name)
try:
if module and _is_model(annotation):
if rf:
rf._is_model = True
return functools.partial(_deserialize_model, annotation) # pyright: ignore
except Exception:
pass
# is it a literal?
try:
if annotation.__origin__ is typing.Literal: # pyright: ignore
return None
except AttributeError:
pass
# is it optional?
try:
if any(a for a in annotation.__args__ if a == type(None)): # pyright: ignore
if len(annotation.__args__) <= 2: # pyright: ignore
if_obj_deserializer = _get_deserialize_callable_from_annotation(
next(a for a in annotation.__args__ if a != type(None)), module, rf # pyright: ignore
)
return functools.partial(_deserialize_with_optional, if_obj_deserializer)
# the type is Optional[Union[...]], we need to remove the None type from the Union
annotation_copy = copy.copy(annotation)
annotation_copy.__args__ = [a for a in annotation_copy.__args__ if a != type(None)] # pyright: ignore
return _get_deserialize_callable_from_annotation(annotation_copy, module, rf)
except AttributeError:
pass
# is it union?
if getattr(annotation, "__origin__", None) is typing.Union:
# initial ordering is we make `string` the last deserialization option, because it is often them most generic
deserializers = [
_get_deserialize_callable_from_annotation(arg, module, rf)
for arg in _sorted_annotations(annotation.__args__) # pyright: ignore
]
return functools.partial(_deserialize_with_union, deserializers)
try:
if annotation._name == "Dict": # pyright: ignore
value_deserializer = _get_deserialize_callable_from_annotation(
annotation.__args__[1], module, rf # pyright: ignore
)
return functools.partial(
_deserialize_dict,
value_deserializer,
module,
)
except (AttributeError, IndexError):
pass
try:
if annotation._name in ["List", "Set", "Tuple", "Sequence"]: # pyright: ignore
if len(annotation.__args__) > 1: # pyright: ignore
entry_deserializers = [
_get_deserialize_callable_from_annotation(dt, module, rf)
for dt in annotation.__args__ # pyright: ignore
]
return functools.partial(_deserialize_multiple_sequence, entry_deserializers, module)
deserializer = _get_deserialize_callable_from_annotation(
annotation.__args__[0], module, rf # pyright: ignore
)
return functools.partial(_deserialize_sequence, deserializer, module)
except (TypeError, IndexError, AttributeError, SyntaxError):
pass
def _deserialize_default(
deserializer,
obj,
):
if obj is None:
return obj
try:
return _deserialize_with_callable(deserializer, obj)
except Exception:
pass
return obj
if get_deserializer(annotation, rf):
return functools.partial(_deserialize_default, get_deserializer(annotation, rf))
return functools.partial(_deserialize_default, annotation)
def _deserialize_with_callable(
deserializer: typing.Optional[typing.Callable[[typing.Any], typing.Any]],
value: typing.Any,
):
try:
if value is None or isinstance(value, _Null):
return None
if deserializer is None:
return value
if isinstance(deserializer, CaseInsensitiveEnumMeta):
try:
return deserializer(value)
except ValueError:
# for unknown value, return raw value
return value
if isinstance(deserializer, type) and issubclass(deserializer, Model):
return deserializer._deserialize(value, [])
return typing.cast(typing.Callable[[typing.Any], typing.Any], deserializer)(value)
except Exception as e:
raise DeserializationError() from e
def _deserialize(
deserializer: typing.Any,
value: typing.Any,
module: typing.Optional[str] = None,
rf: typing.Optional["_RestField"] = None,
format: typing.Optional[str] = None,
) -> typing.Any:
if isinstance(value, PipelineResponse):
value = value.http_response.json()
if rf is None and format:
rf = _RestField(format=format)
if not isinstance(deserializer, functools.partial):
deserializer = _get_deserialize_callable_from_annotation(deserializer, module, rf)
return _deserialize_with_callable(deserializer, value)
class _RestField:
def __init__(
self,
*,
name: typing.Optional[str] = None,
type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin
is_discriminator: bool = False,
visibility: typing.Optional[typing.List[str]] = None,
default: typing.Any = _UNSET,
format: typing.Optional[str] = None,
is_multipart_file_input: bool = False,
):
self._type = type
self._rest_name_input = name
self._module: typing.Optional[str] = None
self._is_discriminator = is_discriminator
self._visibility = visibility
self._is_model = False
self._default = default
self._format = format
self._is_multipart_file_input = is_multipart_file_input
@property
def _class_type(self) -> typing.Any:
return getattr(self._type, "args", [None])[0]
@property
def _rest_name(self) -> str:
if self._rest_name_input is None:
raise ValueError("Rest name was never set")
return self._rest_name_input
def __get__(self, obj: Model, type=None): # pylint: disable=redefined-builtin
# by this point, type and rest_name will have a value bc we default
# them in __new__ of the Model class
item = obj.get(self._rest_name)
if item is None:
return item
if self._is_model:
return item
return _deserialize(self._type, _serialize(item, self._format), rf=self)
def __set__(self, obj: Model, value) -> None:
if value is None:
# we want to wipe out entries if users set attr to None
try:
obj.__delitem__(self._rest_name)
except KeyError:
pass
return
if self._is_model:
if not _is_model(value):
value = _deserialize(self._type, value)
obj.__setitem__(self._rest_name, value)
return
obj.__setitem__(self._rest_name, _serialize(value, self._format))
def _get_deserialize_callable_from_annotation(
self, annotation: typing.Any
) -> typing.Optional[typing.Callable[[typing.Any], typing.Any]]:
return _get_deserialize_callable_from_annotation(annotation, self._module, self)
def rest_field(
*,
name: typing.Optional[str] = None,
type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin
visibility: typing.Optional[typing.List[str]] = None,
default: typing.Any = _UNSET,
format: typing.Optional[str] = None,
is_multipart_file_input: bool = False,
) -> typing.Any:
return _RestField(
name=name,
type=type,
visibility=visibility,
default=default,
format=format,
is_multipart_file_input=is_multipart_file_input,
)
def rest_discriminator(
*,
name: typing.Optional[str] = None,
type: typing.Optional[typing.Callable] = None, # pylint: disable=redefined-builtin
visibility: typing.Optional[typing.List[str]] = None,
) -> typing.Any:
return _RestField(name=name, type=type, is_discriminator=True, visibility=visibility)

Просмотреть файл

@ -0,0 +1,20 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import List
__all__: List[str] = [] # Add all objects you want publicly available to users at this package level
def patch_sdk():
"""Do not remove from this file.
`patch_sdk` is a last resort escape hatch that allows you to do customizations
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,35 @@
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from abc import ABC
from typing import TYPE_CHECKING
from ._configuration import FirstClientConfiguration, SubNamespace.SecondClientConfiguration
if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
from azure.core import PipelineClient
from ._serialization import Deserializer, Serializer
class FirstClientMixinABC(
ABC
):
"""DO NOT use this class. It is for internal typing use only."""
_client: "PipelineClient"
_config: FirstClientConfiguration
_serialize: "Serializer"
_deserialize: "Deserializer"
class SubNamespace.SecondClientMixinABC(
ABC
):
"""DO NOT use this class. It is for internal typing use only."""
_client: "PipelineClient"
_config: SubNamespace.SecondClientConfiguration
_serialize: "Serializer"
_deserialize: "Deserializer"

Просмотреть файл

@ -0,0 +1,9 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
VERSION = "1.0.0b1"

Просмотреть файл

@ -0,0 +1,24 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from ._client import FirstClient
from ._client import SubNamespace.SecondClient
try:
from ._patch import __all__ as _patch_all
from ._patch import * # pylint: disable=unused-wildcard-import
except ImportError:
_patch_all = []
from ._patch import patch_sdk as _patch_sdk
__all__ = [
'FirstClient',
'SubNamespace.SecondClient',
]
__all__.extend([p for p in _patch_all if p not in __all__])
_patch_sdk()

Просмотреть файл

@ -0,0 +1,176 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from copy import deepcopy
from typing import Any, Awaitable, Union
from typing_extensions import Self
from azure.core import AsyncPipelineClient
from azure.core.pipeline import policies
from azure.core.rest import AsyncHttpResponse, HttpRequest
from .. import models as _models
from .._serialization import Deserializer, Serializer
from ._configuration import FirstClientConfiguration, SubNamespace.SecondClientConfiguration
from .operations import FirstClientOperationsMixin, Group3Operations, Group4Operations, Group5Operations, SubNamespace.SecondClientOperationsMixin
class FirstClient(FirstClientOperationsMixin): # pylint: disable=client-accepts-api-version-keyword
"""FirstClient.
:ivar group3: Group3Operations operations
:vartype group3: client.structure.service.aio.operations.Group3Operations
:ivar group4: Group4Operations operations
:vartype group4: client.structure.service.aio.operations.Group4Operations
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__( # pylint: disable=missing-client-constructor-parameter-credential
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
_endpoint = '{endpoint}/client/structure/{client}'
self._config = FirstClientConfiguration(endpoint=endpoint, client=client, **kwargs)
_policies = kwargs.pop('policies', None)
if _policies is None:
_policies = [policies.RequestIdPolicy(**kwargs),self._config.headers_policy,self._config.user_agent_policy,self._config.proxy_policy,policies.ContentDecodePolicy(**kwargs),self._config.redirect_policy,self._config.retry_policy,self._config.authentication_policy,self._config.custom_hook_policy,self._config.logging_policy,policies.DistributedTracingPolicy(**kwargs),policies.SensitiveHeaderCleanupPolicy(**kwargs) if self._config.redirect_policy else None,self._config.http_logging_policy]
self._client: AsyncPipelineClient = AsyncPipelineClient(base_url=_endpoint, policies=_policies, **kwargs)
self._serialize = Serializer()
self._deserialize = Deserializer()
self._serialize.client_side_validation = False
self.group3 = Group3Operations(
self._client, self._config, self._serialize, self._deserialize
)
self.group4 = Group4Operations(
self._client, self._config, self._serialize, self._deserialize
)
def send_request(
self,
request: HttpRequest, *, stream: bool = False,
**kwargs: Any
) -> Awaitable[AsyncHttpResponse]:
"""Runs the network request through the client's chained policies.
>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest("GET", "https://www.example.org/")
<HttpRequest [GET], url: 'https://www.example.org/'>
>>> response = await client.send_request(request)
<AsyncHttpResponse: 200 OK>
For more information on this code flow, see https://aka.ms/azsdk/dpcodegen/python/send_request
:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.AsyncHttpResponse
"""
request_copy = deepcopy(request)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
return self._client.send_request(request_copy, stream=stream, **kwargs) # type: ignore
async def close(self) -> None:
await self._client.close()
async def __aenter__(self) -> Self:
await self._client.__aenter__()
return self
async def __aexit__(self, *exc_details: Any) -> None:
await self._client.__aexit__(*exc_details)
class SubNamespace.SecondClient(SubNamespace.SecondClientOperationsMixin): # pylint: disable=client-accepts-api-version-keyword
"""SubNamespace.SecondClient.
:ivar group5: Group5Operations operations
:vartype group5: client.structure.service.aio.operations.Group5Operations
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__( # pylint: disable=missing-client-constructor-parameter-credential
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
_endpoint = '{endpoint}/client/structure/{client}'
self._config = SubNamespace.SecondClientConfiguration(endpoint=endpoint, client=client, **kwargs)
_policies = kwargs.pop('policies', None)
if _policies is None:
_policies = [policies.RequestIdPolicy(**kwargs),self._config.headers_policy,self._config.user_agent_policy,self._config.proxy_policy,policies.ContentDecodePolicy(**kwargs),self._config.redirect_policy,self._config.retry_policy,self._config.authentication_policy,self._config.custom_hook_policy,self._config.logging_policy,policies.DistributedTracingPolicy(**kwargs),policies.SensitiveHeaderCleanupPolicy(**kwargs) if self._config.redirect_policy else None,self._config.http_logging_policy]
self._client: AsyncPipelineClient = AsyncPipelineClient(base_url=_endpoint, policies=_policies, **kwargs)
self._serialize = Serializer()
self._deserialize = Deserializer()
self._serialize.client_side_validation = False
self.group5 = Group5Operations(
self._client, self._config, self._serialize, self._deserialize
)
def send_request(
self,
request: HttpRequest, *, stream: bool = False,
**kwargs: Any
) -> Awaitable[AsyncHttpResponse]:
"""Runs the network request through the client's chained policies.
>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest("GET", "https://www.example.org/")
<HttpRequest [GET], url: 'https://www.example.org/'>
>>> response = await client.send_request(request)
<AsyncHttpResponse: 200 OK>
For more information on this code flow, see https://aka.ms/azsdk/dpcodegen/python/send_request
:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.AsyncHttpResponse
"""
request_copy = deepcopy(request)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
return self._client.send_request(request_copy, stream=stream, **kwargs) # type: ignore
async def close(self) -> None:
await self._client.close()
async def __aenter__(self) -> Self:
await self._client.__aenter__()
return self
async def __aexit__(self, *exc_details: Any) -> None:
await self._client.__aexit__(*exc_details)

Просмотреть файл

@ -0,0 +1,106 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, Union
from azure.core.pipeline import policies
from .. import models as _models
from .._version import VERSION
class FirstClientConfiguration: # pylint: disable=too-many-instance-attributes
"""Configuration for FirstClient.
Note that all parameters used to create this instance are saved as instance
attributes.
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__(
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
if endpoint is None:
raise ValueError("Parameter 'endpoint' must not be None.")
if client is None:
raise ValueError("Parameter 'client' must not be None.")
self.endpoint = endpoint
self.client = client
kwargs.setdefault('sdk_moniker', 'client-structure-service/{}'.format(VERSION))
self.polling_interval = kwargs.get("polling_interval", 30)
self._configure(**kwargs)
def _configure(
self,
**kwargs: Any
) -> None:
self.user_agent_policy = kwargs.get('user_agent_policy') or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get('headers_policy') or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get('proxy_policy') or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get('logging_policy') or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get('http_logging_policy') or policies.HttpLoggingPolicy(**kwargs)
self.custom_hook_policy = kwargs.get('custom_hook_policy') or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get('redirect_policy') or policies.AsyncRedirectPolicy(**kwargs)
self.retry_policy = kwargs.get('retry_policy') or policies.AsyncRetryPolicy(**kwargs)
self.authentication_policy = kwargs.get('authentication_policy')
class SubNamespace.SecondClientConfiguration: # pylint: disable=too-many-instance-attributes,name-too-long
"""Configuration for SubNamespace.SecondClient.
Note that all parameters used to create this instance are saved as instance
attributes.
:param endpoint: Need to be set as 'http://localhost:3000' in client. Required.
:type endpoint: str
:param client: Need to be set as 'default', 'multi-client', 'renamed-operation',
'two-operation-group' in client. Known values are: "default", "multi-client",
"renamed-operation", and "two-operation-group". Required.
:type client: str or ~client.structure.service.models.ClientType
"""
def __init__(
self,
endpoint: str,
client: Union[str, _models.ClientType],
**kwargs: Any
) -> None:
if endpoint is None:
raise ValueError("Parameter 'endpoint' must not be None.")
if client is None:
raise ValueError("Parameter 'client' must not be None.")
self.endpoint = endpoint
self.client = client
kwargs.setdefault('sdk_moniker', 'client-structure-service/{}'.format(VERSION))
self.polling_interval = kwargs.get("polling_interval", 30)
self._configure(**kwargs)
def _configure(
self,
**kwargs: Any
) -> None:
self.user_agent_policy = kwargs.get('user_agent_policy') or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get('headers_policy') or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get('proxy_policy') or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get('logging_policy') or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get('http_logging_policy') or policies.HttpLoggingPolicy(**kwargs)
self.custom_hook_policy = kwargs.get('custom_hook_policy') or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get('redirect_policy') or policies.AsyncRedirectPolicy(**kwargs)
self.retry_policy = kwargs.get('retry_policy') or policies.AsyncRetryPolicy(**kwargs)
self.authentication_policy = kwargs.get('authentication_policy')

Просмотреть файл

@ -0,0 +1,20 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import List
__all__: List[str] = [] # Add all objects you want publicly available to users at this package level
def patch_sdk():
"""Do not remove from this file.
`patch_sdk` is a last resort escape hatch that allows you to do customizations
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""

Просмотреть файл

@ -0,0 +1,35 @@
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from abc import ABC
from typing import TYPE_CHECKING
from ._configuration import FirstClientConfiguration, SubNamespace.SecondClientConfiguration
if TYPE_CHECKING:
# pylint: disable=unused-import,ungrouped-imports
from azure.core import AsyncPipelineClient
from .._serialization import Deserializer, Serializer
class FirstClientMixinABC(
ABC
):
"""DO NOT use this class. It is for internal typing use only."""
_client: "AsyncPipelineClient"
_config: FirstClientConfiguration
_serialize: "Serializer"
_deserialize: "Deserializer"
class SubNamespace.SecondClientMixinABC(
ABC
):
"""DO NOT use this class. It is for internal typing use only."""
_client: "AsyncPipelineClient"
_config: SubNamespace.SecondClientConfiguration
_serialize: "Serializer"
_deserialize: "Deserializer"

Просмотреть файл

@ -0,0 +1,26 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from ._operations import Group3Operations
from ._operations import Group4Operations
from ._operations import FirstClientOperationsMixin
from ._operations import Group5Operations
from ._operations import SubNamespace.SecondClientOperationsMixin
from ._patch import __all__ as _patch_all
from ._patch import * # pylint: disable=unused-wildcard-import
from ._patch import patch_sdk as _patch_sdk
__all__ = [
'Group3Operations',
'Group4Operations',
'FirstClientOperationsMixin',
'Group5Operations',
'SubNamespace.SecondClientOperationsMixin',
]
__all__.extend([p for p in _patch_all if p not in __all__])
_patch_sdk()

Просмотреть файл

@ -0,0 +1,401 @@
# pylint: disable=too-many-lines,too-many-statements
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import sys
from typing import Any, Callable, Dict, Optional, Type, TypeVar
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, ResourceNotModifiedError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.rest import AsyncHttpResponse, HttpRequest
from azure.core.tracing.decorator_async import distributed_trace_async
from ...operations._operations import build_first_one_request, build_group3_three_request, build_group3_two_request, build_group4_four_request, build_group5_six_request, build_sub_namespace._second_five_request
from .._vendor import FirstClientMixinABC, SubNamespace.SecondClientMixinABC
if sys.version_info >= (3, 9):
from collections.abc import MutableMapping
else:
from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports
T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
class Group3Operations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~client.structure.service.aio.FirstClient`'s
:attr:`group3` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace_async
async def two( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""two.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group3_two_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
@distributed_trace_async
async def three( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""three.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group3_three_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class Group4Operations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~client.structure.service.aio.FirstClient`'s
:attr:`group4` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace_async
async def four( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""four.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group4_four_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class FirstClientOperationsMixin(
FirstClientMixinABC
):
@distributed_trace_async
async def one( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""one.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_first_one_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class Group5Operations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~client.structure.service.aio.SubNamespace.SecondClient`'s
:attr:`group5` attribute.
"""
def __init__(self, *args, **kwargs) -> None:
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace_async
async def six( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""six.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group5_six_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class SubNamespace.SecondClientOperationsMixin(
SubNamespace.SecondClientMixinABC
):
@distributed_trace_async
async def five( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""five.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_sub_namespace._second_five_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = await self._client._pipeline.run( # type: ignore # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore

Просмотреть файл

@ -0,0 +1,20 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import List
__all__: List[str] = [] # Add all objects you want publicly available to users at this package level
def patch_sdk():
"""Do not remove from this file.
`patch_sdk` is a last resort escape hatch that allows you to do customizations
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""

Просмотреть файл

@ -0,0 +1,18 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from ._enums import ClientType
from ._patch import __all__ as _patch_all
from ._patch import * # pylint: disable=unused-wildcard-import
from ._patch import patch_sdk as _patch_sdk
__all__ = [
"ClientType",
]
__all__.extend([p for p in _patch_all if p not in __all__])
_patch_sdk()

Просмотреть файл

@ -0,0 +1,19 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from enum import Enum
from azure.core import CaseInsensitiveEnumMeta
class ClientType(str, Enum, metaclass=CaseInsensitiveEnumMeta):
"""Type of ClientType."""
DEFAULT = "default"
MULTI_CLIENT = "multi-client"
RENAMED_OPERATION = "renamed-operation"
TWO_OPERATION_GROUP = "two-operation-group"

Просмотреть файл

@ -0,0 +1,20 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import List
__all__: List[str] = [] # Add all objects you want publicly available to users at this package level
def patch_sdk():
"""Do not remove from this file.
`patch_sdk` is a last resort escape hatch that allows you to do customizations
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""

Просмотреть файл

@ -0,0 +1,26 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from ._operations import Group3Operations
from ._operations import Group4Operations
from ._operations import FirstClientOperationsMixin
from ._operations import Group5Operations
from ._operations import SubNamespace.SecondClientOperationsMixin
from ._patch import __all__ as _patch_all
from ._patch import * # pylint: disable=unused-wildcard-import
from ._patch import patch_sdk as _patch_sdk
__all__ = [
'Group3Operations',
'Group4Operations',
'FirstClientOperationsMixin',
'Group5Operations',
'SubNamespace.SecondClientOperationsMixin',
]
__all__.extend([p for p in _patch_all if p not in __all__])
_patch_sdk()

Просмотреть файл

@ -0,0 +1,482 @@
# pylint: disable=too-many-lines,too-many-statements
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import sys
from typing import Any, Callable, Dict, Optional, Type, TypeVar
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, ResourceNotModifiedError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.rest import HttpRequest, HttpResponse
from azure.core.tracing.decorator import distributed_trace
from .._serialization import Serializer
from .._vendor import FirstClientMixinABC, SubNamespace.SecondClientMixinABC
if sys.version_info >= (3, 9):
from collections.abc import MutableMapping
else:
from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports
T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, HttpResponse], T, Dict[str, Any]], Any]]
_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
def build_group3_two_request(
**kwargs: Any
) -> HttpRequest:
# Construct URL
_url = "/two"
return HttpRequest(
method="POST",
url=_url,
**kwargs
)
def build_group3_three_request(
**kwargs: Any
) -> HttpRequest:
# Construct URL
_url = "/three"
return HttpRequest(
method="POST",
url=_url,
**kwargs
)
def build_group4_four_request(
**kwargs: Any
) -> HttpRequest:
# Construct URL
_url = "/four"
return HttpRequest(
method="POST",
url=_url,
**kwargs
)
def build_first_one_request(
**kwargs: Any
) -> HttpRequest:
# Construct URL
_url = "/one"
return HttpRequest(
method="POST",
url=_url,
**kwargs
)
def build_group5_six_request(
**kwargs: Any
) -> HttpRequest:
# Construct URL
_url = "/six"
return HttpRequest(
method="POST",
url=_url,
**kwargs
)
def build_sub_namespace._second_five_request(
**kwargs: Any
) -> HttpRequest:
# Construct URL
_url = "/five"
return HttpRequest(
method="POST",
url=_url,
**kwargs
)
class Group3Operations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~client.structure.service.FirstClient`'s
:attr:`group3` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace
def two( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""two.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group3_two_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
@distributed_trace
def three( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""three.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group3_three_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class Group4Operations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~client.structure.service.FirstClient`'s
:attr:`group4` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace
def four( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""four.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group4_four_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class FirstClientOperationsMixin(
FirstClientMixinABC
):
@distributed_trace
def one( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""one.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_first_one_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class Group5Operations:
"""
.. warning::
**DO NOT** instantiate this class directly.
Instead, you should access the following operations through
:class:`~client.structure.service.SubNamespace.SecondClient`'s
:attr:`group5` attribute.
"""
def __init__(self, *args, **kwargs):
input_args = list(args)
self._client = input_args.pop(0) if input_args else kwargs.pop("client")
self._config = input_args.pop(0) if input_args else kwargs.pop("config")
self._serialize = input_args.pop(0) if input_args else kwargs.pop("serializer")
self._deserialize = input_args.pop(0) if input_args else kwargs.pop("deserializer")
@distributed_trace
def six( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""six.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_group5_six_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore
class SubNamespace.SecondClientOperationsMixin(
SubNamespace.SecondClientMixinABC
):
@distributed_trace
def five( # pylint: disable=inconsistent-return-statements
self,
**kwargs: Any
) -> None:
"""five.
:return: None
:rtype: None
:raises ~azure.core.exceptions.HttpResponseError:
"""
error_map: MutableMapping[int, Type[HttpResponseError]] = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError, 304: ResourceNotModifiedError
}
error_map.update(kwargs.pop('error_map', {}) or {})
_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
cls: ClsType[None] = kwargs.pop(
'cls', None
)
_request = build_sub_namespace._second_five_request(
headers=_headers,
params=_params,
)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.endpoint", self._config.endpoint, 'str', skip_quote=True),
"client": self._serialize.url("self._config.client", self._config.client, 'str'),
}
_request.url = self._client.format_url(_request.url, **path_format_arguments)
_stream = False
pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access
_request,
stream=_stream,
**kwargs
)
response = pipeline_response.http_response
if response.status_code not in [204]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)
if cls:
return cls(pipeline_response, None, {}) # type: ignore

Просмотреть файл

@ -0,0 +1,20 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
"""Customize generated code here.
Follow our quickstart for examples: https://aka.ms/azsdk/python/dpcodegen/python/customize
"""
from typing import List
__all__: List[str] = [] # Add all objects you want publicly available to users at this package level
def patch_sdk():
"""Do not remove from this file.
`patch_sdk` is a last resort escape hatch that allows you to do customizations
you can't accomplish using the techniques described in
https://aka.ms/azsdk/python/dpcodegen/python/customize
"""

Просмотреть файл

@ -0,0 +1 @@
# Marker file for PEP 561.

Просмотреть файл

@ -0,0 +1,3 @@
-e ../../../tools/azure-sdk-tools
../../core/azure-core
aiohttp

Просмотреть файл

@ -0,0 +1,71 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
# coding: utf-8
import os
import re
from setuptools import setup, find_packages
PACKAGE_NAME = "client-structure-service"
PACKAGE_PPRINT_NAME = "Client Structure Service"
# a-b-c => a/b/c
package_folder_path = PACKAGE_NAME.replace("-", "/")
# Version extraction inspired from 'requests'
with open(os.path.join(package_folder_path, "_version.py"), "r") as fd:
version = re.search(r'^VERSION\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE).group(1)
if not version:
raise RuntimeError("Cannot find version information")
setup(
name=PACKAGE_NAME,
version=version,
description="Microsoft {} Client Library for Python".format(PACKAGE_PPRINT_NAME),
long_description=open("README.md", "r").read(),
long_description_content_type="text/markdown",
license="MIT License",
author="Microsoft Corporation",
author_email="azpysdkhelp@microsoft.com",
url="https://github.com/Azure/azure-sdk-for-python/tree/main/sdk",
keywords="azure, azure sdk",
classifiers=[
"Development Status :: 4 - Beta",
"Programming Language :: Python",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"License :: OSI Approved :: MIT License",
],
zip_safe=False,
packages=find_packages(
exclude=[
"tests",
# Exclude packages that will be covered by PEP420 or nspkg
"client",
"client.structure",
]
),
include_package_data=True,
package_data={
"client.structure.service": ["py.typed"],
},
install_requires=[
"isodate>=0.6.1",
"azure-core>=1.30.0",
"typing-extensions>=4.6.0",
],
python_requires=">=3.8",
)