import asyncio
import concurrent.futures
import copy
import datetime
import glob
import importlib
import os
import re
import threading
import time
import types
from collections import OrderedDict
from functools import partial, wraps
from pprint import pformat
try:
import aiocron
except ImportError:
# Suppress import error when we just run CLI
pass
try:
import dill as pickle
import psutil
except ImportError:
# Suppress import error when we just run CLI
pass
import biothings.hub # noqa
from biothings import config
from biothings.utils.common import get_loop, get_random_string, sizeof_fmt, timesofar
from biothings.utils.hub_db import get_src_conn
logger = config.logger
[docs]
def track(func):
# only wraps do_work defined later
# seems to create a pickled dict for process/thread info (pinfo) and
# some other metadata
@wraps(func)
def func_wrapper(*args, **kwargs):
job_id = args[0]
ptype = args[1] # tracking process or thread ?
# we're looking for some "pinfo" value (process info) to later
# reporting. If we can't find any, we'll try our best to figure out
# what this is about...
# func is the do_work wrapper, we want the actual partial
# is first arg a callable (func) or pinfo ?
if callable(args[2]):
innerfunc = args[2]
innerargs = args[3:]
pinfo = None
else:
innerfunc = args[3]
innerargs = args[4:]
# we want to let the original as-is, as it can still contain
# usefull information like predicates
pinfo = copy.deepcopy(args[2])
# predicates can't be pickles/dilled
pinfo.pop("__predicates__", None)
# just informative, so stringify is just ok there)
# make sure we can pickle the whole thing (and it's
innerargs = [str(arg) for arg in innerargs]
if type(innerfunc) == partial:
fname = innerfunc.func.__name__
elif isinstance(innerfunc, types.MethodType):
fname = innerfunc.__self__.__class__.__name__
else:
fname = innerfunc.__name__
firstarg = innerargs and innerargs[0] or ""
if not pinfo:
pinfo = {
"category": None,
"source": None,
"step": None,
"description": "%s %s" % (fname, firstarg),
}
pinfo["started_at"] = time.time()
worker = {"func_name": fname, "args": innerargs, "kwargs": kwargs, "job": pinfo}
results = None
exc = None
trace = None
pidfile = None
try:
_id = None
if ptype == "thread":
_id = "%s" % threading.current_thread().getName()
else:
_id = os.getpid()
# add random chars: 2 jobs handled by the same slot (pid or thread)
# would override filename otherwise
fn = "%s_%s" % (_id, job_id)
# despite saying "job" "id" this is pid/thread name
worker["job"]["id"] = _id
pidfile = os.path.join(config.RUN_DIR, "%s.pickle" % fn)
pickle.dump(worker, open(pidfile, "wb"))
results = func(*args, **kwargs)
except Exception as e:
import traceback
trace = traceback.format_exc()
logger.error("err %s\n%s", e, trace)
# we want to store exception so for now, just make a reference
exc = e
finally:
if pidfile and os.path.exists(pidfile):
logger.debug("Remove PID file '%s'", pidfile)
os.unlink(pidfile)
# now raise original exception
if exc:
raise exc
return results
return func_wrapper
[docs]
@track
def do_work(job_id, ptype, pinfo=None, func=None, *args, **kwargs):
# purpose: to be wrapped by @track
# only used in defer_to_process / defer_to_thread in JobManager
# pinfo is optional, and func is not. and args and kwargs must
# be after func. just to say func is mandatory, despite what the
# signature says
assert func
# need to wrap calls otherwise multiprocessing could have
# issue pickling directly the passed func because of some import
# issues ("can't pickle ... object is not the same as ...")
return func(*args, **kwargs)
[docs]
def find_process(pid):
# It seems that it's only used once to find the hub process
# I wonder why not just try to use psutil.Process(pid)
g = psutil.process_iter()
for p in g:
if p.pid == pid:
break
# apparently a non-existent pid could trigger a NameError
return p
[docs]
def norm(value, maxlen):
"""just a helper to clean/prepare job's values printing"""
if len(value) > maxlen:
value = "...%s" % value[-maxlen + 3 :]
return value
[docs]
class UnknownResource(Exception):
pass
[docs]
class ResourceError(Exception):
pass
[docs]
class ManagerError(Exception):
pass
[docs]
class ResourceNotFound(Exception):
pass
[docs]
class BaseManager:
def __init__(self, job_manager, poll_schedule=None):
self.register = {}
self.poll_schedule = poll_schedule
self.job_manager = job_manager
self.clean_stale_status()
[docs]
def clean_stale_status(self):
"""
During startup, search for action in progress which would have
been interrupted and change the state to "canceled".
Ex: some donwloading processes could have been interrupted, at
startup, "downloading" status should be changed to "canceled" so
to reflect actual state on these datasources.
This must be overriden in subclass.
"""
def __repr__(self):
registered = sorted(list(self.register.keys()))
return "<%s [%d registered]: %s>" % (
self.__class__.__name__,
len(self.register),
registered,
)
def __getitem__(self, src_name):
try:
# as a main-source
return self.register[src_name]
except KeyError:
try:
# as a sub-source
main, sub = src_name.split(".")
srcs = self.register[main]
# there can be many uploader for one resource (when each is dealing
# with one specific file but upload to the same collection for instance)
# so we want to make sure user is aware of this and not just return one
# uploader when many are needed
# on the other hand, if only one avail, just return it
res = [src for src in srcs if src.name == sub]
if not res:
raise KeyError(src_name)
return res
except (ValueError, AttributeError, KeyError):
# nope, can't find it...
raise KeyError(src_name)
[docs]
def poll(self, state, func, col):
"""
Search for source in collection 'col' with a pending flag list
containing 'state' and and call 'func' for each document found
(with doc as only param)
"""
if not self.poll_schedule:
raise ManagerError("poll_schedule is not defined")
async def check_pending(state):
sources = [src for src in col.find({"pending": state}) if isinstance(src["_id"], str)]
if sources:
logger.info(
"Found %d resources with pending flag %s (%s)",
len(sources),
state,
repr([src["_id"] for src in sources]),
)
for src in sources:
logger.info("Run %s for pending flag %s on source '%s'", func, state, src["_id"])
try:
# first reset flag to make sure we won't call func multiple time
col.update({"_id": src["_id"]}, {"$pull": {"pending": state}})
func(src)
except ResourceNotFound:
logger.error(
"Resource '%s' has a pending flag set to %s but is not registered in manager",
src["_id"],
state,
)
return aiocron.crontab(
self.poll_schedule,
func=partial(check_pending, state),
start=True,
loop=self.job_manager.loop,
)
[docs]
class BaseStatusRegisterer:
[docs]
def load_doc(self, key_name, stage):
"""
Find document using key_name and stage, stage being a
key within the document matching a specific process name:
Ex: {"_id":"123","snapshot":"abc"}
load_doc("abc","snapshot")
will return the document. Note key_name is first used to
find the doc by its _id.
Ex: with another doc {"_id" : "abc", "snapshot" : "somethingelse"}
load_doc{"abc","snapshot")
will return doc with _id="abc", not "123"
"""
doc = self.collection.find_one({"_id": key_name})
if not doc:
doc = []
bdocs = self.collection.find()
for adoc in bdocs:
if key_name in adoc.get(stage, {}):
doc.append(adoc)
if len(doc) == 1:
# we'll just return the single doc
# otherwise it's up to the caller to do something with that
doc = doc.pop()
assert doc, "No document could be found"
return doc
@property
def collection(self):
"""
Return collection object used to fetch doc in which we store status
"""
raise NotImplementedError("implement me in sub-class")
[docs]
def register_status(self, doc, stage, status, transient=False, init=False, **extra):
assert self.collection, "No collection set"
# stage: "snapshot", "publish", etc... depending on the what's being done
job_info = {
"status": status,
"step_started_at": datetime.datetime.now().astimezone(),
"logfile": self.logfile,
}
stage_info = {}
stage_key = None
# register status can be about different stages:
stage_info.setdefault(stage, {}).update(extra[stage])
stage_key = list(extra[stage].keys())
assert len(stage_key) == 1, stage_key
stage_key = stage_key.pop()
if transient:
# record some "in-progress" information
job_info["pid"] = os.getpid()
else:
# only register time when it's a final state
job_info["time"] = timesofar(self.ti)
t1 = round(time.time() - self.ti, 0)
job_info["time_in_s"] = t1
stage_info.setdefault(stage, {}).setdefault(stage_key, {}).update(
{"created_at": datetime.datetime.now().astimezone()}
)
if "job" in extra:
job_info.update(extra["job"])
# since the base is the merged collection, we register info there
if init:
# init timer for this step
self.ti = time.time()
self.collection.update({"_id": doc["_id"]}, {"$push": {"jobs": job_info}})
# now refresh/sync
doc = self.collection.find_one({"_id": doc["_id"]})
else:
# merge extra at root level
doc["jobs"] and doc["jobs"].append(job_info)
def merge_index_info(target, d):
if not isinstance(target, dict):
# previous value wasn't a dict, just replace
target = d
elif "__REPLACE__" in d.keys():
d.pop("__REPLACE__")
target = d
else:
if status == "success":
# remove 'err' key to avoid merging success results with errors
target.pop("err", None)
for k, v in d.items():
if isinstance(v, dict):
if k in target:
target[k] = merge_index_info(target[k], v)
else:
v.pop("__REPLACE__", None)
# merge v with "nothing" just to make sure to remove any "__REPLACE__"
v = merge_index_info({}, v)
target[k] = v
else:
target[k] = v
return target
doc = merge_index_info(doc, stage_info)
self.collection.replace_one({"_id": doc["_id"]}, doc)
[docs]
class BaseSourceManager(BaseManager):
"""
Base class to provide source management: discovery, registration
Actual launch of tasks must be defined in subclasses
"""
# define the class manager will look for. Set in a subclass
SOURCE_CLASS = None
def __init__(self, job_manager, datasource_path="dataload.sources", *args, **kwargs):
super(BaseSourceManager, self).__init__(job_manager, *args, **kwargs)
self.conn = get_src_conn()
self.default_src_path = datasource_path
self.poll_schedule = None
[docs]
def filter_class(self, klass):
"""
Gives opportunity for subclass to check given class and decide to
keep it or not in the discovery process. Returning None means "skip it".
"""
# keep it by default
return klass
[docs]
def register_classes(self, klasses):
"""
Register each class in self.register dict. Key will be used
to retrieve the source class, create an instance and run method from it.
It must be implemented in subclass as each manager may need to access
its sources differently,based on different keys.
"""
raise NotImplementedError("implement me in sub-class")
[docs]
def find_classes(self, src_module, fail_on_notfound=True):
"""
Given a python module, return a list of classes in this module, matching
SOURCE_CLASS (must inherit from)
"""
# try to find a uploader class in the module
klasses = []
for attr in dir(src_module):
something = getattr(src_module, attr)
# not interested in classes coming from biothings.hub.*, these would typically come
# from "from biothings.hub.... import aclass" statements and would be incorrectly registered
# we only look for classes defined straight from the actual module
if (
type(something) == type
and issubclass(something, self.__class__.SOURCE_CLASS)
and not something.__module__.startswith("biothings.hub")
):
klass = something
if not self.filter_class(klass):
continue
logger.debug("Found a class based on %s: '%s'", self.__class__.SOURCE_CLASS.__name__, klass)
klasses.append(klass)
if not klasses:
if fail_on_notfound:
raise UnknownResource(
"Can't find a class based on %s in module '%s'" % (self.__class__.SOURCE_CLASS.__name__, src_module)
)
return klasses
[docs]
def register_source(self, src, fail_on_notfound=True):
"""Register a new data source. src can be a module where some classes
are defined. It can also be a module path as a string, or just a source name
in which case it will try to find information from default path.
"""
if isinstance(src, str):
try:
src_m = importlib.import_module(src)
src_m = importlib.reload(src_m)
except ImportError as ex:
logger.warning(ex)
try:
src_m = importlib.import_module("%s.%s" % (self.default_src_path, src))
except ImportError as e:
msg = "Can't find module '%s', even in '%s'" % (src, self.default_src_path)
logger.error(msg)
logger.warning(e)
raise UnknownResource(msg)
elif isinstance(src, dict):
# source is comprised of several other sub sources
assert len(src) == 1, "Should have only one element in source dict '%s'" % src
_, sub_srcs = list(src.items())[0]
for src in sub_srcs:
self.register_source(src, fail_on_notfound)
return
else:
src_m = src
# first try to find classes defined in __init__.py (in package) explicitely
klasses = self.find_classes(src_m, fail_on_notfound)
# then if none found, try to search within the package's modules
if not klasses:
try:
src_m_path = src_m.__path__[0]
for d in os.listdir(src_m_path):
if d.endswith("__pycache__"):
continue
modpath = os.path.join(src_m.__name__, d).replace(".py", "").replace(os.path.sep, ".")
try:
m = importlib.import_module(modpath)
klasses.extend(self.find_classes(m, fail_on_notfound))
except Exception as e:
# (SyntaxError, ImportError) is not sufficient to catch
# all possible failures, for example a ValueError
# in module definition..
logger.debug("Couldn't import %s: %s", modpath, e)
continue
except TypeError as e:
logger.warning("Can't register source '%s', something's wrong with path: %s", src_m, e)
logger.debug("Found classes to register: %s", repr(klasses))
self.register_classes(klasses)
[docs]
def register_sources(self, sources):
assert not isinstance(sources, str), "sources argument is a string, should pass a list"
self.register.clear()
for src in sources:
try:
# batch registration, we'll silently ignore not-found sources
self.register_source(src, fail_on_notfound=False)
except (UnknownResource, ResourceError) as e:
logger.info("Can't register source '%s', skip it; %s", src, e)
import traceback
logger.error(traceback.format_exc())
[docs]
class JobManager:
# TODO: Add class docstring
COLUMNS = [
"pid",
"source",
"category",
"step",
"description",
"mem",
"cpu",
"started_at",
"duration",
]
HEADER = dict(zip(COLUMNS, [c.upper() for c in COLUMNS])) # upper() for column titles
HEADERLINE = "{pid:^10}|{source:^35}|{category:^10}|{step:^20}|{description:^30}|{mem:^10}|{cpu:^6}|{started_at:^20}|{duration:^10}"
DATALINE = HEADERLINE.replace("^", "<")
def __init__(
self,
loop,
process_queue=None,
thread_queue=None,
max_memory_usage=None,
num_workers=None,
num_threads=None,
auto_recycle=True,
):
if not os.path.exists(config.RUN_DIR):
logger.info("Creating RUN_DIR directory '%s'", config.RUN_DIR)
os.makedirs(config.RUN_DIR)
self.loop = loop # usu. it's the asyncio event loop
self.num_workers = num_workers
if self.num_workers == 0:
logger.debug("Adjusting number of worker to 1")
self.num_workers = 1
self.num_threads = num_threads or self.num_workers
self.process_queue = process_queue or concurrent.futures.ProcessPoolExecutor(max_workers=self.num_workers)
# notes on fixing BPE (BrokenProcessPool Exception):
# whenever a process exits unexpectedly, BPE is raised, and while that
# all the processes in the pool gets a SIGTERM from the management
# thread (see _queue_management_worker in concurrent.futures.process)
# TODO: limit the number of threads (as argument) ?
self.thread_queue = thread_queue or concurrent.futures.ThreadPoolExecutor(max_workers=self.num_threads)
# In Py38 using an executor that is not a ThreadPoolExecutor is
# deprecated. And it seems in Py39 , it must be a ThreadPoolExecutor,
# using a ProcessPoolExecutor will trigger an error.
# However, loop.run_in_executor still accepts ProcessPoolExecutor
# see https://bugs.python.org/issue34075
self.loop.set_default_executor(self.thread_queue)
# this lock is acquired when defer_to_process/thread is invoked
# and released when the inner coroutine is run
# purpose being: "control job submission", as it only creates a new
# "task" when the previous one has completed checking its constraints
self.ok_to_run = asyncio.Semaphore()
# auto-creata RUN_DIR
if not os.path.exists(config.RUN_DIR):
os.makedirs(config.RUN_DIR)
if max_memory_usage == "auto":
# try to find a nice limit...
limited = int(psutil.virtual_memory().available * 0.6)
logger.info("Auto-setting memory usage limit to %s", sizeof_fmt(limited))
max_memory_usage = limited
elif max_memory_usage:
logger.info("Setting memory usage to %s", sizeof_fmt(max_memory_usage))
else:
logger.info("No memory limit set")
self.max_memory_usage = max_memory_usage
self.avail_memory = int(psutil.virtual_memory().available)
self._phub = None
# Process obj. for hub (process which JobManager is in)
self.auto_recycle = auto_recycle # active
self.auto_recycle_setting = auto_recycle # keep setting if we need to restore it its orig value
self.jobs = {} # all active jobs (thread/process)
# _process_job_ids is for storing Job IDs of calls deferred in process
# executor, so that when Executor is recreated, staled Job IDs can
# be removed
# FIXME: drop this when structure of pinfo is clear so we can rely on
# that instead of storing _process_job_ids
self._process_job_ids = set()
self._pchildren = []
self.clean_staled()
[docs]
def stop(self, force=False, recycling=False, wait=1):
async def do():
try:
# shutting down the process queue can take a while
# if some processes are still running (it'll wait until they're done)
# we'll wait in a thread to prevent the hub from being blocked
logger.info("Shutting down current process queue...")
pinfo = {
"__skip_check__": True, # skip sanity check, mem check to make sure
# this worker will be run
"category": "admin",
"source": "maintenance",
"step": "",
"description": "Stopping process queue",
}
# await on coroutine
j = await self.defer_to_thread(pinfo, self.process_queue.shutdown)
# await on the future to be done
await j
if recycling:
# now replace
logger.info("Replacing process queue with new one")
self.process_queue = concurrent.futures.ProcessPoolExecutor(max_workers=self.num_workers)
else:
self.process_queue = None
except Exception as e:
logger.error("Error while recycling the process queue: %s", e)
raise
async def kill():
nonlocal wait
if wait < 1:
wait = 1 # wait a little bit so job manager has time to stop if nothing is running
logger.warning("Wait %s seconds before killing queue processes", wait)
await asyncio.sleep(wait)
logger.warning("Can't wait anymore, killing running processed in the queue !")
for proc in self.pchildren:
logger.warning("Killing %s", proc)
proc.kill()
def done(f):
f.result() # consume future's result to potentially raise exception
fut = asyncio.ensure_future(do())
fut.add_done_callback(done)
if force:
# futkill = asyncio.ensure_future(kill())
asyncio.ensure_future(kill())
return fut
[docs]
def clean_staled(self):
# clean old/staled files
children_pids = [p.pid for p in self.pchildren]
active_tids = [t.getName() for t in self.thread_queue._threads]
pid_pat = re.compile(r".*/(\d+)_.*\.pickle") # see track() for filename format
for fn in glob.glob(os.path.join(config.RUN_DIR, "*.pickle")):
pid = pid_pat.findall(fn)
if not pid:
continue
try:
pid = int(pid[0].split("_")[0])
except IndexError:
logger.warning("Invalid PID file '%s', skip it", fn)
raise
if pid not in children_pids:
logger.info("Removing staled pid file '%s'", fn)
os.unlink(fn)
tid_pat = re.compile(r".*/(Thread\w*-\d+)_.*\.pickle")
for fn in glob.glob(os.path.join(config.RUN_DIR, "*.pickle")):
try:
tid = tid_pat.findall(fn)[0].split("_")[0]
except IndexError:
logger.warning("Invalid TID file '%s', skip it", fn)
raise
if not tid:
continue
if tid not in active_tids:
logger.info("Removing staled thread file '%s'", fn)
os.unlink(fn)
[docs]
def recycle_process_queue(self):
"""
Replace current process queue with a new one. When processes
are used over and over again, memory tends to grow as python
interpreter keeps some data (...). Calling this method will
perform a clean shutdown on current queue, waiting for running
processes to terminate, then discard current queue and replace
it a new one.
"""
return self.stop(recycling=True)
[docs]
async def check_constraints(self, pinfo=None):
mem_req = pinfo and pinfo.get("__reqs__", {}).get("mem") or 0
t0 = time.time()
waited = False
sleep_time = 5
if mem_req:
logger.info(
"Job {cat:%s,source:%s,step:%s} requires %s memory, checking if available",
pinfo.get("category"),
pinfo.get("source"),
pinfo.get("step"),
sizeof_fmt(mem_req),
)
if self.max_memory_usage:
hub_mem = self.hub_memory
while hub_mem >= self.max_memory_usage:
if self.auto_recycle:
pworkers = self.get_pid_files()
tworkers = self.get_thread_files()
if len(pworkers) == 0 and len(tworkers) == 0:
logger.info("No worker running, recycling the process queue...")
fut = self.recycle_process_queue()
def recycled(f):
# res = f.result()
f.result()
# still out of memory ?
avail_mem = self.max_memory_usage - self.hub_memory
if avail_mem <= 0:
logger.error(
"After recycling process queue, "
"memory usage is still too high (needs at least %s more)"
"now turn auto-recycling off to prevent infinite recycling...",
sizeof_fmt(abs(avail_mem)),
)
self.auto_recycle = False
fut.add_done_callback(recycled)
logger.info(
"Hub is using too much memory to launch job {cat:%s,source:%s,step:%s}"
" (%s used, more than max allowed %s), wait a little (job's already been postponed for %s)",
pinfo.get("category"),
pinfo.get("source"),
pinfo.get("step"),
sizeof_fmt(hub_mem),
sizeof_fmt(self.max_memory_usage),
timesofar(t0),
)
await asyncio.sleep(sleep_time)
waited = True
hub_mem = self.hub_memory
if mem_req:
# max allowed mem is either the limit we gave and the os limit
max_mem = self.max_memory_usage and self.max_memory_usage or self.avail_memory
# TODO: check projected memory (jobs with mem requirements currently running
# as those jobs may not have reached their max mem usage yet)
hub_mem = self.hub_memory
while mem_req >= (max_mem - hub_mem):
logger.info(
"Job {cat:%s,source:%s,step:%s} needs %s to run, not enough to launch it "
"(hub consumes %s while max allowed is %s), wait a little (job's already been postponed for %s)",
pinfo.get("category"),
pinfo.get("source"),
pinfo.get("step"),
sizeof_fmt(mem_req),
sizeof_fmt(hub_mem),
sizeof_fmt(max_mem),
timesofar(t0),
)
await asyncio.sleep(sleep_time)
waited = True
# refresh limites and usage (manager can be modified from hub
# thus memory usage can be modified on-the-fly
hub_mem = self.hub_memory
max_mem = self.max_memory_usage and self.max_memory_usage or self.avail_memory
pendings = len(self.process_queue._pending_work_items.keys()) - config.HUB_MAX_WORKERS
while pendings >= config.MAX_QUEUED_JOBS:
if not waited:
logger.info(
"Can't run job {cat:%s,source:%s,step:%s} right now, too much pending jobs in the queue (max: %s), will retry until possible",
pinfo.get("category"),
pinfo.get("source"),
pinfo.get("step"),
config.MAX_QUEUED_JOBS,
)
await asyncio.sleep(sleep_time)
pendings = len(self.process_queue._pending_work_items.keys()) - config.HUB_MAX_WORKERS
waited = True
# finally check custom predicates
predicates = pinfo and pinfo.get("__predicates__", [])
failed_predicate = None
while True:
for predicate in predicates:
if not predicate(self):
failed_predicate = predicate
break # for loop (most inner one)
else:
# reset flag
failed_predicate = None
if failed_predicate:
logger.info(
"Can't run job {cat:%s,source:%s,step:%s} right now, predicate %s failed, will retry until possible",
pinfo.get("category"),
pinfo.get("source"),
pinfo.get("step"),
failed_predicate,
)
await asyncio.sleep(sleep_time)
waited = True
else:
break # while loop
if waited:
logger.info(
"Job {cat:%s,source:%s,step:%s} now can be launched (total waiting time: %s)",
pinfo.get("category"),
pinfo.get("source"),
pinfo.get("step"),
timesofar(t0),
)
# auto-recycle could have been temporarily disabled until more mem is assigned.
# if we've been able to run the job, it means we had enough mem so restore
# recycling setting (if auto_recycle was False, it's ignored
if self.auto_recycle_setting:
self.auto_recycle = self.auto_recycle_setting
[docs]
async def defer_to_process(self, pinfo=None, func=None, *args, **kwargs):
async def run(future, job_id):
nonlocal pinfo
await self.check_constraints(pinfo)
self.ok_to_run.release()
# pinfo can contain predicates hardly pickleable during run_in_executor
# but we also need not to touch the original one
copy_pinfo = copy.deepcopy(pinfo)
copy_pinfo.pop("__predicates__", None)
self.jobs[job_id] = copy_pinfo
self._process_job_ids.add(job_id)
try:
# test to see if Executor still alive
_ = self.process_queue.submit(int, 1)
except concurrent.futures.process.BrokenProcessPool as e:
# recreate if not
# we don't need to care about the remaining tasks because
# they'd all be SIGTERM'd anyways. But ...
logger.warning("Broken Process Pool: %s, restarting.", e)
self.process_queue = concurrent.futures.ProcessPoolExecutor(max_workers=self.num_workers)
for stale_id in self._process_job_ids:
self.jobs.pop(stale_id, None) # in the rare case that
# somehow they de-sync
self._process_job_ids.clear()
res = self.loop.run_in_executor(
self.process_queue,
partial(do_work, job_id, "process", copy_pinfo, func, *args, **kwargs),
)
# do_work will create and clean up the pickle files unless
# the worker process gets killed unexpectedly
# callback to consume executor future to trigger exception
# and remove the job from self.jobs
def ran(f):
try:
# consume future, just to trigger potential exceptions
# r = f.result()
f.result()
finally:
# whatever the result we want to make sure to clean the job registry
# to keep it sync with actual running jobs
# -- actually it can't the job_id is added in
# defer_to_process, but this is inside the try-finally
# block indefer_to_process.run.ran (names are hard, I know)
self.jobs.pop(job_id)
self._process_job_ids.discard(job_id)
res.add_done_callback(ran)
res = await res
# process could generate other parallelized jobs and return a Future/Task
# If so, we want to make sure we get the results from that task
if type(res) == asyncio.Task:
res = await res
future.set_result(res)
# lock is released in run coroutine
await self.ok_to_run.acquire()
f = asyncio.Future()
def runned(innerf, job_id):
# not exactly inner future, if f is the most outside future
# and res is the innermost future, then innerf is in-between
# res is an asyncio future that represents the concurrent.futures.
# Future from the Executor
if innerf.exception():
f.set_exception(innerf.exception())
job_id = get_random_string()
fut = asyncio.ensure_future(run(f, job_id))
fut.add_done_callback(partial(runned, job_id=job_id))
return f
[docs]
async def defer_to_thread(self, pinfo=None, func=None, *args):
skip_check = pinfo.get("__skip_check__", False)
async def run(future, job_id):
if not skip_check:
await self.check_constraints(pinfo)
self.ok_to_run.release()
self.jobs[job_id] = pinfo
res = self.loop.run_in_executor(self.thread_queue, partial(do_work, job_id, "thread", pinfo, func, *args))
def ran(f):
try:
# r = f.result()
f.result()
finally:
# whatever the result we want to make sure to clean the job registry
# to keep it sync with actual running jobs
self.jobs.pop(job_id)
res.add_done_callback(ran)
res = await res
# thread could generate other parallelized jobs and return a Future/Task
# If so, we want to make sure we get the results from that task
if type(res) == asyncio.Task:
res = await res
future.set_result(res)
if not skip_check:
await self.ok_to_run.acquire()
f = asyncio.Future()
def runned(innerf, job_id):
if innerf.exception():
f.set_exception(innerf.exception())
job_id = get_random_string()
fut = asyncio.ensure_future(run(f, job_id))
fut.add_done_callback(partial(runned, job_id=job_id))
return f
[docs]
def submit(self, pfunc, schedule=None):
"""
Helper to submit and run tasks. Tasks will run async'ly.
pfunc is a functools.partial
schedule is a string representing a cron schedule, task will then be scheduled
accordingly.
"""
logger.info("Building task: %s", pfunc)
if schedule:
logger.info("Scheduling task %s: %s", pfunc, schedule)
cron = aiocron.crontab(schedule, func=pfunc, start=True, loop=self.loop)
return cron
else:
ff = asyncio.ensure_future(pfunc())
return ff
[docs]
def schedule(self, crontab, func, *args, **kwargs):
"""
Helper to create a cron job from a callable "func". *argd, and **kwargs
are passed to func. "crontab" follows aicron notation.
"""
# we need to dynamically create a wrapper coroutine with a name
# that makes sense, taken from func, otherwise all scheduled jobs would
# have the same wrapping coroutine name
if isinstance(func, partial):
func_name = func.func.__name__
else:
func_name = func.__name__
strcode = (
"""
async def %s():
func(*args, **kwargs)
"""
% func_name
)
code = compile(strcode, "<string>", "exec")
command_globals = {}
command_locals = {"asyncio": asyncio, "func": func, "args": args, "kwargs": kwargs}
eval(code, command_locals, command_globals)
run_func = command_globals[func_name]
job = self.submit(run_func, schedule=crontab)
return job
@property
def hub_process(self):
if not self._phub:
self._phub = find_process(os.getpid())
return self._phub
@property
def pchildren(self):
if not self._pchildren:
self._pchildren = self.hub_process.children()
return self._pchildren
@property
def hub_memory(self):
total_mem = 0
try:
procs = [self.hub_process] + self.pchildren
for proc in procs:
total_mem += proc.memory_info().rss
except psutil.NoSuchProcess:
# observed multiple time: hub main pid doesn't exist, like it was replace, not sure why,... OS ?
self._phub = None
self._pchildren = None
return total_mem
[docs]
def get_pid_files(self, child=None):
pids = {}
try:
pat = re.compile(r".*/(\d+)_.*\.pickle") # see track() for filename format
children_pids = [p.pid for p in self.pchildren]
for fn in glob.glob(os.path.join(config.RUN_DIR, "*.pickle")):
try:
pid = int(pat.findall(fn)[0].split("_")[0])
if not child or child.pid == pid:
try:
worker = pickle.load(open(fn, "rb"))
except FileNotFoundError:
# it's possible that, as this point, the pickle file
# doesn't exist anymore (process is done and file was unlinked)
# just ignore go to next one
continue
proc = self.pchildren[children_pids.index(pid)]
worker["process"] = {
"mem": proc.memory_info().rss,
"cpu": proc.cpu_percent(),
}
pids[pid] = worker
except IndexError:
# weird though... should have only pid files there...
pass
except Exception:
pass
return pids
[docs]
def get_thread_files(self):
tids = {}
try:
# see track() for filename format
pat = re.compile(r".*/(Thread\w*-\d+)_.*\.pickle")
# threads = self.thread_queue._threads
# active_tids = [t.getName() for t in threads]
for fn in glob.glob(os.path.join(config.RUN_DIR, "*.pickle")):
try:
tid = pat.findall(fn)[0].split("_")[0]
worker = pickle.load(open(fn, "rb"))
worker["process"] = self.hub_process # misleading... it's the hub process
tids[tid] = worker
except IndexError:
# weird though... should have only pid files there...
pass
except Exception:
pass
return tids
[docs]
def print_workers(self, workers):
if workers:
out = []
out.append(self.__class__.HEADERLINE.format(**self.__class__.HEADER))
for pid in workers:
worker = workers[pid]
info = self.extract_worker_info(worker)
tt = datetime.datetime.fromtimestamp(info["started_at"]).timetuple()
info["started_at"] = time.strftime("%Y/%m/%d %H:%M:%S", tt)
try:
out.append(self.__class__.DATALINE.format(**info))
except (TypeError, KeyError) as e:
out.append(e)
out.append(pformat(info))
return "\n".join(out)
else:
return ""
[docs]
def print_pending_info(self, num, info):
assert isinstance(info, dict)
info["cpu"] = ""
info["mem"] = ""
info["pid"] = ""
info["duration"] = ""
info["source"] = norm(info["source"], 35)
info["category"] = norm(info["category"], 10)
info["step"] = norm(info["step"], 20)
info["description"] = norm(info["description"], 30)
info["started_at"] = ""
out = []
try:
out.append(self.__class__.DATALINE.format(**info))
except (TypeError, KeyError) as e:
out.append(e)
out.append(pformat(info))
return out
[docs]
def get_process_summary(self):
running_pids = self.get_pid_files()
res = {}
for child in self.pchildren:
try:
# mem = child.memory_info().rss
child.memory_info().rss
try:
pio = child.io_counters()
except AttributeError:
# workaround for OS w/o this feature
# namely macOS
pio = type(
"",
(),
{
"read_count": -1,
"write_count": -1,
"read_bytes": -1,
"write_bytes": -1,
},
)()
# TODO: cpu as reported here isn't reliable, the only to get something
# consistent to call cpu_percent() with a waiting time argument to integrate
# CPU activity over this time, but this is a blocking call and freeze the hub
# (an async implementation might possible though). Currently, pchildren is list
# set at init time where process object are stored, so subsequent cpu_percent()
# calls should report CPU activity since last call (between /job_manager & top()
# calls), but it constently return CPU > 100% even when no thread running (that
# could have been the explination but it's not).
cpu = child.cpu_percent()
res[child.pid] = {
"memory": {
"size": child.memory_info().rss,
"percent": child.memory_percent(),
},
"cpu": {
# override status() when we have cpu activity to avoid
# having a "sleeping" process that's actually running something
# (prob happening because delay between status and cpu_percent(), like a race condition)
"status": cpu > 0.0 and "running" or child.status(),
"percent": cpu,
},
"io": {
"read_count": pio.read_count,
"write_count": pio.write_count,
"read_bytes": pio.read_bytes,
"write_bytes": pio.write_bytes,
},
}
if child.pid in running_pids:
# something is running on that child process
worker = running_pids[child.pid]
res[child.pid]["job"] = {
"started_at": worker["job"]["started_at"],
"duration": timesofar(worker["job"]["started_at"], 0),
"func_name": worker["func_name"],
"category": worker["job"]["category"],
"description": worker["job"]["description"],
"source": worker["job"]["source"],
"step": worker["job"]["step"],
"id": worker["job"]["id"],
}
except psutil.NoSuchProcess as e:
print("child not found %s %s" % (child, e))
continue
return res
[docs]
def get_thread_summary(self):
running_tids = self.get_thread_files()
tchildren = self.thread_queue._threads
res = {}
for child in tchildren:
res[child.name] = {
"is_alive": child.is_alive(),
"is_daemon": child.isDaemon(),
}
if child.name in running_tids:
# something is running on that child process
worker = running_tids[child.name]
res[child.name]["job"] = {
"started_at": worker["job"]["started_at"],
"duration": timesofar(worker["job"]["started_at"], 0),
"func_name": worker["func_name"],
"category": worker["job"]["category"],
"description": worker["job"]["description"],
"source": worker["job"]["source"],
"step": worker["job"]["step"],
"id": worker["job"]["id"],
}
return res
[docs]
def get_summary(self, child=None):
pworkers = self.get_pid_files(child)
tworkers = self.get_thread_files()
ppendings = self.get_pending_processes()
tpendings = {} # TODO:
return {
"process": {
"running": list(pworkers.keys()),
"pending": list(ppendings.keys()),
"all": self.get_process_summary(),
"max": self.process_queue._max_workers,
},
"thread": {
"running": list(tworkers.keys()),
"pending": list(tpendings.keys()),
"all": self.get_thread_summary(),
"max": self.thread_queue._max_workers,
},
"memory": self.hub_memory,
"available_system_memory": self.avail_memory,
"max_memory_usage": self.max_memory_usage,
"hub_pid": self.hub_process.pid,
}
[docs]
def get_pending_summary(self, getstr=False):
running = len(self.get_pid_files())
return "%d pending job(s)" % (len(self.process_queue._pending_work_items) - running)
[docs]
def get_pending_processes(self):
# pendings are kept in queue while running, until result is there so we need
# to adjust the actual real pending jobs. also, pending job are get() from the
# queue following FIFO order. finally, worker ID is incremental. So...
pendings = sorted(self.process_queue._pending_work_items.items())
running = len(self.get_pid_files())
actual_pendings = dict(pendings[running:])
return actual_pendings
[docs]
def show_pendings(self, running=None):
out = []
out.append(self.get_pending_summary())
actual_pendings = self.get_pending_processes()
if actual_pendings:
out.append(self.__class__.HEADERLINE.format(**self.__class__.HEADER))
for num, pending in actual_pendings.items():
info = self.extract_pending_info(pending)
try:
self.print_pending_info(num, info)
except Exception as e:
out.append(e)
out.append(pformat(pending))
return "\n".join(out)
[docs]
def top(self, action="summary"):
# pending = False
# done = False
# run = False
# pid = None
child = None
# if action:
# try:
# # want to see details for a specific process ?
# pid = int(action)
# child = [p for p in pchildren if p.pid == pid][0]
# except ValueError:
# pass
pworkers = self.get_pid_files(child)
tworkers = self.get_thread_files()
done_jobs = glob.glob(os.path.join(config.RUN_DIR, "done", "*.pickle"))
out = []
if child:
return pworkers[child.pid]
elif action == "pending":
return self.show_pendings(running=len(pworkers))
elif action == "summary":
res = self.get_summary()
# TODO: delete the follow two lines
# pworkers = dict([(pid, proc) for pid, proc in res["process"]["all"].items() if pid in res["process"]["running"]])
# tworkers = dict([(tid, thread) for tid, thread in res["thread"]["all"].items() if tid in res["thread"]["running"]])
pworkers = {pid: proc for pid, proc in res["process"]["all"].items() if pid in res["process"]["running"]}
tworkers = {tid: thread for tid, thread in res["thread"]["all"].items() if tid in res["thread"]["running"]}
out.append(self.print_workers(pworkers))
out.append(self.print_workers(tworkers))
out.append("%d running job(s)" % (len(pworkers) + len(tworkers)))
out.append("%s, type 'top(pending)' for more" % self.get_pending_summary())
if done_jobs:
out.append("%s finished job(s), type 'top(done)' for more" % len(done_jobs))
else:
raise ValueError("Unknown action '%s'" % action)
return "\n".join(out)
[docs]
def job_info(self):
summary = self.get_summary()
# prunning = summary["process"]["running"]
# trunning = summary["thread"]["running"]
# ppending = summary["process"]["pending"]
# tpending = summary["thread"]["pending"]
return {
"queue": {
"process": summary["process"],
"thread": summary["thread"],
},
"memory": summary["memory"],
"available_system_memory": summary["available_system_memory"],
"max_memory_usage": summary["max_memory_usage"],
"hub_pid": summary["hub_pid"],
}
[docs]
class CLIJobManager:
"""This is the minimal JobManager used in CLI mode to run async jobs, with the compatible methods as JobManager.
It won't use a dedicated ProcessPool or ThreadPool, and will just run async job directly in the asyncio loop
(which runs jobs in threads by default).
"""
def __init__(self, loop=None):
self.loop = loop or get_loop()
[docs]
async def defer_to_process(self, pinfo=None, func=None, *args, **kwargs):
"""keep the same signature as JobManager.defer_to_process. The passed pinfo is ignored.
defer_to_process will still run func in the thread using defer_to_thread method.
"""
fut = await self.defer_to_thread(pinfo, func, *args, **kwargs)
return fut
[docs]
async def defer_to_thread(self, pinfo=None, func=None, *args):
"""keep the same signature as JobManager.defer_to_thread. The passed pinfo is ignored"""
async def run(fut, func):
res = func()
fut.set_result(res)
fut = self.loop.create_future()
self.loop.create_task(run(fut, func))
return fut