зеркало из https://github.com/mozilla/TUID.git
135 строки
4.6 KiB
Python
135 строки
4.6 KiB
Python
# encoding: utf-8
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
# You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
#
|
|
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
|
|
#
|
|
from __future__ import absolute_import, division, unicode_literals
|
|
|
|
import os
|
|
import platform
|
|
|
|
from mo_dots import set_default, wrap
|
|
from mo_json import json2value, value2json
|
|
from mo_logs import Except, Log
|
|
|
|
from mo_threads import Lock, Process, Signal, THREAD_STOP, Thread, DONE
|
|
|
|
# Command name used as the first element of the worker's argument list
# (see Python.__init__).
PYTHON = "python"
# When truthy, the stdout reader logs a note once its loop has finished.
DEBUG = True
|
|
|
|
|
|
class Python(object):
    """
    Proxy for a Python worker running in a separate process.

    Commands are written to the worker's stdin as JSON lines.  JSON lines read
    from the worker's stdout are dispatched by key: "log" records are written
    to the main log, "out" carries a result and "err" carries an error.  Any
    attribute not defined on this class is exposed as a callable that forwards
    the call to the worker (see __getattr__).
    """

    def __init__(self, name, config):
        """
        Start the worker process and the threads that read its output.

        :param name: name given to the worker Process; also used in the name
            of the lock that serializes commands
        :param config: configuration sent to the worker as its first line of
            input, combined with {"debug": {"trace": True}} through
            set_default.  Setting debug.logs is rejected through Log.error.
        """
        config = wrap(config)
        if config.debug.logs:
            Log.error("not allowed to configure logging on other process")

        Log.note("begin process")
        # WINDOWS REQUIRED shell, WHILE LINUX NOT
        shell = "windows" in platform.system().lower()
        self.process = Process(
            name,
            [PYTHON, "-u", "mo_threads" + os.sep + "python_worker.py"],
            debug=False,
            cwd=os.getcwd(),
            shell=shell
        )
        self.process.stdin.add(
            value2json(set_default({}, config, {"debug": {"trace": True}}))
        )
        # The first line from the worker must be the literal handshake below;
        # anything else is reported as a start-up failure.
        status = self.process.stdout.pop()
        if status != '{"out":"ok"}':
            Log.error(
                "could not start python\n{{error|indent}}",
                error=self.process.stderr.pop_all() + [status] + self.process.stdin.pop_all()
            )
        self.lock = Lock("wait for response from " + name)
        # current_task is the signal the stdout reader triggers when a reply
        # arrives; DONE is used as the "no command pending" value.
        self.current_task = DONE
        self.current_response = None
        self.current_error = None

        self.daemon = Thread.run("", self._daemon)
        self.errors = Thread.run("", self._stderr)

    def _execute(self, command):
        """
        Send one command to the worker and wait for its reply.

        The whole exchange happens while holding self.lock.
        NOTE(review): the lock scope was reconstructed from a copy of this
        file that had lost its indentation - confirm against upstream.

        :param command: JSON-serializable command
        :return: the value of the worker's "out" reply.  An "err" reply is
            reported through Log.error with the worker's error as the cause.
        """
        with self.lock:
            self.current_task.wait()
            self.current_task = Signal()
            self.current_response = None
            self.current_error = None

            if self.process.service_stopped:
                Log.error("python is not running")
            self.process.stdin.add(value2json(command))
            # Wake on either a reply or the worker process stopping.
            (self.current_task | self.process.service_stopped).wait()

            try:
                if self.current_error:
                    Log.error(
                        "problem with process call",
                        cause=Except.new_instance(self.current_error)
                    )
                else:
                    return self.current_response
            finally:
                # Reset to the idle state whether the call succeeded or not.
                self.current_task = DONE
                self.current_response = None
                self.current_error = None

    def _daemon(self, please_stop):
        """
        Read the worker's stdout and dispatch each JSON line.

        :param please_stop: signal that ends the loop
        """
        while not please_stop:
            line = self.process.stdout.pop(till=please_stop)
            if line == THREAD_STOP:
                break
            try:
                data = json2value(line)
                if "log" in data:
                    Log.main_log.write(*data.log)
                elif "out" in data:
                    self.current_response = data.out
                    self.current_task.go()
                elif "err" in data:
                    self.current_error = data.err
                    self.current_task.go()
            except Exception:
                # Best effort: anything that cannot be handled as a JSON
                # message is echoed to the log instead of killing the reader.
                Log.note("non-json line: {{line}}", line=line)
        DEBUG and Log.note("stdout reader is done")

    def _stderr(self, please_stop):
        """
        Copy each line of the worker's stderr to the log.

        :param please_stop: signal that ends the loop; it is also triggered
            here when the stderr stream reports THREAD_STOP
        """
        while not please_stop:
            try:
                line = self.process.stderr.pop(till=please_stop)
                if line == THREAD_STOP:
                    please_stop.go()
                    break
                Log.note(
                    "Error line from {{name}}({{pid}}): {{line}}",
                    line=line,
                    name=self.process.name,
                    pid=self.process.pid
                )
            except Exception as e:
                Log.error("could not process line", cause=e)

    def import_module(self, module_name, var_names=None):
        """
        Send an "import" command to the worker.

        :param module_name: name of the module to import
        :param var_names: optional names to import from the module; when None
            only the module name is sent
        """
        if var_names is None:
            self._execute({"import": module_name})
        else:
            self._execute({"import": {"from": module_name, "vars": var_names}})

    def set(self, var_name, value):
        """
        Send a "set" command that pairs var_name with value.

        :param var_name: name of the variable
        :param value: JSON-serializable value
        """
        # A mapping keeps the name/value pairing.  The previous set literal
        # {var_name, value} lost it and failed on unhashable values.
        self._execute({"set": {var_name: value}})

    def get(self, var_name):
        """
        Send a "get" command and return the worker's reply.

        :param var_name: name of the variable to fetch
        """
        return self._execute({"get": var_name})

    def execute_script(self, script):
        """
        Send an "exec" command and return the worker's reply.

        :param script: source text to send
        """
        return self._execute({"exec": script})

    def __getattr__(self, item):
        """
        Return a callable that sends {item: arguments} to the worker.

        The callable accepts positional arguments or keyword arguments; using
        both in one call is reported through Log.error.
        """
        def output(*args, **kwargs):
            if args:
                if kwargs:
                    Log.error("Not allowed to use both args and kwargs")
                return self._execute({item: args})
            else:
                return self._execute({item: kwargs})

        return output

    def stop(self):
        """
        Send a "stop" command, wait for the process, then stop both readers.
        """
        self._execute({"stop": {}})
        self.process.join()
        self.daemon.stop()
        self.errors.stop()
|