Eric Bidelman 2015-07-10 10:01:36 -07:00
Parent 9327ed9ad1
Commit b494874ae1
9 changed files with 2760 additions and 3 deletions


@@ -29,7 +29,7 @@ import webapp2
 import xml.dom.minidom
 
 # Appengine imports.
-from google.appengine.api import files
+import cloudstorage
 from google.appengine.api import memcache
 from google.appengine.api import urlfetch
 from google.appengine.api import users
@@ -168,9 +168,9 @@ class YesterdayHandler(blobstore_handlers.BlobstoreDownloadHandler):
     if settings.PROD:
       try:
-        with files.open(BIGSTORE_BUCKET + filename, 'r') as unused_f:
+        with cloudstorage.open(BIGSTORE_BUCKET + filename, 'r') as unused_f:
           pass
-      except files.file.ExistenceError, e:
+      except Exception, e:
         self.response.write(e)
         return

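The hunk above only verifies that the object exists before serving it. As a side note (not part of the commit), a fuller read path using the library's own error type could look like the following sketch, assuming BIGSTORE_BUCKET + filename forms a '/bucket/object' path:

import cloudstorage

def read_from_gcs(filename):
  # Sketch only: cloudstorage.NotFoundError is re-exported from
  # cloudstorage/errors.py via 'from errors import *' in __init__.py.
  try:
    with cloudstorage.open(BIGSTORE_BUCKET + filename, 'r') as f:
      return f.read()
  except cloudstorage.NotFoundError:
    return None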
cloudstorage/__init__.py Normal file (29 lines added)

@@ -0,0 +1,29 @@
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Client Library for Google Cloud Storage."""
from .api_utils import RetryParams
from .api_utils import set_default_retry_params
from cloudstorage_api import *
from .common import CSFileStat
from .common import GCSFileStat
from .common import validate_bucket_name
from .common import validate_bucket_path
from .common import validate_file_path
from errors import *
from storage_api import *

cloudstorage/api_utils.py Normal file (356 lines added)

@@ -0,0 +1,356 @@
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Util functions and classes for cloudstorage_api."""
__all__ = ['set_default_retry_params',
'RetryParams',
]
import copy
import httplib
import logging
import math
import os
import threading
import time
import urllib
try:
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
from google.appengine.api import urlfetch_errors
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import ndb
from google.appengine.ext.ndb import eventloop
from google.appengine.ext.ndb import tasklets
from google.appengine.ext.ndb import utils
from google.appengine import runtime
from google.appengine.runtime import apiproxy_errors
except ImportError:
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
from google.appengine.api import urlfetch_errors
from google.appengine.datastore import datastore_rpc
from google.appengine import runtime
from google.appengine.runtime import apiproxy_errors
from google.appengine.ext import ndb
from google.appengine.ext.ndb import eventloop
from google.appengine.ext.ndb import tasklets
from google.appengine.ext.ndb import utils
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
urlfetch_errors.InternalTransientError,
apiproxy_errors.Error,
app_identity.InternalError,
app_identity.BackendDeadlineExceeded)
_thread_local_settings = threading.local()
_thread_local_settings.default_retry_params = None
def set_default_retry_params(retry_params):
"""Set a default RetryParams for current thread current request."""
_thread_local_settings.default_retry_params = copy.copy(retry_params)
def _get_default_retry_params():
"""Get default RetryParams for current request and current thread.
Returns:
A new instance of the default RetryParams.
"""
default = getattr(_thread_local_settings, 'default_retry_params', None)
if default is None or not default.belong_to_current_request():
return RetryParams()
else:
return copy.copy(default)
def _quote_filename(filename):
"""Quotes filename to use as a valid URI path.
Args:
filename: user provided filename. /bucket/filename.
Returns:
The filename properly quoted to use as URI's path component.
"""
return urllib.quote(filename)
def _unquote_filename(filename):
"""Unquotes a valid URI path back to its filename.
This is the opposite of _quote_filename.
Args:
filename: a quoted filename. /bucket/some%20filename.
Returns:
The filename unquoted.
"""
return urllib.unquote(filename)
def _should_retry(resp):
"""Given a urlfetch response, decide whether to retry that request."""
return (resp.status_code == httplib.REQUEST_TIMEOUT or
(resp.status_code >= 500 and
resp.status_code < 600))
class _RetryWrapper(object):
"""A wrapper that wraps retry logic around any tasklet."""
def __init__(self,
retry_params,
retriable_exceptions=_RETRIABLE_EXCEPTIONS,
should_retry=lambda r: False):
"""Init.
Args:
retry_params: a RetryParams instance.
retriable_exceptions: a list of exception classes that are retriable.
should_retry: a function that takes a result from the tasklet and returns
a boolean. True if the result should be retried.
"""
self.retry_params = retry_params
self.retriable_exceptions = retriable_exceptions
self.should_retry = should_retry
@ndb.tasklet
def run(self, tasklet, **kwds):
"""Run a tasklet with retry.
The retry should be transparent to the caller: if no results
are successful, the exception or result from the last retry is returned
to the caller.
Args:
tasklet: the tasklet to run.
**kwds: keywords arguments to run the tasklet.
Raises:
The exception from running the tasklet.
Returns:
The result from running the tasklet.
"""
start_time = time.time()
n = 1
while True:
e = None
result = None
got_result = False
try:
result = yield tasklet(**kwds)
got_result = True
if not self.should_retry(result):
raise ndb.Return(result)
except runtime.DeadlineExceededError:
logging.debug(
'Tasklet has exceeded request deadline after %s seconds total',
time.time() - start_time)
raise
except self.retriable_exceptions, e:
pass
if n == 1:
logging.debug('Tasklet is %r', tasklet)
delay = self.retry_params.delay(n, start_time)
if delay <= 0:
logging.debug(
'Tasklet failed after %s attempts and %s seconds in total',
n, time.time() - start_time)
if got_result:
raise ndb.Return(result)
elif e is not None:
raise e
else:
assert False, 'Should never reach here.'
if got_result:
logging.debug(
'Got result %r from tasklet.', result)
else:
logging.debug(
'Got exception "%r" from tasklet.', e)
logging.debug('Retry in %s seconds.', delay)
n += 1
yield tasklets.sleep(delay)
class RetryParams(object):
"""Retry configuration parameters."""
_DEFAULT_USER_AGENT = 'App Engine Python GCS Client'
@datastore_rpc._positional(1)
def __init__(self,
backoff_factor=2.0,
initial_delay=0.1,
max_delay=10.0,
min_retries=3,
max_retries=6,
max_retry_period=30.0,
urlfetch_timeout=None,
save_access_token=False,
_user_agent=None):
"""Init.
This object is unique per request per thread.
The library will retry according to these settings when the App Engine server
can't call urlfetch, urlfetch times out, or urlfetch returns a 408 or
500-600 response.
Args:
backoff_factor: exponential backoff multiplier.
initial_delay: seconds to delay for the first retry.
max_delay: max seconds to delay for every retry.
min_retries: min number of times to retry. This value is automatically
capped by max_retries.
max_retries: max number of times to retry. Set this to 0 for no retry.
max_retry_period: max total seconds spent on retry. Retry stops when
this period passed AND min_retries has been attempted.
urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
in which case the value will be chosen by urlfetch module.
save_access_token: persist access token to datastore to avoid
excessive usage of GetAccessToken API. Usually the token is cached
in process and in memcache. In some cases, memcache isn't very
reliable.
_user_agent: The user agent string that you want to use in your requests.
"""
self.backoff_factor = self._check('backoff_factor', backoff_factor)
self.initial_delay = self._check('initial_delay', initial_delay)
self.max_delay = self._check('max_delay', max_delay)
self.max_retry_period = self._check('max_retry_period', max_retry_period)
self.max_retries = self._check('max_retries', max_retries, True, int)
self.min_retries = self._check('min_retries', min_retries, True, int)
if self.min_retries > self.max_retries:
self.min_retries = self.max_retries
self.urlfetch_timeout = None
if urlfetch_timeout is not None:
self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
self.save_access_token = self._check('save_access_token', save_access_token,
True, bool)
self._user_agent = _user_agent or self._DEFAULT_USER_AGENT
self._request_id = os.getenv('REQUEST_LOG_ID')
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
@classmethod
def _check(cls, name, val, can_be_zero=False, val_type=float):
"""Check init arguments.
Args:
name: name of the argument, for logging purposes.
val: value. Value has to be a non-negative number.
can_be_zero: whether value can be zero.
val_type: Python type of the value.
Returns:
The value.
Raises:
ValueError: when invalid value is passed in.
TypeError: when invalid value type is passed in.
"""
valid_types = [val_type]
if val_type is float:
valid_types.append(int)
if type(val) not in valid_types:
raise TypeError(
'Expect type %s for parameter %s' % (val_type.__name__, name))
if val < 0:
raise ValueError(
'Value for parameter %s has to be greater than 0' % name)
if not can_be_zero and val == 0:
raise ValueError(
'Value for parameter %s can not be 0' % name)
return val
def belong_to_current_request(self):
return os.getenv('REQUEST_LOG_ID') == self._request_id
def delay(self, n, start_time):
"""Calculate delay before the next retry.
Args:
n: the number of current attempt. The first attempt should be 1.
start_time: the time when retry started in unix time.
Returns:
Number of seconds to wait before next retry. -1 if retry should give up.
"""
if (n > self.max_retries or
(n > self.min_retries and
time.time() - start_time > self.max_retry_period)):
return -1
return min(
math.pow(self.backoff_factor, n-1) * self.initial_delay,
self.max_delay)
def _run_until_rpc():
"""Eagerly evaluate tasklets until it is blocking on some RPC.
Usually ndb eventloop el isn't run until some code calls future.get_result().
When an async tasklet is called, the tasklet wrapper evaluates the tasklet
code into a generator, enqueues a callback _help_tasklet_along onto
the el.current queue, and returns a future.
_help_tasklet_along, when called by the el, will
get one yielded value from the generator. If the value is another future,
set up a callback _on_future_complete to invoke _help_tasklet_along
when the dependent future fulfills. If the value is an RPC, set up a
callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.
Thus _help_tasklet_along drills down
the chain of futures until some future is blocked by an RPC. El runs
all callbacks and constantly checks pending RPC status.
"""
el = eventloop.get_event_loop()
while el.current:
el.run0()
def _eager_tasklet(tasklet):
"""Decorator to turn tasklet to run eagerly."""
@utils.wrapping(tasklet)
def eager_wrapper(*args, **kwds):
fut = tasklet(*args, **kwds)
_run_until_rpc()
return fut
return eager_wrapper
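As a usage note (not part of the commit), the retry behaviour described in the RetryParams docstring above is typically configured once per request; the parameter values below are illustrative only:

import cloudstorage as gcs

# Illustrative values; see the RetryParams docstring above for semantics.
my_retry_params = gcs.RetryParams(initial_delay=0.2,
                                  backoff_factor=2.0,
                                  max_delay=5.0,
                                  max_retry_period=15,
                                  urlfetch_timeout=10)
gcs.set_default_retry_params(my_retry_params)  # scoped to this thread/request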

cloudstorage/cloudstorage_api.py Normal file (583 lines added)

@@ -0,0 +1,583 @@
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""File Interface for Google Cloud Storage."""
from __future__ import with_statement
__all__ = ['copy2',
'delete',
'listbucket',
'open',
'stat',
'compose',
]
import logging
import StringIO
import urllib
import os
import itertools
import types
import xml.etree.cElementTree as ET
from . import api_utils
from . import common
from . import errors
from . import storage_api
def open(filename,
mode='r',
content_type=None,
options=None,
read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE,
retry_params=None,
_account_id=None,
offset=0):
"""Opens a Google Cloud Storage file and returns it as a File-like object.
Args:
filename: A Google Cloud Storage filename of form '/bucket/filename'.
mode: 'r' for reading mode. 'w' for writing mode.
In reading mode, the file must exist. In writing mode, a file will
be created or overwritten.
content_type: The MIME type of the file. str. Only valid in writing mode.
options: A str->basestring dict to specify additional headers to pass to
GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
Supported options are x-goog-acl, x-goog-meta-, cache-control,
content-disposition, and content-encoding.
Only valid in writing mode.
See https://developers.google.com/storage/docs/reference-headers
for details.
read_buffer_size: The buffer size for read. Read keeps a buffer
and prefetches another one. To minimize blocking for large files,
always read by buffer size. To minimize number of RPC requests for
small files, set a large buffer size. Max is 30MB.
retry_params: An instance of api_utils.RetryParams for subsequent calls
to GCS from this file handle. If None, the default one is used.
_account_id: Internal-use only.
offset: Number of bytes to skip at the start of the file. If None, 0 is
used.
Returns:
A reading or writing buffer that supports File-like interface. Buffer
must be closed after operations are done.
Raises:
errors.AuthorizationError: if authorization failed.
errors.NotFoundError: if an object that's expected to exist doesn't.
ValueError: invalid open mode or if content_type or options are specified
in reading mode.
"""
common.validate_file_path(filename)
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
filename = api_utils._quote_filename(filename)
if mode == 'w':
common.validate_options(options)
return storage_api.StreamingBuffer(api, filename, content_type, options)
elif mode == 'r':
if content_type or options:
raise ValueError('Options and content_type can only be specified '
'for writing mode.')
return storage_api.ReadBuffer(api,
filename,
buffer_size=read_buffer_size,
offset=offset)
else:
raise ValueError('Invalid mode %s.' % mode)
def delete(filename, retry_params=None, _account_id=None):
"""Delete a Google Cloud Storage file.
Args:
filename: A Google Cloud Storage filename of form '/bucket/filename'.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
_account_id: Internal-use only.
Raises:
errors.NotFoundError: if the file doesn't exist prior to deletion.
"""
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
common.validate_file_path(filename)
filename = api_utils._quote_filename(filename)
status, resp_headers, content = api.delete_object(filename)
errors.check_status(status, [204], filename, resp_headers=resp_headers,
body=content)
def stat(filename, retry_params=None, _account_id=None):
"""Get GCSFileStat of a Google Cloud storage file.
Args:
filename: A Google Cloud Storage filename of form '/bucket/filename'.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
_account_id: Internal-use only.
Returns:
a GCSFileStat object containing info about this file.
Raises:
errors.AuthorizationError: if authorization failed.
errors.NotFoundError: if an object that's expected to exist doesn't.
"""
common.validate_file_path(filename)
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
status, headers, content = api.head_object(
api_utils._quote_filename(filename))
errors.check_status(status, [200], filename, resp_headers=headers,
body=content)
file_stat = common.GCSFileStat(
filename=filename,
st_size=common.get_stored_content_length(headers),
st_ctime=common.http_time_to_posix(headers.get('last-modified')),
etag=headers.get('etag'),
content_type=headers.get('content-type'),
metadata=common.get_metadata(headers))
return file_stat
def copy2(src, dst, metadata=None, retry_params=None):
"""Copy the file content from src to dst.
Args:
src: /bucket/filename
dst: /bucket/filename
metadata: a dict of metadata for this copy. If None, old metadata is copied.
For example, {'x-goog-meta-foo': 'bar'}.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
Raises:
errors.AuthorizationError: if authorization failed.
errors.NotFoundError: if an object that's expected to exist doesn't.
"""
common.validate_file_path(src)
common.validate_file_path(dst)
if metadata is None:
metadata = {}
copy_meta = 'COPY'
else:
copy_meta = 'REPLACE'
metadata.update({'x-goog-copy-source': src,
'x-goog-metadata-directive': copy_meta})
api = storage_api._get_storage_api(retry_params=retry_params)
status, resp_headers, content = api.put_object(
api_utils._quote_filename(dst), headers=metadata)
errors.check_status(status, [200], src, metadata, resp_headers, body=content)
def listbucket(path_prefix, marker=None, prefix=None, max_keys=None,
delimiter=None, retry_params=None, _account_id=None):
"""Returns a GCSFileStat iterator over a bucket.
Optional arguments can limit the result to a subset of files under bucket.
This function has two modes:
1. List bucket mode: Lists all files in the bucket without any concept of
hierarchy. GCS doesn't have real directory hierarchies.
2. Directory emulation mode: If you specify the 'delimiter' argument,
it is used as a path separator to emulate a hierarchy of directories.
In this mode, the "path_prefix" argument should end in the delimiter
specified (thus designates a logical directory). The logical directory's
contents, both files and subdirectories, are listed. The names of
subdirectories returned will end with the delimiter. So listbucket
can be called with the subdirectory name to list the subdirectory's
contents.
Args:
path_prefix: A Google Cloud Storage path of format "/bucket" or
"/bucket/prefix". Only objects whose fullpath starts with the
path_prefix will be returned.
marker: Another path prefix. Only objects whose fullpath starts
lexicographically after marker will be returned (exclusive).
prefix: Deprecated. Use path_prefix.
max_keys: The limit on the number of objects to return. int.
For best performance, specify max_keys only if you know how many objects
you want. Otherwise, this method requests large batches and handles
pagination for you.
delimiter: Use to turn on directory mode. str of one or multiple chars
that your bucket uses as its directory separator.
retry_params: An api_utils.RetryParams for this call to GCS. If None,
the default one is used.
_account_id: Internal-use only.
Examples:
For files "/bucket/a",
"/bucket/bar/1"
"/bucket/foo",
"/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1",
Regular mode:
listbucket("/bucket/f", marker="/bucket/foo/1")
will match "/bucket/foo/2/1", "/bucket/foo/3/1".
Directory mode:
listbucket("/bucket/", delimiter="/")
will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/".
listbucket("/bucket/foo/", delimiter="/")
will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/"
Returns:
Regular mode:
A GCSFileStat iterator over matched files ordered by filename.
The iterator returns GCSFileStat objects. filename, etag, st_size,
st_ctime, and is_dir are set.
Directory emulation mode:
A GCSFileStat iterator over matched files and directories ordered by
name. The iterator returns GCSFileStat objects. For directories,
only the filename and is_dir fields are set.
The last name yielded can be used as next call's marker.
"""
if prefix:
common.validate_bucket_path(path_prefix)
bucket = path_prefix
else:
bucket, prefix = common._process_path_prefix(path_prefix)
if marker and marker.startswith(bucket):
marker = marker[len(bucket) + 1:]
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
options = {}
if marker:
options['marker'] = marker
if max_keys:
options['max-keys'] = max_keys
if prefix:
options['prefix'] = prefix
if delimiter:
options['delimiter'] = delimiter
return _Bucket(api, bucket, options)
def compose(list_of_files, destination_file, files_metadata=None,
content_type=None, retry_params=None, _account_id=None):
"""Runs the GCS Compose on the given files.
Merges between 2 and 32 files into one file. Composite files may even
be built from other existing composites, provided that the total
component count does not exceed 1024. See here for details:
https://cloud.google.com/storage/docs/composite-objects
Args:
list_of_files: List of file name strings with no leading slashes or bucket.
destination_file: Path to the output file. Must have the bucket in the path.
files_metadata: Optional, file metadata, order must match list_of_files,
see link for available options:
https://cloud.google.com/storage/docs/composite-objects#_Xml
content_type: Optional, used to specify content-header of the output file.
retry_params: Optional, an api_utils.RetryParams for this call to GCS.
If None, the default one is used.
_account_id: Internal-use only.
Raises:
ValueError: If the number of files is outside the range of 2-32.
"""
api = storage_api._get_storage_api(retry_params=retry_params,
account_id=_account_id)
if os.getenv('SERVER_SOFTWARE').startswith('Dev'):
def _temp_func(file_list, destination_file, content_type):
bucket = '/' + destination_file.split('/')[1] + '/'
with open(destination_file, 'w', content_type=content_type) as gcs_merge:
for source_file in file_list:
with open(bucket + source_file['Name'], 'r') as gcs_source:
gcs_merge.write(gcs_source.read())
compose_object = _temp_func
else:
compose_object = api.compose_object
file_list, _ = _validate_compose_list(destination_file,
list_of_files,
files_metadata, 32)
compose_object(file_list, destination_file, content_type)
def _file_exists(destination):
"""Checks if a file exists.
Tries to open the file.
If it succeeds returns True otherwise False.
Args:
destination: Full path to the file (ie. /bucket/object) with leading slash.
Returns:
True if the file is accessible otherwise False.
"""
try:
with open(destination, "r"):
return True
except errors.NotFoundError:
return False
def _validate_compose_list(destination_file, file_list,
files_metadata=None, number_of_files=32):
"""Validates the file_list and merges the file_list, files_metadata.
Args:
destination: Path to the file (ie. /destination_bucket/destination_file).
file_list: List of files to compose, see compose for details.
files_metadata: Meta details for each file in the file_list.
number_of_files: Maximum number of files allowed in the list.
Returns:
A tuple (list_of_files, bucket):
list_of_files: Ready to use dict version of the list.
bucket: bucket name extracted from the file paths.
"""
common.validate_file_path(destination_file)
bucket = destination_file[0:(destination_file.index('/', 1) + 1)]
try:
if isinstance(file_list, types.StringTypes):
raise TypeError
list_len = len(file_list)
except TypeError:
raise TypeError('file_list must be a list')
if list_len > number_of_files:
raise ValueError(
'Compose attempted to create composite with too many'
'(%i) components; limit is (%i).' % (list_len, number_of_files))
if list_len <= 1:
raise ValueError('Compose operation requires at'
' least two components; %i provided.' % list_len)
if files_metadata is None:
files_metadata = []
elif len(files_metadata) > list_len:
raise ValueError('files_metadata contains more entries(%i)'
' than file_list(%i)'
% (len(files_metadata), list_len))
list_of_files = []
for source_file, meta_data in itertools.izip_longest(file_list,
files_metadata):
if not isinstance(source_file, str):
raise TypeError('Each item of file_list must be a string')
if source_file.startswith('/'):
logging.warn('Detected a "/" at the start of the file, '
'Unless the file name contains a "/" it '
' may cause files to be misread')
if source_file.startswith(bucket):
logging.warn('Detected bucket name at the start of the file, '
'must not specify the bucket when listing file_names.'
' May cause files to be misread')
common.validate_file_path(bucket + source_file)
list_entry = {}
if meta_data is not None:
list_entry.update(meta_data)
list_entry['Name'] = source_file
list_of_files.append(list_entry)
return list_of_files, bucket
class _Bucket(object):
"""A wrapper for a GCS bucket as the return value of listbucket."""
def __init__(self, api, path, options):
"""Initialize.
Args:
api: storage_api instance.
path: bucket path of form '/bucket'.
options: a dict of listbucket options. Please see listbucket doc.
"""
self._init(api, path, options)
def _init(self, api, path, options):
self._api = api
self._path = path
self._options = options.copy()
self._get_bucket_fut = self._api.get_bucket_async(
self._path + '?' + urllib.urlencode(self._options))
self._last_yield = None
self._new_max_keys = self._options.get('max-keys')
def __getstate__(self):
options = self._options
if self._last_yield:
options['marker'] = self._last_yield.filename[len(self._path) + 1:]
if self._new_max_keys is not None:
options['max-keys'] = self._new_max_keys
return {'api': self._api,
'path': self._path,
'options': options}
def __setstate__(self, state):
self._init(state['api'], state['path'], state['options'])
def __iter__(self):
"""Iter over the bucket.
Yields:
GCSFileStat: a GCSFileStat for an object in the bucket.
They are ordered by GCSFileStat.filename.
"""
total = 0
max_keys = self._options.get('max-keys')
while self._get_bucket_fut:
status, resp_headers, content = self._get_bucket_fut.get_result()
errors.check_status(status, [200], self._path, resp_headers=resp_headers,
body=content, extras=self._options)
if self._should_get_another_batch(content):
self._get_bucket_fut = self._api.get_bucket_async(
self._path + '?' + urllib.urlencode(self._options))
else:
self._get_bucket_fut = None
root = ET.fromstring(content)
dirs = self._next_dir_gen(root)
files = self._next_file_gen(root)
next_file = files.next()
next_dir = dirs.next()
while ((max_keys is None or total < max_keys) and
not (next_file is None and next_dir is None)):
total += 1
if next_file is None:
self._last_yield = next_dir
next_dir = dirs.next()
elif next_dir is None:
self._last_yield = next_file
next_file = files.next()
elif next_dir < next_file:
self._last_yield = next_dir
next_dir = dirs.next()
elif next_file < next_dir:
self._last_yield = next_file
next_file = files.next()
else:
logging.error(
'Should never reach. next file is %r. next dir is %r.',
next_file, next_dir)
if self._new_max_keys:
self._new_max_keys -= 1
yield self._last_yield
def _next_file_gen(self, root):
"""Generator for next file element in the document.
Args:
root: root element of the XML tree.
Yields:
GCSFileStat for the next file.
"""
for e in root.getiterator(common._T_CONTENTS):
st_ctime, size, etag, key = None, None, None, None
for child in e.getiterator('*'):
if child.tag == common._T_LAST_MODIFIED:
st_ctime = common.dt_str_to_posix(child.text)
elif child.tag == common._T_ETAG:
etag = child.text
elif child.tag == common._T_SIZE:
size = child.text
elif child.tag == common._T_KEY:
key = child.text
yield common.GCSFileStat(self._path + '/' + key,
size, etag, st_ctime)
e.clear()
yield None
def _next_dir_gen(self, root):
"""Generator for next directory element in the document.
Args:
root: root element in the XML tree.
Yields:
GCSFileStat for the next directory.
"""
for e in root.getiterator(common._T_COMMON_PREFIXES):
yield common.GCSFileStat(
self._path + '/' + e.find(common._T_PREFIX).text,
st_size=None, etag=None, st_ctime=None, is_dir=True)
e.clear()
yield None
def _should_get_another_batch(self, content):
"""Whether to issue another GET bucket call.
Args:
content: response XML.
Returns:
True if should, also update self._options for the next request.
False otherwise.
"""
if ('max-keys' in self._options and
self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT):
return False
elements = self._find_elements(
content, set([common._T_IS_TRUNCATED,
common._T_NEXT_MARKER]))
if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true':
return False
next_marker = elements.get(common._T_NEXT_MARKER)
if next_marker is None:
self._options.pop('marker', None)
return False
self._options['marker'] = next_marker
return True
def _find_elements(self, result, elements):
"""Find interesting elements from XML.
This function tries to only look for specified elements
without parsing the entire XML. The specified elements are best
located near the beginning.
Args:
result: response XML.
elements: a set of interesting element tags.
Returns:
A dict from element tag to element value.
"""
element_mapping = {}
result = StringIO.StringIO(result)
for _, e in ET.iterparse(result, events=('end',)):
if not elements:
break
if e.tag in elements:
element_mapping[e.tag] = e.text
elements.remove(e.tag)
return element_mapping
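For orientation (not part of the commit), the public functions defined above compose roughly as in this sketch; '/my-bucket' is a placeholder bucket name:

import cloudstorage as gcs

filename = '/my-bucket/demo.txt'
with gcs.open(filename, 'w', content_type='text/plain',
              options={'x-goog-meta-owner': 'dashboard'}) as f:
  f.write('hello world')

info = gcs.stat(filename)             # GCSFileStat: st_size, etag, st_ctime, ...
for entry in gcs.listbucket('/my-bucket/', delimiter='/'):
  print entry.filename, entry.is_dir  # subdirectory names end with the delimiter
gcs.delete(filename)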

cloudstorage/common.py Normal file (429 lines added)

@@ -0,0 +1,429 @@
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Helpers shared by cloudstorage_stub and cloudstorage_api."""
__all__ = ['CS_XML_NS',
'CSFileStat',
'dt_str_to_posix',
'local_api_url',
'LOCAL_GCS_ENDPOINT',
'local_run',
'get_access_token',
'get_stored_content_length',
'get_metadata',
'GCSFileStat',
'http_time_to_posix',
'memory_usage',
'posix_time_to_http',
'posix_to_dt_str',
'set_access_token',
'validate_options',
'validate_bucket_name',
'validate_bucket_path',
'validate_file_path',
]
import calendar
import datetime
from email import utils as email_utils
import logging
import os
import re
try:
from google.appengine.api import runtime
except ImportError:
from google.appengine.api import runtime
_GCS_BUCKET_REGEX_BASE = r'[a-z0-9\.\-_]{3,63}'
_GCS_BUCKET_REGEX = re.compile(_GCS_BUCKET_REGEX_BASE + r'$')
_GCS_BUCKET_PATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'$')
_GCS_PATH_PREFIX_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'.*')
_GCS_FULLPATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'/.*')
_GCS_METADATA = ['x-goog-meta-',
'content-disposition',
'cache-control',
'content-encoding']
_GCS_OPTIONS = _GCS_METADATA + ['x-goog-acl']
CS_XML_NS = 'http://doc.s3.amazonaws.com/2006-03-01'
LOCAL_GCS_ENDPOINT = '/_ah/gcs'
_access_token = ''
_MAX_GET_BUCKET_RESULT = 1000
def set_access_token(access_token):
"""Set the shared access token to authenticate with Google Cloud Storage.
When set, the library will always attempt to communicate with the
real Google Cloud Storage with this token even when running on dev appserver.
Note the token could expire so it's up to you to renew it.
When absent, the library will automatically request and refresh a token
on appserver, or when on dev appserver, talk to a Google Cloud Storage
stub.
Args:
access_token: you can get one by running 'gsutil -d ls' and copying the
str after 'Bearer'.
"""
global _access_token
_access_token = access_token
def get_access_token():
"""Returns the shared access token."""
return _access_token
class GCSFileStat(object):
"""Container for GCS file stat."""
def __init__(self,
filename,
st_size,
etag,
st_ctime,
content_type=None,
metadata=None,
is_dir=False):
"""Initialize.
For files, the non-optional arguments are always set.
For directories, only filename and is_dir are set.
Args:
filename: a Google Cloud Storage filename of form '/bucket/filename'.
st_size: file size in bytes. long compatible.
etag: hex digest of the md5 hash of the file's content. str.
st_ctime: posix file creation time. float compatible.
content_type: content type. str.
metadata: a str->str dict of user specified options when creating
the file. Possible keys are x-goog-meta-, content-disposition,
content-encoding, and cache-control.
is_dir: True if this represents a directory. False if this is a real file.
"""
self.filename = filename
self.is_dir = is_dir
self.st_size = None
self.st_ctime = None
self.etag = None
self.content_type = content_type
self.metadata = metadata
if not is_dir:
self.st_size = long(st_size)
self.st_ctime = float(st_ctime)
if etag[0] == '"' and etag[-1] == '"':
etag = etag[1:-1]
self.etag = etag
def __repr__(self):
if self.is_dir:
return '(directory: %s)' % self.filename
return (
'(filename: %(filename)s, st_size: %(st_size)s, '
'st_ctime: %(st_ctime)s, etag: %(etag)s, '
'content_type: %(content_type)s, '
'metadata: %(metadata)s)' %
dict(filename=self.filename,
st_size=self.st_size,
st_ctime=self.st_ctime,
etag=self.etag,
content_type=self.content_type,
metadata=self.metadata))
def __cmp__(self, other):
if not isinstance(other, self.__class__):
raise ValueError('Argument to cmp must have the same type. '
'Expect %s, got %s', self.__class__.__name__,
other.__class__.__name__)
if self.filename > other.filename:
return 1
elif self.filename < other.filename:
return -1
return 0
def __hash__(self):
if self.etag:
return hash(self.etag)
return hash(self.filename)
CSFileStat = GCSFileStat
def get_stored_content_length(headers):
"""Return the content length (in bytes) of the object as stored in GCS.
x-goog-stored-content-length should always be present except when called via
the local dev_appserver. Therefore if it is not present we default to the
standard content-length header.
Args:
headers: a dict of headers from the http response.
Returns:
the stored content length.
"""
length = headers.get('x-goog-stored-content-length')
if length is None:
length = headers.get('content-length')
return length
def get_metadata(headers):
"""Get user defined options from HTTP response headers."""
return dict((k, v) for k, v in headers.iteritems()
if any(k.lower().startswith(valid) for valid in _GCS_METADATA))
def validate_bucket_name(name):
"""Validate a Google Storage bucket name.
Args:
name: a Google Storage bucket name with no prefix or suffix.
Raises:
ValueError: if name is invalid.
"""
_validate_path(name)
if not _GCS_BUCKET_REGEX.match(name):
raise ValueError('Bucket should be 3-63 characters long using only a-z,'
'0-9, underscore, dash or dot but got %s' % name)
def validate_bucket_path(path):
"""Validate a Google Cloud Storage bucket path.
Args:
path: a Google Storage bucket path. It should have form '/bucket'.
Raises:
ValueError: if path is invalid.
"""
_validate_path(path)
if not _GCS_BUCKET_PATH_REGEX.match(path):
raise ValueError('Bucket should have format /bucket '
'but got %s' % path)
def validate_file_path(path):
"""Validate a Google Cloud Storage file path.
Args:
path: a Google Storage file path. It should have form '/bucket/filename'.
Raises:
ValueError: if path is invalid.
"""
_validate_path(path)
if not _GCS_FULLPATH_REGEX.match(path):
raise ValueError('Path should have format /bucket/filename '
'but got %s' % path)
def _process_path_prefix(path_prefix):
"""Validate and process a Google Cloud Stoarge path prefix.
Args:
path_prefix: a Google Cloud Storage path prefix of format '/bucket/prefix'
or '/bucket/' or '/bucket'.
Raises:
ValueError: if path is invalid.
Returns:
a tuple of /bucket and prefix. prefix can be None.
"""
_validate_path(path_prefix)
if not _GCS_PATH_PREFIX_REGEX.match(path_prefix):
raise ValueError('Path prefix should have format /bucket, /bucket/, '
'or /bucket/prefix but got %s.' % path_prefix)
bucket_name_end = path_prefix.find('/', 1)
bucket = path_prefix
prefix = None
if bucket_name_end != -1:
bucket = path_prefix[:bucket_name_end]
prefix = path_prefix[bucket_name_end + 1:] or None
return bucket, prefix
def _validate_path(path):
"""Basic validation of Google Storage paths.
Args:
path: a Google Storage path. It should have form '/bucket/filename'
or '/bucket'.
Raises:
ValueError: if path is invalid.
TypeError: if path is not of type basestring.
"""
if not path:
raise ValueError('Path is empty')
if not isinstance(path, basestring):
raise TypeError('Path should be a string but is %s (%s).' %
(path.__class__, path))
def validate_options(options):
"""Validate Google Cloud Storage options.
Args:
options: a str->basestring dict of options to pass to Google Cloud Storage.
Raises:
ValueError: if option is not supported.
TypeError: if option is not of type str or value of an option
is not of type basestring.
"""
if not options:
return
for k, v in options.iteritems():
if not isinstance(k, str):
raise TypeError('option %r should be a str.' % k)
if not any(k.lower().startswith(valid) for valid in _GCS_OPTIONS):
raise ValueError('option %s is not supported.' % k)
if not isinstance(v, basestring):
raise TypeError('value %r for option %s should be of type basestring.' %
(v, k))
def http_time_to_posix(http_time):
"""Convert HTTP time format to posix time.
See http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1
for http time format.
Args:
http_time: time in RFC 2616 format. e.g.
"Mon, 20 Nov 1995 19:12:08 GMT".
Returns:
A float of secs from unix epoch.
"""
if http_time is not None:
return email_utils.mktime_tz(email_utils.parsedate_tz(http_time))
def posix_time_to_http(posix_time):
"""Convert posix time to HTML header time format.
Args:
posix_time: unix time.
Returns:
A datatime str in RFC 2616 format.
"""
if posix_time:
return email_utils.formatdate(posix_time, usegmt=True)
_DT_FORMAT = '%Y-%m-%dT%H:%M:%S'
def dt_str_to_posix(dt_str):
"""format str to posix.
datetime str is of format %Y-%m-%dT%H:%M:%S.%fZ,
e.g. 2013-04-12T00:22:27.978Z. According to ISO 8601, T is a separator
between date and time when they are on the same line.
Z indicates UTC (zero meridian).
A pointer: http://www.cl.cam.ac.uk/~mgk25/iso-time.html
This is used to parse LastModified node from GCS's GET bucket XML response.
Args:
dt_str: A datetime str.
Returns:
A float of secs from unix epoch. By posix definition, epoch is midnight
1970/1/1 UTC.
"""
parsable, _ = dt_str.split('.')
dt = datetime.datetime.strptime(parsable, _DT_FORMAT)
return calendar.timegm(dt.utctimetuple())
def posix_to_dt_str(posix):
"""Reverse of str_to_datetime.
This is used by GCS stub to generate GET bucket XML response.
Args:
posix: A float of secs from unix epoch.
Returns:
A datetime str.
"""
dt = datetime.datetime.utcfromtimestamp(posix)
dt_str = dt.strftime(_DT_FORMAT)
return dt_str + '.000Z'
def local_run():
"""Whether we should hit GCS dev appserver stub."""
server_software = os.environ.get('SERVER_SOFTWARE')
if server_software is None:
return True
if 'remote_api' in server_software:
return False
if server_software.startswith(('Development', 'testutil')):
return True
return False
def local_api_url():
"""Return URL for GCS emulation on dev appserver."""
return 'http://%s%s' % (os.environ.get('HTTP_HOST'), LOCAL_GCS_ENDPOINT)
def memory_usage(method):
"""Log memory usage before and after a method."""
def wrapper(*args, **kwargs):
logging.info('Memory before method %s is %s.',
method.__name__, runtime.memory_usage().current())
result = method(*args, **kwargs)
logging.info('Memory after method %s is %s',
method.__name__, runtime.memory_usage().current())
return result
return wrapper
def _add_ns(tagname):
return '{%(ns)s}%(tag)s' % {'ns': CS_XML_NS,
'tag': tagname}
_T_CONTENTS = _add_ns('Contents')
_T_LAST_MODIFIED = _add_ns('LastModified')
_T_ETAG = _add_ns('ETag')
_T_KEY = _add_ns('Key')
_T_SIZE = _add_ns('Size')
_T_PREFIX = _add_ns('Prefix')
_T_COMMON_PREFIXES = _add_ns('CommonPrefixes')
_T_NEXT_MARKER = _add_ns('NextMarker')
_T_IS_TRUNCATED = _add_ns('IsTruncated')
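One practical use of set_access_token defined above is making a dev appserver talk to real GCS instead of the local stub (a sketch, not part of the commit; the token string is a placeholder, obtainable e.g. from 'gsutil -d ls'):

import cloudstorage as gcs

# Placeholder token; when a token is set, _get_storage_api() in
# storage_api.py keeps the real api_url instead of the local stub.
gcs.common.set_access_token('ya29.EXAMPLE-TOKEN')
assert gcs.common.get_access_token() == 'ya29.EXAMPLE-TOKEN'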

cloudstorage/errors.py Normal file (143 lines added)

@@ -0,0 +1,143 @@
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Google Cloud Storage specific Files API calls."""
__all__ = ['AuthorizationError',
'check_status',
'Error',
'FatalError',
'FileClosedError',
'ForbiddenError',
'InvalidRange',
'NotFoundError',
'ServerError',
'TimeoutError',
'TransientError',
]
import httplib
class Error(Exception):
"""Base error for all gcs operations.
Error can happen on GAE side or GCS server side.
For details on a particular GCS HTTP response code, see
https://developers.google.com/storage/docs/reference-status#standardcodes
"""
class TransientError(Error):
"""TransientError could be retried."""
class TimeoutError(TransientError):
"""HTTP 408 timeout."""
class FatalError(Error):
"""FatalError shouldn't be retried."""
class FileClosedError(FatalError):
"""File is already closed.
This can happen when the upload has finished but 'write' is called on
a stale upload handle.
"""
class NotFoundError(FatalError):
"""HTTP 404 resource not found."""
class ForbiddenError(FatalError):
"""HTTP 403 Forbidden.
While GCS replies with a 403 error for many reasons, the most common one
is that the bucket's permissions are not set up to allow your app access.
"""
class AuthorizationError(FatalError):
"""HTTP 401 authentication required.
Unauthorized request has been received by GCS.
This error is mostly handled by GCS client. GCS client will request
a new access token and retry the request.
"""
class InvalidRange(FatalError):
"""HTTP 416 RequestRangeNotSatifiable."""
class ServerError(TransientError):
"""HTTP >= 500 server side error."""
def check_status(status, expected, path, headers=None,
resp_headers=None, body=None, extras=None):
"""Check HTTP response status is expected.
Args:
status: HTTP response status. int.
expected: a list of expected statuses. A list of ints.
path: filename or a path prefix.
headers: HTTP request headers.
resp_headers: HTTP response headers.
body: HTTP response body.
extras: extra info to be logged verbatim if error occurs.
Raises:
AuthorizationError: if authorization failed.
NotFoundError: if an object that's expected to exist doesn't.
TimeoutError: if HTTP request timed out.
ServerError: if server experienced some errors.
FatalError: if any other unexpected errors occurred.
"""
if status in expected:
return
msg = ('Expect status %r from Google Storage. But got status %d.\n'
'Path: %r.\n'
'Request headers: %r.\n'
'Response headers: %r.\n'
'Body: %r.\n'
'Extra info: %r.\n' %
(expected, status, path, headers, resp_headers, body, extras))
if status == httplib.UNAUTHORIZED:
raise AuthorizationError(msg)
elif status == httplib.FORBIDDEN:
raise ForbiddenError(msg)
elif status == httplib.NOT_FOUND:
raise NotFoundError(msg)
elif status == httplib.REQUEST_TIMEOUT:
raise TimeoutError(msg)
elif status == httplib.REQUESTED_RANGE_NOT_SATISFIABLE:
raise InvalidRange(msg)
elif (status == httplib.OK and 308 in expected and
httplib.OK not in expected):
raise FileClosedError(msg)
elif status >= 500:
raise ServerError(msg)
else:
raise FatalError(msg)
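A typical way to consume these error classes (a sketch, not part of the commit):

import logging

import cloudstorage as gcs

def stat_or_none(path):
  # NotFoundError and ForbiddenError are re-exported at package level
  # via 'from errors import *' in cloudstorage/__init__.py.
  try:
    return gcs.stat(path)
  except gcs.NotFoundError:
    return None
  except gcs.ForbiddenError:
    logging.warning('Bucket ACLs do not allow this app to read %s', path)
    return None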

cloudstorage/rest_api.py Normal file (268 lines added)

@@ -0,0 +1,268 @@
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Base and helper classes for Google RESTful APIs."""
__all__ = ['add_sync_methods']
import logging
import os
import random
import time
from . import api_utils
try:
from google.appengine.api import app_identity
from google.appengine.ext import ndb
except ImportError:
from google.appengine.api import app_identity
from google.appengine.ext import ndb
def _make_sync_method(name):
"""Helper to synthesize a synchronous method from an async method name.
Used by the @add_sync_methods class decorator below.
Args:
name: The name of the synchronous method.
Returns:
A method (with first argument 'self') that retrieves and calls
self.<name>, passing its own arguments, expects it to return a
Future, and then waits for and returns that Future's result.
"""
def sync_wrapper(self, *args, **kwds):
method = getattr(self, name)
future = method(*args, **kwds)
return future.get_result()
return sync_wrapper
def add_sync_methods(cls):
"""Class decorator to add synchronous methods corresponding to async methods.
This modifies the class in place, adding additional methods to it.
If a synchronous method of a given name already exists it is not
replaced.
Args:
cls: A class.
Returns:
The same class, modified in place.
"""
for name in cls.__dict__.keys():
if name.endswith('_async'):
sync_name = name[:-6]
if not hasattr(cls, sync_name):
setattr(cls, sync_name, _make_sync_method(name))
return cls
class _AE_TokenStorage_(ndb.Model):
"""Entity to store app_identity tokens in memcache."""
token = ndb.StringProperty()
expires = ndb.FloatProperty()
@ndb.tasklet
def _make_token_async(scopes, service_account_id):
"""Get a fresh authentication token.
Args:
scopes: A list of scopes.
service_account_id: Internal-use only.
Raises:
An ndb.Return with a tuple (token, expiration_time) where expiration_time is
seconds since the epoch.
"""
rpc = app_identity.create_rpc()
app_identity.make_get_access_token_call(rpc, scopes, service_account_id)
token, expires_at = yield rpc
raise ndb.Return((token, expires_at))
class _RestApi(object):
"""Base class for REST-based API wrapper classes.
This class manages authentication tokens and request retries. All
APIs are available as synchronous and async methods; synchronous
methods are synthesized from async ones by the add_sync_methods()
function in this module.
WARNING: Do NOT directly use this api. It's an implementation detail
and is subject to change at any release.
"""
def __init__(self, scopes, service_account_id=None, token_maker=None,
retry_params=None):
"""Constructor.
Args:
scopes: A scope or a list of scopes.
service_account_id: Internal use only.
token_maker: An asynchronous function of the form
(scopes, service_account_id) -> (token, expires).
retry_params: An instance of api_utils.RetryParams. If None, the
default for current thread will be used.
"""
if isinstance(scopes, basestring):
scopes = [scopes]
self.scopes = scopes
self.service_account_id = service_account_id
self.make_token_async = token_maker or _make_token_async
if not retry_params:
retry_params = api_utils._get_default_retry_params()
self.retry_params = retry_params
self.user_agent = {'User-Agent': retry_params._user_agent}
self.expiration_headroom = random.randint(60, 240)
def __getstate__(self):
"""Store state as part of serialization/pickling."""
return {'scopes': self.scopes,
'id': self.service_account_id,
'a_maker': (None if self.make_token_async == _make_token_async
else self.make_token_async),
'retry_params': self.retry_params,
'expiration_headroom': self.expiration_headroom}
def __setstate__(self, state):
"""Restore state as part of deserialization/unpickling."""
self.__init__(state['scopes'],
service_account_id=state['id'],
token_maker=state['a_maker'],
retry_params=state['retry_params'])
self.expiration_headroom = state['expiration_headroom']
@ndb.tasklet
def do_request_async(self, url, method='GET', headers=None, payload=None,
deadline=None, callback=None):
"""Issue one HTTP request.
It performs async retries using tasklets.
Args:
url: the url to fetch.
method: the method in which to fetch.
headers: the http headers.
payload: the data to submit in the fetch.
deadline: the deadline in which to make the call.
callback: the call to make once completed.
Yields:
The async fetch of the url.
"""
retry_wrapper = api_utils._RetryWrapper(
self.retry_params,
retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS,
should_retry=api_utils._should_retry)
resp = yield retry_wrapper.run(
self.urlfetch_async,
url=url,
method=method,
headers=headers,
payload=payload,
deadline=deadline,
callback=callback,
follow_redirects=False)
raise ndb.Return((resp.status_code, resp.headers, resp.content))
@ndb.tasklet
def get_token_async(self, refresh=False):
"""Get an authentication token.
The token is cached in memcache, keyed by the scopes argument.
Uses a random token expiration headroom value generated in the constructor
to eliminate a burst of GET_ACCESS_TOKEN API requests.
Args:
refresh: If True, ignore a cached token; default False.
Yields:
An authentication token. This token is guaranteed to be non-expired.
"""
key = '%s,%s' % (self.service_account_id, ','.join(self.scopes))
ts = yield _AE_TokenStorage_.get_by_id_async(
key, use_cache=True, use_memcache=True,
use_datastore=self.retry_params.save_access_token)
if refresh or ts is None or ts.expires < (
time.time() + self.expiration_headroom):
token, expires_at = yield self.make_token_async(
self.scopes, self.service_account_id)
timeout = int(expires_at - time.time())
ts = _AE_TokenStorage_(id=key, token=token, expires=expires_at)
if timeout > 0:
yield ts.put_async(memcache_timeout=timeout,
use_datastore=self.retry_params.save_access_token,
use_cache=True, use_memcache=True)
raise ndb.Return(ts.token)
@ndb.tasklet
def urlfetch_async(self, url, method='GET', headers=None,
payload=None, deadline=None, callback=None,
follow_redirects=False):
"""Make an async urlfetch() call.
This is an async wrapper around urlfetch(). It adds an authentication
header.
Args:
url: the url to fetch.
method: the method in which to fetch.
headers: the http headers.
payload: the data to submit in the fetch.
deadline: the deadline in which to make the call.
callback: the call to make once completed.
follow_redirects: whether or not to follow redirects.
Yields:
This returns a Future despite not being decorated with @ndb.tasklet!
"""
headers = {} if headers is None else dict(headers)
headers.update(self.user_agent)
try:
self.token = yield self.get_token_async()
except app_identity.InternalError, e:
if os.environ.get('DATACENTER', '').endswith('sandman'):
self.token = None
logging.warning('Could not fetch an authentication token in sandman '
'based Appengine devel setup; proceeding without one.')
else:
raise e
if self.token:
headers['authorization'] = 'OAuth ' + self.token
deadline = deadline or self.retry_params.urlfetch_timeout
ctx = ndb.get_context()
resp = yield ctx.urlfetch(
url, payload=payload, method=method,
headers=headers, follow_redirects=follow_redirects,
deadline=deadline, callback=callback)
raise ndb.Return(resp)
_RestApi = add_sync_methods(_RestApi)
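To illustrate add_sync_methods (the class below is hypothetical, not part of the library): any method whose name ends in '_async' and returns a Future gains a blocking counterpart:

from google.appengine.ext import ndb

from cloudstorage import rest_api

class _EchoApi(object):
  @ndb.tasklet
  def echo_async(self, value):
    raise ndb.Return(value)

_EchoApi = rest_api.add_sync_methods(_EchoApi)
# _EchoApi().echo('hi') now calls echo_async('hi') and waits on the
# returned Future, yielding 'hi'.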

cloudstorage/storage_api.py Normal file (924 lines added)

@@ -0,0 +1,924 @@
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Python wrappers for the Google Storage RESTful API."""
__all__ = ['ReadBuffer',
'StreamingBuffer',
]
import collections
import os
import urlparse
from . import api_utils
from . import common
from . import errors
from . import rest_api
try:
from google.appengine.api import urlfetch
from google.appengine.ext import ndb
except ImportError:
from google.appengine.api import urlfetch
from google.appengine.ext import ndb
def _get_storage_api(retry_params, account_id=None):
"""Returns storage_api instance for API methods.
Args:
retry_params: An instance of api_utils.RetryParams. If none,
thread's default will be used.
account_id: Internal-use only.
Returns:
A storage_api instance to handle urlfetch work to GCS.
On dev appserver, this instance by default will talk to a local stub
unless common.ACCESS_TOKEN is set. That token will be used to talk
to the real GCS.
"""
api = _StorageApi(_StorageApi.full_control_scope,
service_account_id=account_id,
retry_params=retry_params)
if common.local_run() and not common.get_access_token():
api.api_url = common.local_api_url()
if common.get_access_token():
api.token = common.get_access_token()
return api
class _StorageApi(rest_api._RestApi):
"""A simple wrapper for the Google Storage RESTful API.
WARNING: Do NOT directly use this api. It's an implementation detail
and is subject to change at any release.
All async methods have similar args and returns.
Args:
path: The path to the Google Storage object or bucket, e.g.
'/mybucket/myfile' or '/mybucket'.
**kwd: Options for urlfetch. e.g.
headers={'content-type': 'text/plain'}, payload='blah'.
Returns:
A ndb Future. When fulfilled, future.get_result() should return
a tuple of (status, headers, content) that represents a HTTP response
of Google Cloud Storage XML API.
"""
api_url = 'https://storage.googleapis.com'
read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
def __getstate__(self):
"""Store state as part of serialization/pickling.
Returns:
A tuple (of dictionaries) with the state of this object
"""
return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
def __setstate__(self, state):
"""Restore state as part of deserialization/unpickling.
Args:
state: the tuple from a __getstate__ call
"""
superstate, localstate = state
super(_StorageApi, self).__setstate__(superstate)
self.api_url = localstate['api_url']
@api_utils._eager_tasklet
@ndb.tasklet
def do_request_async(self, url, method='GET', headers=None, payload=None,
deadline=None, callback=None):
"""Inherit docs.
This method translates urlfetch exceptions to more service specific ones.
"""
if headers is None:
headers = {}
if 'x-goog-api-version' not in headers:
headers['x-goog-api-version'] = '2'
headers['accept-encoding'] = 'gzip, *'
try:
resp_tuple = yield super(_StorageApi, self).do_request_async(
url, method=method, headers=headers, payload=payload,
deadline=deadline, callback=callback)
except urlfetch.DownloadError, e:
raise errors.TimeoutError(
'Request to Google Cloud Storage timed out.', e)
raise ndb.Return(resp_tuple)
def post_object_async(self, path, **kwds):
"""POST to an object."""
return self.do_request_async(self.api_url + path, 'POST', **kwds)
def put_object_async(self, path, **kwds):
"""PUT an object."""
return self.do_request_async(self.api_url + path, 'PUT', **kwds)
def get_object_async(self, path, **kwds):
"""GET an object.
Note: No payload argument is supported.
"""
return self.do_request_async(self.api_url + path, 'GET', **kwds)
def delete_object_async(self, path, **kwds):
"""DELETE an object.
Note: No payload argument is supported.
"""
return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
def head_object_async(self, path, **kwds):
"""HEAD an object.
Depending on request headers, HEAD returns various object properties,
e.g. Content-Length, Last-Modified, and ETag.
Note: No payload argument is supported.
"""
return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
def get_bucket_async(self, path, **kwds):
"""GET a bucket."""
return self.do_request_async(self.api_url + path, 'GET', **kwds)
def compose_object(self, file_list, destination_file, content_type):
"""COMPOSE multiple objects together.
    Using the given list of files, calls put_object with the compose flag.
This call merges all the files into the destination file.
Args:
file_list: list of dicts with the file name.
destination_file: Path to the destination file.
content_type: Content type for the destination file.
"""
xml_setting_list = ['<ComposeRequest>']
for meta_data in file_list:
xml_setting_list.append('<Component>')
for key, val in meta_data.iteritems():
xml_setting_list.append('<%s>%s</%s>' % (key, val, key))
xml_setting_list.append('</Component>')
xml_setting_list.append('</ComposeRequest>')
xml = ''.join(xml_setting_list)
if content_type is not None:
headers = {'Content-Type': content_type}
else:
headers = None
status, resp_headers, content = self.put_object(
api_utils._quote_filename(destination_file) + '?compose',
payload=xml,
headers=headers)
errors.check_status(status, [200], destination_file, resp_headers,
body=content)
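    # Illustrative payload (a sketch, assuming each dict in file_list maps
    # 'Name' to a source object name): composing two files into
    # /mybucket/all.txt would PUT
    #   <ComposeRequest>
    #     <Component><Name>a.txt</Name></Component>
    #     <Component><Name>b.txt</Name></Component>
    #   </ComposeRequest>
    # to /mybucket/all.txt?compose, with the Content-Type header applied to
    # the destination object.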
_StorageApi = rest_api.add_sync_methods(_StorageApi)
class ReadBuffer(object):
"""A class for reading Google storage files."""
DEFAULT_BUFFER_SIZE = 1024 * 1024
MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
def __init__(self,
api,
path,
buffer_size=DEFAULT_BUFFER_SIZE,
max_request_size=MAX_REQUEST_SIZE,
offset=0):
"""Constructor.
Args:
api: A StorageApi instance.
path: Quoted/escaped path to the object, e.g. /mybucket/myfile
buffer_size: buffer size. The ReadBuffer keeps
one buffer. But there may be a pending future that contains
a second buffer. This size must be less than max_request_size.
max_request_size: Max bytes to request in one urlfetch.
offset: Number of bytes to skip at the start of the file. If None, 0 is
used.
"""
self._api = api
self._path = path
self.name = api_utils._unquote_filename(path)
self.closed = False
assert buffer_size <= max_request_size
self._buffer_size = buffer_size
self._max_request_size = max_request_size
self._offset = offset
self._buffer = _Buffer()
self._etag = None
get_future = self._get_segment(offset, self._buffer_size, check_response=False)
status, headers, content = self._api.head_object(path)
errors.check_status(status, [200], path, resp_headers=headers, body=content)
self._file_size = long(common.get_stored_content_length(headers))
self._check_etag(headers.get('etag'))
self._buffer_future = None
if self._file_size != 0:
content, check_response_closure = get_future.get_result()
check_response_closure()
self._buffer.reset(content)
self._request_next_buffer()
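    # Note: the first-segment GET above is fired before the blocking HEAD so
    # the two requests overlap; its response check is deferred in a closure
    # and only invoked once the HEAD has supplied the file size and etag.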
def __getstate__(self):
"""Store state as part of serialization/pickling.
    The contents of the read buffer are not stored, only the current offset for
    data read by the client. A new read buffer is established at unpickling.
    The head information for the object (file size and etag) is stored to
    reduce startup latency and to ensure the file has not changed.
Returns:
A dictionary with the state of this object
"""
return {'api': self._api,
'path': self._path,
'buffer_size': self._buffer_size,
'request_size': self._max_request_size,
'etag': self._etag,
'size': self._file_size,
'offset': self._offset,
'closed': self.closed}
def __setstate__(self, state):
"""Restore state as part of deserialization/unpickling.
Args:
state: the dictionary from a __getstate__ call
Along with restoring the state, pre-fetch the next read buffer.
"""
self._api = state['api']
self._path = state['path']
self.name = api_utils._unquote_filename(self._path)
self._buffer_size = state['buffer_size']
self._max_request_size = state['request_size']
self._etag = state['etag']
self._file_size = state['size']
self._offset = state['offset']
self._buffer = _Buffer()
self.closed = state['closed']
self._buffer_future = None
if self._remaining() and not self.closed:
self._request_next_buffer()
def __iter__(self):
"""Iterator interface.
Note the ReadBuffer container itself is the iterator. It's
(quote PEP0234)
'destructive: they consumes all the values and a second iterator
cannot easily be created that iterates independently over the same values.
You could open the file for the second time, or seek() to the beginning.'
Returns:
Self.
"""
return self
def next(self):
line = self.readline()
if not line:
raise StopIteration()
return line
def readline(self, size=-1):
"""Read one line delimited by '\n' from the file.
A trailing newline character is kept in the string. It may be absent when a
file ends with an incomplete line. If the size argument is non-negative,
it specifies the maximum string size (counting the newline) to return.
A negative size is the same as unspecified. Empty string is returned
only when EOF is encountered immediately.
Args:
size: Maximum number of bytes to read. If not specified, readline stops
only on '\n' or EOF.
Returns:
The data read as a string.
Raises:
IOError: When this buffer is closed.
"""
self._check_open()
if size == 0 or not self._remaining():
return ''
data_list = []
newline_offset = self._buffer.find_newline(size)
while newline_offset < 0:
data = self._buffer.read(size)
size -= len(data)
self._offset += len(data)
data_list.append(data)
if size == 0 or not self._remaining():
return ''.join(data_list)
self._buffer.reset(self._buffer_future.get_result())
self._request_next_buffer()
newline_offset = self._buffer.find_newline(size)
data = self._buffer.read_to_offset(newline_offset + 1)
self._offset += len(data)
data_list.append(data)
return ''.join(data_list)
def read(self, size=-1):
"""Read data from RAW file.
Args:
size: Number of bytes to read as integer. Actual number of bytes
read is always equal to size unless EOF is reached. If size is
negative or unspecified, read the entire file.
Returns:
data read as str.
Raises:
IOError: When this buffer is closed.
"""
self._check_open()
if not self._remaining():
return ''
data_list = []
while True:
remaining = self._buffer.remaining()
if size >= 0 and size < remaining:
data_list.append(self._buffer.read(size))
self._offset += size
break
else:
size -= remaining
self._offset += remaining
data_list.append(self._buffer.read())
if self._buffer_future is None:
if size < 0 or size >= self._remaining():
needs = self._remaining()
else:
needs = size
data_list.extend(self._get_segments(self._offset, needs))
self._offset += needs
break
if self._buffer_future:
self._buffer.reset(self._buffer_future.get_result())
self._buffer_future = None
if self._buffer_future is None:
self._request_next_buffer()
return ''.join(data_list)
def _remaining(self):
return self._file_size - self._offset
def _request_next_buffer(self):
"""Request next buffer.
Requires self._offset and self._buffer are in consistent state.
"""
self._buffer_future = None
next_offset = self._offset + self._buffer.remaining()
if next_offset != self._file_size:
self._buffer_future = self._get_segment(next_offset,
self._buffer_size)
def _get_segments(self, start, request_size):
"""Get segments of the file from Google Storage as a list.
A large request is broken into segments to avoid hitting urlfetch
response size limit. Each segment is returned from a separate urlfetch.
Args:
      start: start offset to request. Inclusive. Has to be within the
        range of the file.
request_size: number of bytes to request.
Returns:
A list of file segments in order
"""
if not request_size:
return []
end = start + request_size
futures = []
while request_size > self._max_request_size:
futures.append(self._get_segment(start, self._max_request_size))
request_size -= self._max_request_size
start += self._max_request_size
if start < end:
futures.append(self._get_segment(start, end - start))
return [fut.get_result() for fut in futures]
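    # Worked example (illustrative): with MAX_REQUEST_SIZE of 30MB, a request
    # for 70MB is split into concurrent segments of 30MB + 30MB + 10MB whose
    # results are returned in order.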
@ndb.tasklet
def _get_segment(self, start, request_size, check_response=True):
"""Get a segment of the file from Google Storage.
Args:
      start: start offset of the segment. Inclusive. Has to be within the
        range of the file.
      request_size: number of bytes to request. Has to be small enough
        for a single urlfetch request. May go over the logical range of the
        file.
check_response: True to check the validity of GCS response automatically
before the future returns. False otherwise. See Yields section.
Yields:
If check_response is True, the segment [start, start + request_size)
of the file.
Otherwise, a tuple. The first element is the unverified file segment.
The second element is a closure that checks response. Caller should
      first invoke the closure before consuming the file segment.
Raises:
ValueError: if the file has changed while reading.
"""
end = start + request_size - 1
content_range = '%d-%d' % (start, end)
headers = {'Range': 'bytes=' + content_range}
status, resp_headers, content = yield self._api.get_object_async(
self._path, headers=headers)
def _checker():
errors.check_status(status, [200, 206], self._path, headers,
resp_headers, body=content)
self._check_etag(resp_headers.get('etag'))
if check_response:
_checker()
raise ndb.Return(content)
raise ndb.Return(content, _checker)
def _check_etag(self, etag):
"""Check if etag is the same across requests to GCS.
If self._etag is None, set it. If etag is set, check that the new
etag equals the old one.
In the __init__ method, we fire one HEAD and one GET request using
ndb tasklet. One of them would return first and set the first value.
Args:
etag: etag from a GCS HTTP response. None if etag is not part of the
response header. It could be None for example in the case of GCS
composite file.
Raises:
ValueError: if two etags are not equal.
"""
if etag is None:
return
elif self._etag is None:
self._etag = etag
elif self._etag != etag:
raise ValueError('File on GCS has changed while reading.')
def close(self):
self.closed = True
self._buffer = None
self._buffer_future = None
def __enter__(self):
return self
def __exit__(self, atype, value, traceback):
self.close()
return False
def seek(self, offset, whence=os.SEEK_SET):
"""Set the file's current offset.
    Note if the new offset is out of bounds, it is adjusted to either 0 or EOF.
Args:
offset: seek offset as number.
whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
(seek relative to the end, offset should be negative).
Raises:
IOError: When this buffer is closed.
ValueError: When whence is invalid.
"""
self._check_open()
self._buffer.reset()
self._buffer_future = None
if whence == os.SEEK_SET:
self._offset = offset
elif whence == os.SEEK_CUR:
self._offset += offset
elif whence == os.SEEK_END:
self._offset = self._file_size + offset
else:
raise ValueError('Whence mode %s is invalid.' % str(whence))
self._offset = min(self._offset, self._file_size)
self._offset = max(self._offset, 0)
if self._remaining():
self._request_next_buffer()
def tell(self):
"""Tell the file's current offset.
Returns:
current offset in reading this file.
Raises:
IOError: When this buffer is closed.
"""
self._check_open()
return self._offset
def _check_open(self):
if self.closed:
raise IOError('Buffer is closed.')
def seekable(self):
return True
def readable(self):
return True
def writable(self):
return False
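# Illustrative read-side usage (a sketch; a ReadBuffer is normally obtained
# through the public cloudstorage.open() in 'r' mode rather than constructed
# directly; handle() below is a hypothetical callback):
#
#   import cloudstorage
#   with cloudstorage.open('/mybucket/myfile', 'r') as gcs_file:
#     for line in gcs_file:       # __iter__/next -> readline()
#       handle(line)
#     gcs_file.seek(0)            # rewind, then re-read the first kilobyte
#     first_kb = gcs_file.read(1024)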
class _Buffer(object):
"""In memory buffer."""
def __init__(self):
self.reset()
def reset(self, content='', offset=0):
self._buffer = content
self._offset = offset
def read(self, size=-1):
"""Returns bytes from self._buffer and update related offsets.
Args:
size: number of bytes to read starting from current offset.
Read the entire buffer if negative.
Returns:
Requested bytes from buffer.
"""
if size < 0:
offset = len(self._buffer)
else:
offset = self._offset + size
return self.read_to_offset(offset)
def read_to_offset(self, offset):
"""Returns bytes from self._buffer and update related offsets.
Args:
offset: read from current offset to this offset, exclusive.
Returns:
Requested bytes from buffer.
"""
assert offset >= self._offset
result = self._buffer[self._offset: offset]
self._offset += len(result)
return result
def remaining(self):
return len(self._buffer) - self._offset
def find_newline(self, size=-1):
"""Search for newline char in buffer starting from current offset.
Args:
size: number of bytes to search. -1 means all.
Returns:
      offset of newline char in buffer. -1 if it doesn't exist.
"""
if size < 0:
return self._buffer.find('\n', self._offset)
return self._buffer.find('\n', self._offset, self._offset + size)
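  # Illustrative: after reset('ab\ncd'), find_newline() returns 2,
  # read_to_offset(3) returns 'ab\n' (advancing the internal offset to 3),
  # and remaining() is then 2.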
class StreamingBuffer(object):
"""A class for creating large objects using the 'resumable' API.
The API is a subset of the Python writable stream API sufficient to
support writing zip files using the zipfile module.
The exact sequence of calls and use of headers is documented at
https://developers.google.com/storage/docs/developer-guide#unknownresumables
"""
_blocksize = 256 * 1024
_flushsize = 8 * _blocksize
_maxrequestsize = 9 * 4 * _blocksize
def __init__(self,
api,
path,
content_type=None,
gcs_headers=None):
"""Constructor.
Args:
api: A StorageApi instance.
path: Quoted/escaped path to the object, e.g. /mybucket/myfile
      content_type: Optional content-type; if omitted, the default is
        delegated to Google Cloud Storage.
gcs_headers: additional gs headers as a str->str dict, e.g
{'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
Raises:
IOError: When this location can not be found.
"""
assert self._maxrequestsize > self._blocksize
assert self._maxrequestsize % self._blocksize == 0
assert self._maxrequestsize >= self._flushsize
self._api = api
self._path = path
self.name = api_utils._unquote_filename(path)
self.closed = False
self._buffer = collections.deque()
self._buffered = 0
self._written = 0
self._offset = 0
headers = {'x-goog-resumable': 'start'}
if content_type:
headers['content-type'] = content_type
if gcs_headers:
headers.update(gcs_headers)
status, resp_headers, content = self._api.post_object(path, headers=headers)
errors.check_status(status, [201], path, headers, resp_headers,
body=content)
loc = resp_headers.get('location')
if not loc:
raise IOError('No location header found in 201 response')
parsed = urlparse.urlparse(loc)
self._path_with_token = '%s?%s' % (self._path, parsed.query)
def __getstate__(self):
"""Store state as part of serialization/pickling.
The contents of the write buffer are stored. Writes to the underlying
storage are required to be on block boundaries (_blocksize) except for the
last write. In the worst case the pickled version of this object may be
slightly larger than the blocksize.
Returns:
A dictionary with the state of this object
"""
return {'api': self._api,
'path': self._path,
'path_token': self._path_with_token,
'buffer': self._buffer,
'buffered': self._buffered,
'written': self._written,
'offset': self._offset,
'closed': self.closed}
def __setstate__(self, state):
"""Restore state as part of deserialization/unpickling.
Args:
state: the dictionary from a __getstate__ call
"""
self._api = state['api']
self._path_with_token = state['path_token']
self._buffer = state['buffer']
self._buffered = state['buffered']
self._written = state['written']
self._offset = state['offset']
self.closed = state['closed']
self._path = state['path']
self.name = api_utils._unquote_filename(self._path)
def write(self, data):
"""Write some bytes.
Args:
data: data to write. str.
Raises:
TypeError: if data is not of type str.
"""
self._check_open()
if not isinstance(data, str):
raise TypeError('Expected str but got %s.' % type(data))
if not data:
return
self._buffer.append(data)
self._buffered += len(data)
self._offset += len(data)
if self._buffered >= self._flushsize:
self._flush()
def flush(self):
"""Flush as much as possible to GCS.
    GCS *requires* that all writes except for the final one align on
    256KB boundaries. So the internal buffer may still hold less than 256KB of
    data after flush.
"""
self._check_open()
self._flush(finish=False)
def tell(self):
"""Return the total number of bytes passed to write() so far.
(There is no seek() method.)
"""
return self._offset
def close(self):
"""Flush the buffer and finalize the file.
When this returns the new file is available for reading.
"""
if not self.closed:
self.closed = True
self._flush(finish=True)
self._buffer = None
def __enter__(self):
return self
def __exit__(self, atype, value, traceback):
self.close()
return False
def _flush(self, finish=False):
"""Internal API to flush.
Buffer is flushed to GCS only when the total amount of buffered data is at
least self._blocksize, or to flush the final (incomplete) block of
the file with finish=True.
"""
while ((finish and self._buffered >= 0) or
(not finish and self._buffered >= self._blocksize)):
tmp_buffer = []
tmp_buffer_len = 0
excess = 0
while self._buffer:
buf = self._buffer.popleft()
size = len(buf)
self._buffered -= size
tmp_buffer.append(buf)
tmp_buffer_len += size
if tmp_buffer_len >= self._maxrequestsize:
excess = tmp_buffer_len - self._maxrequestsize
break
if not finish and (
tmp_buffer_len % self._blocksize + self._buffered <
self._blocksize):
excess = tmp_buffer_len % self._blocksize
break
if excess:
over = tmp_buffer.pop()
size = len(over)
assert size >= excess
tmp_buffer_len -= size
head, tail = over[:-excess], over[-excess:]
self._buffer.appendleft(tail)
self._buffered += len(tail)
if head:
tmp_buffer.append(head)
tmp_buffer_len += len(head)
data = ''.join(tmp_buffer)
file_len = '*'
if finish and not self._buffered:
file_len = self._written + len(data)
self._send_data(data, self._written, file_len)
self._written += len(data)
if file_len != '*':
break
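    # Worked example (illustrative): with _blocksize = 256KB, flushing 700KB
    # of buffered data with finish=False uploads the first 512KB (two full
    # blocks) and keeps the remaining 188KB buffered until more data arrives
    # or close() finalizes the file.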
def _send_data(self, data, start_offset, file_len):
"""Send the block to the storage service.
This is a utility method that does not modify self.
Args:
data: data to send in str.
start_offset: start offset of the data in relation to the file.
file_len: an int if this is the last data to append to the file.
Otherwise '*'.
"""
headers = {}
end_offset = start_offset + len(data) - 1
if data:
headers['content-range'] = ('bytes %d-%d/%s' %
(start_offset, end_offset, file_len))
else:
headers['content-range'] = ('bytes */%s' % file_len)
status, response_headers, content = self._api.put_object(
self._path_with_token, payload=data, headers=headers)
if file_len == '*':
expected = 308
else:
expected = 200
errors.check_status(status, [expected], self._path, headers,
response_headers, content,
{'upload_path': self._path_with_token})
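    # Illustrative headers (continuing the 700KB example above): the 512KB
    # chunk goes out with 'content-range: bytes 0-524287/*' and expects 308;
    # the final flush on close() sends
    # 'content-range: bytes 524288-716799/716800' and expects 200.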
def _get_offset_from_gcs(self):
"""Get the last offset that has been written to GCS.
This is a utility method that does not modify self.
Returns:
an int of the last offset written to GCS by this upload, inclusive.
-1 means nothing has been written.
"""
headers = {'content-range': 'bytes */*'}
status, response_headers, content = self._api.put_object(
self._path_with_token, headers=headers)
errors.check_status(status, [308], self._path, headers,
response_headers, content,
{'upload_path': self._path_with_token})
val = response_headers.get('range')
if val is None:
return -1
_, offset = val.rsplit('-', 1)
return int(offset)
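    # Illustrative: a 'range: bytes=0-524287' response header yields 524287,
    # i.e. the first 512KB are already committed on the GCS side.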
def _force_close(self, file_length=None):
"""Close this buffer on file_length.
Finalize this upload immediately on file_length.
Contents that are still in memory will not be uploaded.
This is a utility method that does not modify self.
Args:
file_length: file length. Must match what has been uploaded. If None,
it will be queried from GCS.
"""
if file_length is None:
file_length = self._get_offset_from_gcs() + 1
self._send_data('', 0, file_length)
def _check_open(self):
if self.closed:
raise IOError('Buffer is closed.')
def seekable(self):
return False
def readable(self):
return False
def writable(self):
return True
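# Illustrative write-side usage (a sketch; a StreamingBuffer is normally
# obtained through the public cloudstorage.open() in 'w' mode, with the
# content type and x-goog-* headers assumed to be passed via the
# content_type/options arguments):
#
#   import cloudstorage
#   with cloudstorage.open('/mybucket/out.txt', 'w',
#                          content_type='text/plain',
#                          options={'x-goog-acl': 'private'}) as gcs_file:
#     gcs_file.write('hello ')
#     gcs_file.write('world\n')
#   # leaving the block calls close(), which flushes the final partial block
#   # and finalizes the object for reading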

View file

@@ -0,0 +1,25 @@
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Utils for testing."""
class MockUrlFetchResult(object):
def __init__(self, status, headers, body):
self.status_code = status
self.headers = headers
self.content = body
self.content_was_truncated = False
self.final_url = None
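# Illustrative (a sketch): a test might stub out urlfetch so the storage API
# sees a canned response, e.g.
#
#   def fake_fetch(*args, **kwargs):
#     return MockUrlFetchResult(200, {'content-length': '0'}, '')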