зеркало из https://github.com/mozilla/TUID.git
234 строки
6.3 KiB
Python
234 строки
6.3 KiB
Python
# encoding: utf-8
|
|
#
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
# You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
#
|
|
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
|
|
#
|
|
# THIS THREADING MODULE IS PERMEATED BY THE please_stop SIGNAL.
|
|
# THIS SIGNAL IS IMPORTANT FOR PROPER SIGNALLING WHICH ALLOWS
|
|
# FOR FAST AND PREDICTABLE SHUTDOWN AND CLEANUP OF THREADS
|
|
|
|
from __future__ import absolute_import, division, unicode_literals
|
|
|
|
import random
|
|
from weakref import ref
|
|
|
|
from mo_future import allocate_lock as _allocate_lock, text
|
|
from mo_logs import Log
|
|
|
|
DEBUG = False
|
|
DEBUG_SIGNAL = False
|
|
SEED = random.Random()
|
|
|
|
|
|
class Signal(object):
|
|
"""
|
|
SINGLE-USE THREAD SAFE SIGNAL
|
|
|
|
go() - ACTIVATE SIGNAL (DOES NOTHING IF SIGNAL IS ALREADY ACTIVATED)
|
|
wait() - PUT THREAD IN WAIT STATE UNTIL SIGNAL IS ACTIVATED
|
|
then() - METHOD FOR OTHER THREAD TO RUN WHEN ACTIVATING SIGNAL
|
|
"""
|
|
|
|
__slots__ = ["_name", "lock", "_go", "job_queue", "waiting_threads", "__weakref__"]
|
|
|
|
def __init__(self, name=None):
|
|
(DEBUG and name) and Log.note("New signal {{name|quote}}", name=name)
|
|
self._name = name
|
|
self.lock = _allocate_lock()
|
|
self._go = False
|
|
self.job_queue = None
|
|
self.waiting_threads = None
|
|
|
|
def __str__(self):
|
|
return str(self._go)
|
|
|
|
def __bool__(self):
|
|
return self._go
|
|
|
|
def __nonzero__(self):
|
|
return self._go
|
|
|
|
def wait(self):
|
|
"""
|
|
PUT THREAD IN WAIT STATE UNTIL SIGNAL IS ACTIVATED
|
|
"""
|
|
if self._go:
|
|
return True
|
|
|
|
with self.lock:
|
|
if self._go:
|
|
return True
|
|
stopper = _allocate_lock()
|
|
stopper.acquire()
|
|
if not self.waiting_threads:
|
|
self.waiting_threads = [stopper]
|
|
else:
|
|
self.waiting_threads.append(stopper)
|
|
|
|
DEBUG and self._name and Log.note("wait for go {{name|quote}}", name=self.name)
|
|
stopper.acquire()
|
|
DEBUG and self._name and Log.note("GOing! {{name|quote}}", name=self.name)
|
|
return True
|
|
|
|
def go(self):
|
|
"""
|
|
ACTIVATE SIGNAL (DOES NOTHING IF SIGNAL IS ALREADY ACTIVATED)
|
|
"""
|
|
DEBUG and self._name and Log.note("GO! {{name|quote}}", name=self.name)
|
|
|
|
if self._go:
|
|
return
|
|
|
|
with self.lock:
|
|
if self._go:
|
|
return
|
|
self._go = True
|
|
|
|
DEBUG and self._name and Log.note("internal GO! {{name|quote}}", name=self.name)
|
|
jobs, self.job_queue = self.job_queue, None
|
|
threads, self.waiting_threads = self.waiting_threads, None
|
|
|
|
if threads:
|
|
DEBUG and self._name and Log.note("Release {{num}} threads", num=len(threads))
|
|
for t in threads:
|
|
t.release()
|
|
|
|
if jobs:
|
|
for j in jobs:
|
|
try:
|
|
j()
|
|
except Exception as e:
|
|
Log.warning("Trigger on Signal.go() failed!", cause=e)
|
|
|
|
def then(self, target):
|
|
"""
|
|
RUN target WHEN SIGNALED
|
|
"""
|
|
if not target:
|
|
Log.error("expecting target")
|
|
|
|
with self.lock:
|
|
if not self._go:
|
|
DEBUG and self._name and Log.note("Adding target to signal {{name|quote}}", name=self.name)
|
|
|
|
if not self.job_queue:
|
|
self.job_queue = [target]
|
|
else:
|
|
self.job_queue.append(target)
|
|
return
|
|
|
|
(DEBUG_SIGNAL) and Log.note("Signal {{name|quote}} already triggered, running job immediately", name=self.name)
|
|
target()
|
|
|
|
def remove_go(self, target):
|
|
"""
|
|
FOR SAVING MEMORY
|
|
"""
|
|
with self.lock:
|
|
if not self._go:
|
|
try:
|
|
self.job_queue.remove(target)
|
|
except ValueError:
|
|
pass
|
|
|
|
@property
|
|
def name(self):
|
|
if not self._name:
|
|
return "anonymous signal"
|
|
else:
|
|
return self._name
|
|
|
|
def __str__(self):
|
|
return self.name.decode(text)
|
|
|
|
def __repr__(self):
|
|
return text(repr(self._go))
|
|
|
|
def __or__(self, other):
|
|
if other == None:
|
|
return self
|
|
if not isinstance(other, Signal):
|
|
Log.error("Expecting OR with other signal")
|
|
if self or other:
|
|
return DONE
|
|
|
|
output = Signal(self.name + " | " + other.name)
|
|
OrSignal(output, (self, other))
|
|
return output
|
|
|
|
def __ror__(self, other):
|
|
return self.__or__(other)
|
|
|
|
def __and__(self, other):
|
|
if other == None or other:
|
|
return self
|
|
if not isinstance(other, Signal):
|
|
Log.error("Expecting OR with other signal")
|
|
|
|
if DEBUG and self._name:
|
|
output = Signal(self.name + " and " + other.name)
|
|
else:
|
|
output = Signal(self.name + " and " + other.name)
|
|
|
|
gen = AndSignals(output, 2)
|
|
self.then(gen.done)
|
|
other.then(gen.done)
|
|
return output
|
|
|
|
|
|
class AndSignals(object):
|
|
__slots__ = ["signal", "remaining", "locker"]
|
|
|
|
def __init__(self, signal, count):
|
|
"""
|
|
CALL signal.go() WHEN done() IS CALLED count TIMES
|
|
:param signal:
|
|
:param count:
|
|
:return:
|
|
"""
|
|
self.signal = signal
|
|
self.locker = _allocate_lock()
|
|
self.remaining = count
|
|
if not count:
|
|
self.signal.go()
|
|
|
|
def done(self):
|
|
with self.locker:
|
|
self.remaining -= 1
|
|
remaining = self.remaining
|
|
if not remaining:
|
|
self.signal.go()
|
|
|
|
|
|
class OrSignal(object):
|
|
"""
|
|
A SELF-REFERENTIAL CLUSTER OF SIGNALING METHODS TO IMPLEMENT __or__()
|
|
MANAGE SELF-REMOVAL UPON NOT NEEDING THE signal OBJECT ANY LONGER
|
|
"""
|
|
__slots__ = ["signal", "dependencies"]
|
|
|
|
def __init__(self, signal, dependencies):
|
|
self.dependencies = dependencies
|
|
self.signal = ref(signal, self.cleanup)
|
|
for d in dependencies:
|
|
d.then(self)
|
|
signal.then(self.cleanup)
|
|
|
|
def cleanup(self, r=None):
|
|
for d in self.dependencies:
|
|
d.remove_go(self)
|
|
self.dependencies = []
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
s = self.signal()
|
|
if s is not None:
|
|
s.go()
|
|
|
|
|
|
DONE = Signal()
|
|
DONE.go()
|