Merge pull request #92 from soxhub/master

Added support for carrying forward field_selector/selector to WatchQuery
This commit is contained in:
Brian Rosner 2017-03-06 11:10:45 -07:00 коммит произвёл GitHub
Родитель f2cf4514a7 452f6912df
Коммит ba184da372
3 изменённых файлов: 36 добавлений и 7 удалений

Просмотреть файл

@ -74,6 +74,21 @@ Selector query:
field_selector={"status.phase": "Pending"}
)
Watch query:
.. code:: python
watch = pykube.Job.objects(
api,
namespace="gondor-system")
.filter(field_selector={"metadata.name":"my-job"})
.watch()
# watch is a generator:
for watch_event in watch:
print(watch.type) # 'ADDED', 'DELETED', 'MODIFIED'
print(watch.object) # pykube.Job object
Create a ReplicationController:
.. code:: python

Просмотреть файл

@ -69,7 +69,10 @@ class APIObject(object):
else:
operation = kwargs.pop("operation", "")
kw["url"] = op.normpath(op.join(self.endpoint, self.name, operation))
params = kwargs.pop("params", None)
if params is not None:
query_string = urlencode(params)
kw["url"] = "{}{}".format(kw["url"], "?{}".format(query_string) if query_string else "")
if self.base:
kw["base"] = self.base
kw["version"] = self.version
@ -99,6 +102,14 @@ class APIObject(object):
self.api.raise_for_status(r)
self.set_obj(r.json())
def watch(self):
return self.__class__.objects(
self.api,
namespace=self.namespace
).filter(field_selector={
"metadata.name": self.name
}).watch()
def update(self):
self.obj = obj_merge(self.obj, self._original_obj)
r = self.api.patch(**self.api_kwargs(

Просмотреть файл

@ -34,9 +34,12 @@ class BaseQuery(object):
clone.field_selector = field_selector
return clone
def _clone(self):
clone = self.__class__(self.api, self.api_obj_class, namespace=self.namespace)
def _clone(self, cls=None):
if cls is None:
cls = self.__class__
clone = cls(self.api, self.api_obj_class, namespace=self.namespace)
clone.selector = self.selector
clone.field_selector = self.field_selector
return clone
def _build_api_url(self, params=None):
@ -86,12 +89,12 @@ class Query(BaseQuery):
return None
def watch(self, since=None):
kwargs = {"namespace": self.namespace}
query = self._clone(WatchQuery)
if since is now:
kwargs["resource_version"] = self.response["metadata"]["resourceVersion"]
query.resource_version = self.response["metadata"]["resourceVersion"]
elif since is not None:
kwargs["resource_version"] = since
return WatchQuery(self.api, self.api_obj_class, **kwargs)
query.resource_version = since
return query
def execute(self):
kwargs = {"url": self._build_api_url()}