зеркало из https://github.com/github/vitess-gh.git
Refactoring the python vtctl_client to use a generator,
and a helper method to do the logging.
This commit is contained in:
Родитель
72b8c89592
Коммит
0b17e2f681
|
@ -2,9 +2,6 @@
|
|||
# Use of this source code is governed by a BSD-style license that can
|
||||
# be found in the LICENSE file.
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from net import bsonrpc
|
||||
from vtctl import vtctl_client
|
||||
|
||||
|
@ -38,40 +35,17 @@ class GoRpcVtctlClient(vtctl_client.VctlClient):
|
|||
def is_closed(self):
|
||||
return self.client.is_closed()
|
||||
|
||||
def execute_vtctl_command(self, args, action_timeout=30.0,
|
||||
lock_timeout=5.0, info_to_debug=False):
|
||||
"""Executes a remote command on the vtctl server.
|
||||
|
||||
Args:
|
||||
args: Command line to run.
|
||||
action_timeout: total timeout for the action (float, in seconds).
|
||||
lock_timeout: timeout for locking topology (float, in seconds).
|
||||
info_to_debug: if set, changes the info messages to debug.
|
||||
|
||||
Returns:
|
||||
The console output of the action.
|
||||
"""
|
||||
def execute_vtctl_command(self, args, action_timeout=30.0, lock_timeout=5.0):
|
||||
req = {
|
||||
'Args': args,
|
||||
'ActionTimeout': long(action_timeout * 1000000000),
|
||||
'LockTimeout': long(lock_timeout * 1000000000),
|
||||
}
|
||||
self.client.stream_call('VtctlServer.ExecuteVtctlCommand', req)
|
||||
console_result = ''
|
||||
while True:
|
||||
e = self.client.stream_next()
|
||||
if e is None:
|
||||
break
|
||||
if e.reply['Level'] == 0:
|
||||
if info_to_debug:
|
||||
logging.debug('%s', e.reply['Value'])
|
||||
else:
|
||||
logging.info('%s', e.reply['Value'])
|
||||
elif e.reply['Level'] == 1:
|
||||
logging.warning('%s', e.reply['Value'])
|
||||
elif e.reply['Level'] == 2:
|
||||
logging.error('%s', e.reply['Value'])
|
||||
elif e.reply['Level'] == 3:
|
||||
console_result += e.reply['Value']
|
||||
|
||||
return console_result
|
||||
yield vtctl_client.Event(e.reply['Time'], e.reply['Level'],
|
||||
e.reply['File'], e.reply['Line'],
|
||||
e.reply['Value'])
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
# It is untested and doesn't work just yet: ExecuteVtctlCommand
|
||||
# just seems to time out.
|
||||
|
||||
import logging
|
||||
import datetime
|
||||
from urlparse import urlparse
|
||||
|
||||
import vtctl_client
|
||||
|
@ -14,66 +14,44 @@ import vtctldata_pb2
|
|||
import vtctlservice_pb2
|
||||
|
||||
class GRPCVtctlClient(vtctl_client.VctlClient):
|
||||
"""GoRpcVtctlClient is the gRPC implementation of VctlClient.
|
||||
It is registered as 'grpc' protocol.
|
||||
"""
|
||||
"""GoRpcVtctlClient is the gRPC implementation of VctlClient.
|
||||
It is registered as 'grpc' protocol.
|
||||
"""
|
||||
|
||||
def __init__(self, addr, timeout, user=None, password=None, encrypted=False,
|
||||
keyfile=None, certfile=None):
|
||||
self.addr = addr
|
||||
self.timeout = timeout
|
||||
self.stub = None
|
||||
def __init__(self, addr, timeout, user=None, password=None, encrypted=False,
|
||||
keyfile=None, certfile=None):
|
||||
self.addr = addr
|
||||
self.timeout = timeout
|
||||
self.stub = None
|
||||
|
||||
def __str__(self):
|
||||
return '<VtctlClient %s>' % self.addr
|
||||
def __str__(self):
|
||||
return '<VtctlClient %s>' % self.addr
|
||||
|
||||
def dial(self):
|
||||
if self.stub:
|
||||
self.stub.close()
|
||||
def dial(self):
|
||||
if self.stub:
|
||||
self.stub.close()
|
||||
|
||||
p = urlparse("http://" + self.addr)
|
||||
self.stub = vtctlservice_pb2.early_adopter_create_Vtctl_stub(p.hostname,
|
||||
p.port)
|
||||
p = urlparse("http://" + self.addr)
|
||||
self.stub = vtctlservice_pb2.early_adopter_create_Vtctl_stub(p.hostname,
|
||||
p.port)
|
||||
|
||||
def close(self):
|
||||
self.stub.close()
|
||||
self.stub = None
|
||||
def close(self):
|
||||
self.stub.close()
|
||||
self.stub = None
|
||||
|
||||
def is_closed(self):
|
||||
return self.stub == None
|
||||
def is_closed(self):
|
||||
return self.stub == None
|
||||
|
||||
def execute_vtctl_command(self, args, action_timeout=30.0,
|
||||
lock_timeout=5.0, info_to_debug=False):
|
||||
"""Executes a remote command on the vtctl server.
|
||||
def execute_vtctl_command(self, args, action_timeout=30.0, lock_timeout=5.0):
|
||||
req = vtctldata_pb2.ExecuteVtctlCommandRequest(
|
||||
args=args,
|
||||
action_timeout=long(action_timeout * 1000000000),
|
||||
lock_timeout=long(lock_timeout * 1000000000))
|
||||
with self.stub as stub:
|
||||
for response in stub.ExecuteVtctlCommand(req, action_timeout):
|
||||
t = datetime.datetime.utcfromtimestamp(response.event.time.seconds)
|
||||
yield vtctl_client.Event(t, response.event.level, response.event.file,
|
||||
response.event.line, response.event.value)
|
||||
|
||||
Args:
|
||||
args: Command line to run.
|
||||
action_timeout: total timeout for the action (float, in seconds).
|
||||
lock_timeout: timeout for locking topology (float, in seconds).
|
||||
info_to_debug: if set, changes the info messages to debug.
|
||||
|
||||
Returns:
|
||||
The console output of the action.
|
||||
"""
|
||||
req = vtctldata_pb2.ExecuteVtctlCommandRequest(
|
||||
args=args,
|
||||
action_timeout=long(action_timeout * 1000000000),
|
||||
lock_timeout=long(lock_timeout * 1000000000))
|
||||
console_result = ''
|
||||
with self.stub as stub:
|
||||
for response in stub.ExecuteVtctlCommand(req, action_timeout):
|
||||
if response.event.level == 0:
|
||||
if info_to_debug:
|
||||
logging.debug('%s', response.event.value)
|
||||
else:
|
||||
logging.info('%s', response.event.value)
|
||||
elif response.event.level == 1:
|
||||
logging.warning('%s', response.event.value)
|
||||
elif response.event.level == 2:
|
||||
logging.error('%s', response.event.value)
|
||||
elif response.event.level == 3:
|
||||
console_result += response.event.value
|
||||
|
||||
return console_result
|
||||
|
||||
vtctl_client.register_conn_class("grpc", GRPCVtctlClient)
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
# Use of this source code is governed by a BSD-style license that can
|
||||
# be found in the LICENSE file.
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
# mapping from protocol to python class. The protocol matches the string
|
||||
# used by vtctlclient as a -vtctl_client_protocol parameter.
|
||||
|
@ -35,6 +37,24 @@ def connect(protocol, *pargs, **kargs):
|
|||
return conn
|
||||
|
||||
|
||||
class Event(object):
|
||||
"""Event is streamed by VctlClient.
|
||||
Eventually, we will just use the proto3 definition for logutil.proto/Event.
|
||||
"""
|
||||
|
||||
INFO = 0
|
||||
WARNING = 1
|
||||
ERROR = 2
|
||||
CONSOLE = 3
|
||||
|
||||
def __init__(self, time, level, file, line, value):
|
||||
self.time = time
|
||||
self.level = level
|
||||
self.file = file
|
||||
self.line = line
|
||||
self.value = value
|
||||
|
||||
|
||||
class VctlClient(object):
|
||||
"""VctlClient is the interface for the vtctl client implementations.
|
||||
All implementations must implement all these methods.
|
||||
|
@ -68,17 +88,49 @@ class VctlClient(object):
|
|||
"""
|
||||
pass
|
||||
|
||||
def execute_vtctl_command(self, args, action_timeout=30.0,
|
||||
lock_timeout=5.0, info_to_debug=False):
|
||||
def execute_vtctl_command(self, args, action_timeout=30.0, lock_timeout=5.0):
|
||||
"""Executes a remote command on the vtctl server.
|
||||
|
||||
Args:
|
||||
args: Command line to run.
|
||||
action_timeout: total timeout for the action (float, in seconds).
|
||||
lock_timeout: timeout for locking topology (float, in seconds).
|
||||
info_to_debug: if set, changes the info messages to debug.
|
||||
|
||||
Returns:
|
||||
The console output of the action.
|
||||
This is a generator method that yeilds Event objects.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def execute_vtctl_command(client, args, action_timeout=30.0,
|
||||
lock_timeout=5.0, info_to_debug=False):
|
||||
"""This is a helper method that executes a remote vtctl command, logs
|
||||
the output to the logging module, and returns the console output.
|
||||
|
||||
Args:
|
||||
client: VtctlClient object to use.
|
||||
args: Command line to run.
|
||||
action_timeout: total timeout for the action (float, in seconds).
|
||||
lock_timeout: timeout for locking topology (float, in seconds).
|
||||
info_to_debug: if set, changes the info messages to debug.
|
||||
|
||||
Returns:
|
||||
The console output of the action.
|
||||
"""
|
||||
|
||||
console_result = ''
|
||||
for e in client.execute_vtctl_command(args, action_timeout=action_timeout,
|
||||
lock_timeout=lock_timeout):
|
||||
if e.level == Event.INFO:
|
||||
if info_to_debug:
|
||||
logging.debug('%s', e.value)
|
||||
else:
|
||||
logging.info('%s', e.value)
|
||||
elif e.level == Event.WARNING:
|
||||
logging.warning('%s', e.value)
|
||||
elif e.level == Event.ERROR:
|
||||
logging.error('%s', e.value)
|
||||
elif e.level == Event.CONSOLE:
|
||||
console_result += e.value
|
||||
|
||||
return console_result
|
||||
|
|
|
@ -593,7 +593,7 @@ def run_vtctl(clargs, auto_log=False, expect_fail=False,
|
|||
return result, ""
|
||||
elif mode == VTCTL_RPC:
|
||||
logging.debug("vtctl: %s", " ".join(clargs))
|
||||
result = vtctld_connection.execute_vtctl_command(clargs, info_to_debug=True, action_timeout=120)
|
||||
result = vtctl_client.execute_vtctl_command(vtctld_connection, clargs, info_to_debug=True, action_timeout=120)
|
||||
return result, ""
|
||||
|
||||
raise Exception('Unknown mode: %s', mode)
|
||||
|
|
Загрузка…
Ссылка в новой задаче