implement local and remote processes on node

remove common folder
add some cleanup code
This commit is contained in:
Chi Song 2020-08-06 18:14:41 +08:00
Родитель 9300e145bd
Коммит 3fa24d2ef0
22 изменённых файлов: 274 добавлений и 61 удалений

Просмотреть файл

@ -1,13 +1,19 @@
from lisa import CaseMetadata, SuiteMetadata
from lisa.common.logger import log
from lisa.core.testSuite import TestSuite
from lisa.util.logger import log
@SuiteMetadata(area="demo", category="simple", tags=["demo"])
class SimpleTestSuite(TestSuite):
@CaseMetadata(priority=1)
def hello(self) -> None:
log.info("hello world")
log.info("environment: %s", len(self.environment.nodes))
default_node = self.environment.defaultNode
result = default_node.execute("echo hello world!")
log.info("stdout of node: '%s'", result.stdout)
log.info("stderr of node: '%s'", result.stderr)
log.info("exitCode of node: '%s'", result.exitCode)
log.info("try me on a remote node, same code!")
@CaseMetadata(priority=1)
def bye(self) -> None:

Просмотреть файл

@ -3,7 +3,6 @@ import os
from argparse import Namespace
from typing import Dict, List, Optional, cast
from lisa.common.logger import log
from lisa.core.environmentFactory import EnvironmentFactory
from lisa.core.package import import_module
from lisa.core.platformFactory import PlatformFactory
@ -12,6 +11,7 @@ from lisa.core.testFactory import TestFactory
from lisa.parameter_parser.parser import parse
from lisa.test_runner.lisarunner import LISARunner
from lisa.util import constants
from lisa.util.logger import log
def _load_extends(base_path: str, extends_config: Optional[Dict[str, object]]) -> None:

Просмотреть файл

@ -3,8 +3,8 @@ from __future__ import annotations
from abc import ABCMeta, abstractmethod
from typing import Dict, Optional
from lisa.common.logger import log
from lisa.core.actionStatus import ActionStatus
from lisa.util.logger import log
class Action(metaclass=ABCMeta):

Просмотреть файл

@ -1,8 +1,8 @@
from timeit import default_timer as timer
from typing import Callable, Optional
from lisa.common.logger import log
from lisa.core.testFactory import TestFactory
from lisa.util.logger import log
class CaseMetadata(object):

Просмотреть файл

@ -3,9 +3,9 @@ from __future__ import annotations
import copy
from typing import TYPE_CHECKING, Dict, List, Optional, cast
from lisa.common.logger import log
from lisa.core.nodeFactory import NodeFactory
from lisa.util import constants
from lisa.util.logger import log
if TYPE_CHECKING:
from lisa.core.platform import Platform
@ -128,3 +128,7 @@ class Environment(object):
raise Exception("only one node can set isDefault to True")
has_default = True
return has_default
def cleanup(self) -> None:
for node in self.nodes:
node.cleanup()

Просмотреть файл

