Source code for biothings.utils.parallel

"""
Utils for running parallel jobs.
"""
from concurrent.futures import ProcessPoolExecutor


def run_jobs_on_parallel(worker, task_list, executor_args=None):
    """Apply ``worker`` to every item of ``task_list`` using a process pool.

    A ``ProcessPoolExecutor`` is an easy way to run and manage a set of
    worker processes.

    Parameters:
    - worker: a callable, which will be applied to each item of ``task_list``.
      It must be picklable, since it is sent to the worker processes.
    - task_list: an iterable containing the task data to be processed.
    - executor_args: optional dict of keyword arguments used to initialize
      the ``ProcessPoolExecutor`` (e.g. ``{"max_workers": 4}``).

    Returns the iterator produced by ``Executor.map``, which yields results in
    the same order as ``task_list``. The call returns only after all tasks have
    finished, because leaving the ``with`` block shuts the pool down and waits
    for pending work. An exception raised by ``worker`` is re-raised when the
    corresponding result is reached during iteration.
    """
    pool_kwargs = {} if not executor_args else executor_args
    with ProcessPoolExecutor(**pool_kwargs) as executor:
        results = executor.map(worker, task_list)
    return results
def run_jobs_on_ipythoncluster(worker, task_list, shutdown_ipengines_after_done=False):
    """Run ``worker`` over ``task_list`` on an ipyparallel (IPython) cluster.

    Deprecated: use ``run_jobs_on_parallel`` instead.

    Parameters:
    - worker: a callable applied to each item of ``task_list`` on the engines.
    - task_list: the task data; ``len()`` is called on it, so it must be sized.
    - shutdown_ipengines_after_done: if True, shut the engines down after the
      results have been collected.

    Returns the value of ``job.result()`` for the submitted map job. Returns
    ``None`` if the wait is interrupted with Ctrl-C, whether or not the user
    chooses to abort the submitted jobs.
    """
    import warnings

    # stacklevel=2 so the warning points at the caller, not at this module.
    warnings.warn(
        DeprecationWarning("This function is deprecated! Use run_jobs_on_parallel function instead."),
        stacklevel=2,
    )
    import os
    import time

    from ipyparallel import Client

    import biothings

    # ipcluster expecting to be in the folder where the app config is
    # because ipcluster will just import this file/module, kind of as if
    # it was building a main script from this lib
    import biothings.config as config

    biothings.config_for_app(config)
    from biothings.utils.common import ask, timesofar

    t0 = time.time()
    rc = Client(config.CLUSTER_CLIENT_JSON)
    lview = rc.load_balanced_view()
    targets = lview.targets or rc.ids
    cnt_nodes = len(targets)
    print("\t# nodes in use: {}".format(cnt_nodes))
    lview.block = False
    # Move every engine to the app path. A load-balanced map neither
    # guarantees that each engine executes one chdir nor (with block=False)
    # waits for it, so tasks could start in the wrong directory. A blocking
    # DirectView call runs exactly once on each target engine before any task
    # is submitted.
    if targets:
        rc[targets].apply_sync(os.chdir, config.APP_PATH)
    print("\t# of tasks: {}".format(len(task_list)))
    print("\tsubmitting...", end="")
    job = lview.map_async(worker, task_list)
    print("done.")
    try:
        job.wait_interactive()
    except KeyboardInterrupt:
        # handle "Ctrl-C"
        if ask("\nAbort all submitted jobs?") == "Y":
            lview.abort()
            print("Aborted, all submitted jobs are cancelled.")
        else:
            print("Aborted, but your jobs are still running on the cluster.")
        return
    results = job.result()
    if len(results) != len(task_list):
        print("WARNING:\t# of results returned ({}) != # of tasks ({}).".format(len(results), len(task_list)))
    print("\ttotal time: {}".format(timesofar(t0)))
    if shutdown_ipengines_after_done:
        print("\tshuting down all ipengine nodes...", end="")
        lview.shutdown()
        print("Done.")
    return results
def collection_partition(src_collection_list, step=100000):
    """Split collections into skip/limit chunks (deprecated, not used anywhere).

    For each collection, yield one dict per chunk of ``step`` documents. Each
    dict has the keys ``limit``, ``src_collection``, ``src_db``, ``server``,
    ``port`` and ``skip``, in that order.

    Parameters:
    - src_collection_list: a single collection object, or a list/tuple of them.
    - step: chunk size; used as ``limit`` and as the stride of ``skip``.

    NOTE(review): relies on ``collection.count()`` and
    ``collection.database.connection`` — these look like legacy PyMongo APIs;
    confirm they exist in the driver version in use before reviving this.
    """
    if isinstance(src_collection_list, (list, tuple)):
        collections = src_collection_list
    else:
        collections = [src_collection_list]

    for coll in collections:
        coll_name = coll.name
        db = coll.database
        conn = db.connection
        base = {
            "limit": step,
            "src_collection": coll_name,
            "src_db": db.name,
            "server": conn.host,
            "port": conn.port,
        }
        total = coll.count()
        for offset in range(0, total, step):
            # A fresh dict per chunk, so callers may mutate what they receive.
            yield {**base, "skip": offset}