@ -1,9 +1,15 @@
from __future__ import annotations
import random
from time import sleep
from timeit import default_timer as timer
from typing import Dict, Optional
from lisa.core.sshConnection import SshConnection
from lisa.util import constants
from lisa.util.excutableResult import ExecutableResult
from lisa.util.logger import log
from lisa.util.process import Process
class Node:
@ -17,8 +23,7 @@ class Node:
self.isDefault = isDefault
self.isRemote = isRemote
self.spec = spec
self.connection = None
self.publicSshSession: Optional[SshConnection] = None
self.connection: Optional[SshConnection] = None
@staticmethod
def createNode(
@ -37,17 +42,55 @@ class Node:
def setConnectionInfo(
self,
address: str = "",
port: int = 22,
publicAddress: str = "",
publicPort: int = 22,
username: str = "root",
password: str = "",
privateKeyFile: str = "",
address: Optional[object],
port: Optional[object],
publicAddress: Optional[object],
publicPort: Optional[object],
username: Optional[object],
password: Optional[object],
privateKeyFile: Optional[object],
) -> None:
self.connection = SshConnection(
address, port, publicAddress, publicPort, username, password, privateKeyFile
)
if self.connection is not None:
raise Exception(
"node is set connection information already, cannot set again"
)
parameters: Dict[str, object] = dict()
if address is not None:
parameters["address"] = address
if port is not None:
parameters["port"] = port
if publicAddress is not None:
parameters["publicAddress"] = publicAddress
if publicPort is not None:
parameters["publicPort"] = publicPort
if username is not None:
parameters["username"] = username
if password is not None:
parameters["password"] = password
if privateKeyFile is not None:
parameters["privateKeyFile"] = privateKeyFile
self.connection = SshConnection(**parameters)
def connect(self) -> None:
pass
def execute(self, cmd: str) -> ExecutableResult:
result: ExecutableResult
cmd_id = random.randint(0, 10000)
start_timer = timer()
log.debug("remote(%s) cmd[%s] %s", self.isRemote, cmd_id, cmd)
if self.isRemote:
# remote
if self.connection is None:
raise Exception("remote node has no connection info")
result = self.connection.execute(cmd)
else:
# local
process = Process()
with process:
process.start(cmd)
result = process.waitResult()
end_timer = timer()
log.info("cmd[%s] executed with %f", cmd_id, end_timer - start_timer)
return result
def cleanup(self) -> None:
if self.connection is not None:
self.connection.cleanup()

Просмотреть файл

@ -1,7 +1,7 @@
from typing import Dict, Optional
from lisa.common.logger import log
from lisa.util import constants
from lisa.util.logger import log
from .node import Node
@ -19,6 +19,16 @@ class NodeFactory:
]:
is_default = NodeFactory._isDefault(config)
node = Node.createNode(node_type=node_type, isDefault=is_default)
if node.isRemote:
node.setConnectionInfo(
config.get(constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS),
config.get(constants.ENVIRONMENTS_NODES_REMOTE_PORT),
config.get(constants.ENVIRONMENTS_NODES_REMOTE_PUBLIC_ADDRESS),
config.get(constants.ENVIRONMENTS_NODES_REMOTE_PUBLIC_PORT),
config.get(constants.ENVIRONMENTS_NODES_REMOTE_USERNAME),
config.get(constants.ENVIRONMENTS_NODES_REMOTE_PASSWORD),
config.get(constants.ENVIRONMENTS_NODES_REMOTE_PRIVATEKEYFILE),
)
if node is not None:
log.debug("created node '%s'", node_type)
return node

Просмотреть файл

@ -3,7 +3,7 @@ import os
import sys
from glob import glob
from lisa.common.logger import log
from lisa.util.logger import log
def import_module(path: str, logDetails: bool = True) -> None:

Просмотреть файл

@ -31,5 +31,6 @@ class Platform(ABC):
return environment
def deleteEnvironment(self, environment: Environment) -> None:
environment.cleanup()
self.deleteEnvironmentInternal(environment)
environment.isReady = False

Просмотреть файл

@ -2,8 +2,8 @@ from typing import Dict, List, Optional, Type, cast
from singleton_decorator import singleton
from lisa.common.logger import log
from lisa.util import constants
from lisa.util.logger import log
from .platform import Platform

Просмотреть файл

@ -1,11 +1,11 @@
from typing import Optional, cast
from lisa.common.logger import log
from lisa.core.environmentFactory import EnvironmentFactory
from lisa.core.platform import Platform
from lisa.parameter_parser.config import Config
from lisa.sut_orchestrator.ready import ReadyPlatform
from lisa.util import constants
from lisa.util.logger import log
class RuntimeObject:

Просмотреть файл

@ -1,4 +1,9 @@
import os
from lisa.util.excutableResult import ExecutableResult
from typing import Optional
import paramiko
from lisa.util.connectionInfo import ConnectionInfo
class SshConnection:
@ -38,24 +43,70 @@ class SshConnection:
elif self.publicPort is None or self.publicPort <= 0:
self.publicPort = self.port
if (self.password is None or self.password == "") and (
self.privateKeyFile is None or self.privateKeyFile == ""
):
raise Exception(
"at least one of password and privateKeyFile need to be set"
)
elif self.password is not None and self.password != "":
self.usePassword = True
self._connectionInfo = ConnectionInfo(
self.address, self.port, self.username, self.password, self.privateKeyFile
)
self._publicConnectionInfo = ConnectionInfo(
self.publicAddress,
self.publicPort,
self.username,
self.password,
self.privateKeyFile,
)
self._connection: Optional[paramiko.SSHClient] = None
self._publicConnection: Optional[paramiko.SSHClient] = None
self._isConnected: bool = False
self._isPublicConnected: bool = False
@property
def connectionInfo(self) -> ConnectionInfo:
return self._connectionInfo
@property
def publicConnectionInfo(self) -> ConnectionInfo:
return self._publicConnectionInfo
def execute(self, cmd: str) -> ExecutableResult:
client = self.connect()
_, stdout_file, stderr_file = client.exec_command(cmd)
exit_code: int = stdout_file.channel.recv_exit_status()
stdout: str = stdout_file.read().decode("utf-8")
stderr: str = stderr_file.read().decode("utf-8")
result = ExecutableResult(stdout, stderr, exit_code)
return result
def connect(self, isPublic: bool = True) -> paramiko.SSHClient:
if isPublic:
connection = self._publicConnection
connectionInfo = self.publicConnectionInfo
else:
if not os.path.exists(self.privateKeyFile):
raise FileNotFoundError(self.privateKeyFile)
self.usePassword = False
connection = self._connection
connectionInfo = self.connectionInfo
if connection is None:
connection = paramiko.SSHClient()
connection.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
connection.connect(
connectionInfo.address,
port=connectionInfo.port,
username=connectionInfo.username,
password=connectionInfo.password,
key_filename=connectionInfo.privateKeyFile,
look_for_keys=False,
)
if isPublic:
self._publicConnection = connection
else:
self._connection = connection
return connection
if self.username is None or self.username == "":
raise Exception("username must be set")
def getInternalConnection(self) -> None:
pass
def getPublicConnection(self) -> None:
pass
def cleanup(self) -> None:
if self._connection is not None:
self._connection.close()
self._connection = None
if self._publicConnection is not None:
self._publicConnection.close()
self._publicConnection = None

Просмотреть файл

@ -2,8 +2,8 @@ from typing import Callable, Dict, List, Optional, Type
from singleton_decorator import singleton
from lisa.common.logger import log
from lisa.core.testSuite import TestSuite
from lisa.util.logger import log
class TestCaseMetadata:

Просмотреть файл

@ -3,8 +3,8 @@ from __future__ import annotations
from abc import ABCMeta
from typing import TYPE_CHECKING, List
from lisa.common.logger import log
from lisa.core.action import Action, ActionStatus
from lisa.util.logger import log
if TYPE_CHECKING:
from .environment import Environment

Просмотреть файл

@ -5,9 +5,9 @@ from logging import DEBUG, INFO
from retry import retry
from lisa.common import env
from lisa.common.logger import init_log, log
from lisa.parameter_parser.argparser import parse_args
from lisa.util import env
from lisa.util.logger import init_log, log
@retry(FileExistsError, tries=10, delay=0) # type: ignore
@ -54,4 +54,4 @@ if __name__ == "__main__":
exitCode = -1
finally:
# force all threads end.
sys.exit(exitCode)
os._exit(exitCode)

Просмотреть файл

@ -3,8 +3,8 @@ from argparse import Namespace
import yaml
from lisa.common.logger import log
from lisa.parameter_parser.config import Config
from lisa.util.logger import log
def parse(args: Namespace) -> Config:

Просмотреть файл

@ -1,6 +1,5 @@
from typing import cast
from lisa.common.logger import log
from lisa.core.action import ActionStatus
from lisa.core.environmentFactory import EnvironmentFactory
from lisa.core.platform import Platform
@ -8,6 +7,7 @@ from lisa.core.testFactory import TestFactory
from lisa.core.testRunner import TestRunner
from lisa.core.testSuite import TestSuite
from lisa.util import constants
from lisa.util.logger import log
class LISARunner(TestRunner):

Просмотреть файл

@ -0,0 +1,35 @@
import os
from typing import Optional
class ConnectionInfo:
def __init__(
self,
address: str = "",
port: int = 22,
username: str = "root",
password: Optional[str] = "",
privateKeyFile: str = "",
) -> None:
self.address = address
self.port = port
self.username = username
self.password = password
self.privateKeyFile = privateKeyFile
if (self.password is None or self.password == "") and (
self.privateKeyFile is None or self.privateKeyFile == ""
):
raise Exception(
"at least one of password and privateKeyFile need to be set"
)
elif self.password is not None and self.password != "":
self.usePassword = True
else:
if not os.path.exists(self.privateKeyFile):
raise FileNotFoundError(self.privateKeyFile)
self.password = None
self.usePassword = False
if self.username is None or self.username == "":
raise Exception("username must be set")

Просмотреть файл

Просмотреть файл

@ -0,0 +1,8 @@
from typing import Optional
class ExecutableResult:
def __init__(self, stdout: str, stderr: str, exitCode: Optional[int]) -> None:
self.stdout = stdout
self.stderr = stderr
self.exitCode = exitCode

Просмотреть файл

Просмотреть файл

@ -2,12 +2,21 @@ import logging
import os
import shlex
import subprocess
import time
from threading import Thread
from typing import Any, Dict, List, Optional, cast
from types import TracebackType
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, cast
from timeit import default_timer as timer
import psutil
from lisa.common.logger import log
from lisa.util.excutableResult import ExecutableResult
from lisa.util.logger import log
if TYPE_CHECKING:
BaseExceptionType = Type[BaseException]
else:
BaseExceptionType = bool
class LogPipe(Thread):
@ -16,10 +25,13 @@ class LogPipe(Thread):
and start the thread
"""
Thread.__init__(self)
self.output: str = ""
self.daemon = False
self.level = level
self.fdRead, self.fdWrite = os.pipe()
self.pipeReader = os.fdopen(self.fdRead)
self.isReadCompleted = False
self.isClosed = False
self.start()
def fileno(self) -> int:
@ -30,24 +42,41 @@ class LogPipe(Thread):
def run(self) -> None:
"""Run the thread, logging everything.
"""
for line in iter(self.pipeReader.readline, ""):
log.log(self.level, line.strip("\n"))
output = self.pipeReader.read()
self.output = "".join([self.output, output])
for line in output.splitlines(False):
log.log(self.level, line)
self.pipeReader.close()
self.isReadCompleted = True
def close(self) -> None:
"""Close the write end of the pipe.
"""
os.close(self.fdWrite)
if not self.isClosed:
os.close(self.fdWrite)
self.isClosed = True
class Process:
def __init__(self) -> None:
self.process: Optional[subprocess.Popen[Any]] = None
self.exitCode: Optional[int] = None
self.running: bool = False
self.log_pipe: Optional[LogPipe] = None
self._running: bool = False
def __enter__(self) -> None:
pass
def __exit__(
self,
exc_type: Optional[BaseExceptionType],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.cleanup()
def start(
self,
command: str,
@ -72,10 +101,36 @@ class Process:
cwd=cwd,
env=cast(Optional[Dict[str, str]], dictEnv),
)
self.running = True
self._running = True
if self.process is not None:
log.debug("process %s started", self.process.pid)
def waitResult(self, timeout: float = 600) -> ExecutableResult:
budget_time = timeout
# wait for all content read
while self.isRunning() is True and budget_time >= 0:
start = timer()
time.sleep(0.01)
end = timer()
budget_time = budget_time - (end - start)
if budget_time < 0:
if self.process is not None:
log.warn("process %s timeout in %s sec", self.process.pid, timeout)
self.stop()
# cleanup to get pipe complete
self.cleanup()
# wait all content flushed
while (
not self.stdout_pipe.isReadCompleted or not self.stderr_pipe.isReadCompleted
):
time.sleep(0.01)
return ExecutableResult(
self.stdout_pipe.output, self.stderr_pipe.output, self.exitCode
)
def stop(self) -> None:
if self.process is not None:
children = cast(
@ -95,10 +150,10 @@ class Process:
def isRunning(self) -> bool:
self.exitCode = self.getExitCode()
if self.exitCode is not None and self.process is not None:
if self.running is True:
if self._running is True:
log.debug("process %s exited: %s", self.process.pid, self.exitCode)
self.running = False
return self.running
self._running = False
return self._running
def getExitCode(self) -> Optional[int]:
if self.process is not None: