Source code for scine_puffin.jobloop

# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""

import os
import psutil
import sys
import time
import ctypes
import multiprocessing
import random
import traceback
from datetime import datetime, timedelta
from importlib import import_module, util
from typing import Any, Dict, List
from json import dumps
from .config import Configuration

# A global variable holding the process actually running jobs.
# This variable is used to be able to interact with (mainly to kill) said
# process even from outside the loop() function.
PROCESS = None


def _log(config: Configuration, message: str):
    """
    Logs the given message together with a time stamp and a line break if the running Puffin has a configured log.

    Parameters
    ----------
    config : Configuration
        The configuration of the puffin
    message : str
        The message that is logged
    """
    if config["daemon"]["log"]:
        with open(config["daemon"]["log"], "a") as f:
            f.write(str(datetime.utcnow()) + ": " + config["daemon"]["uuid"] + ": " + message + "\n")


def slow_connect(manager, config: Configuration) -> None:
    """
    Connects the given Manager to the database referenced in the Configuration.
    This version of connecting tries 30 times to connect to the database.
    Each attempt is followed by a wait time of `1.0 + random([0.0, 1.0])` seconds
    in order to stagger connection attempts of multiple Puffin instances.

    Parameters
    ----------
    manager : scine_database.Manager
        The database manager/connection.
    config : scine_puffin.config.Configuration
        The current configuration of the Puffin.
    """
    import scine_database as db

    name = config["database"]["name"]
    if "," in name:
        name = name.split(",")[0]
    credentials = db.Credentials(config["database"]["ip"], config["database"]["port"], name)
    _log(config, "Connecting to: {:s}:{:d} {:s}".format(
        config["database"]["ip"], config["database"]["port"], name))
    manager.set_credentials(credentials)
    # Allow 30 to 60 seconds to try and connect to the database
    for _ in range(30):
        try:
            manager.connect()
            break
        except BaseException as e:
            _log(config, "Connection failed with " + str(e) + ". Keep trying to connect.")
            r = random.uniform(0, 1)
            time.sleep(1.0 + r)
    else:
        manager.connect()


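# A minimal usage sketch (illustrative only, mirroring what loop() and _loop_impl()
# do below): create a database manager and let slow_connect() handle the staggered
# connection attempts. The default-constructed Configuration here is a hypothetical
# placeholder for a properly loaded configuration.
#
#     import scine_database as db
#     from scine_puffin.config import Configuration
#
#     config = Configuration()      # hypothetical: load a prepared configuration
#     manager = db.Manager()
#     slow_connect(manager, config)

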
def kill_daemon(config: Configuration) -> None:
    """
    Kills the Puffin instantaneously without any possibility of a graceful exit.

    Parameters
    ----------
    config : scine_puffin.config.Configuration
        The current configuration of the Puffin.
    """
    # Remove stop file if present
    if config.daemon()["remove_stop_file"]:
        stop_file = config.daemon()["stop"]
        if os.path.isfile(stop_file):
            try:
                os.remove(stop_file)
            except FileNotFoundError:
                pass

    # Kill the daemon process
    pid_file = config["daemon"]["pid"]
    if PROCESS is not None:
        parent = psutil.Process(PROCESS.pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()
    if os.path.isfile(pid_file):
        with open(pid_file, "r") as f:
            pid = int(f.readline().strip())
        os.remove(pid_file)
        parent = psutil.Process(pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()


def loop(config: Configuration, available_jobs: dict) -> None:
    """
    The outer loop function.
    This function controls the forked actual loop function, which is implemented
    in _loop_impl(). The loop has an added timeout, and a 15 min ping is added
    to show that the runner is still alive.

    Parameters
    ----------
    config : scine_puffin.config.Configuration
        The current configuration of the Puffin.
    available_jobs : dict
        The dictionary of available jobs, given the current config and runtime
        environment.
    """
    if config["daemon"]["mode"].lower() == "debug":
        _loop_impl(config=config, available_jobs=available_jobs)
        sys.exit()

    # Connect to database
    import scine_database as db
    manager = db.Manager()
    slow_connect(manager, config)

    # Generate shared variables
    # Shared variables have to be ctypes, so this is a bit ugly
    JOB: Any = multiprocessing.Array(ctypes.c_char, 200)
    JOB.value = "".encode("utf-8")
    CURRENT_DB: Any = multiprocessing.Array(ctypes.c_char, 200)
    CURRENT_DB.value = manager.get_database_name().encode("utf-8")

    # Run the loop in a second process
    PROCESS = multiprocessing.Process(  # pylint: disable=redefined-outer-name
        target=_loop_impl,
        args=(),
        kwargs={
            "config": config,
            "available_jobs": available_jobs,
            "JOB": JOB,
            "CURRENT_DB": CURRENT_DB,
        },
    )
    PROCESS.start()

    # Check for timeout
    timeout_setting = config["daemon"]["timeout_in_h"]
    timeout = timedelta(hours=timeout_setting) if timeout_setting > 0 else None
    idle_setting = config["daemon"]["idle_timeout_in_h"]
    idle_timeout = timedelta(hours=idle_setting) if idle_setting > 0 else None
    touch_time = timedelta(seconds=config["daemon"]["touch_time_in_s"])
    reset_delta = timedelta(seconds=config["daemon"]["job_reset_time_in_s"])
    last_time_with_a_job = datetime.now()
    last_touch = datetime.now()
    start = datetime.now()
    while PROCESS.is_alive():
        time.sleep(1.0)
        # Kill the puffin if it was idle for too long
        now = datetime.now()
        if JOB.value.decode("utf-8"):
            last_time_with_a_job = now
        if idle_timeout is not None and (now - last_time_with_a_job) > idle_timeout:
            _log(config, "Puffin reached idle timeout")
            kill_daemon(config)
        # Kill job if it is out of time
        if timeout is not None and (now - start) > timeout:
            # But first reset the calculation
            if JOB.value.decode("utf-8"):
                manager.set_database_name(CURRENT_DB.value.decode("utf-8"))
                calculations = manager.get_collection("calculations")
                job = db.Calculation(db.ID(JOB.value.decode("utf-8")), calculations)
                job.set_status(db.Status.NEW)
                job.set_executor("")
            _log(config, "Puffin reached timeout")
            kill_daemon(config)
        # Touch current calculation every so often
        if (now - last_touch) > touch_time:
            # Touch the own calculation
            last_touch = now
            try:
                manager.set_database_name(CURRENT_DB.value.decode("utf-8"))
                calculations = manager.get_collection("calculations")
                if JOB.value.decode("utf-8"):
                    job = db.Calculation(db.ID(JOB.value.decode("utf-8")), calculations)
                    job.touch()
                    _log(config, "Touching Job: {:s}".format(str(job.id())))
                # TODO maybe move this onto a separate timer/if to reduce DB queries
                _check_touch_of_pending_jobs(manager, calculations, config, reset_delta)
            except BaseException as e:
                # If it isn't possible to work with the database, kill the
                # job/loop and stop.
                _log(config, "Failed to work with database, received error {:s}\n".format(str(e)))
                kill_daemon(config)


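# A small sketch (not part of the module logic) of the shared-memory pattern used
# above: loop() and _loop_impl() exchange the current job ID and database name
# through fixed-size multiprocessing.Array buffers of C chars, writing and reading
# UTF-8 bytes. The ID below is a hypothetical example.
#
#     import ctypes
#     import multiprocessing
#
#     shared = multiprocessing.Array(ctypes.c_char, 200)
#     shared.value = "5f3e9a".encode("utf-8")   # hypothetical calculation ID
#     current = shared.value.decode("utf-8")    # read back in the other process

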
def _check_touch_of_pending_jobs(
    manager, calculations, config: Configuration, reset_delta: timedelta
) -> None:
    """
    Checks for calculations of other Puffins that are pending. If these jobs have
    not been touched in due time, they are reset, as their Puffins are expected
    to be dead.

    Parameters
    ----------
    manager : scine_database.Manager
        The database connection.
    calculations : scine_database.Collection
        The collection holding all calculations.
    config : scine_puffin.config.Configuration
        The current configuration of the Puffin.
    reset_delta : datetime.timedelta
        The time difference after which a pending job is assumed to be dead.
    """
    import scine_database as db

    # Check for dead jobs in pending status in the database
    selection = {"status": "pending"}
    server_now = manager.server_time()
    for pending_calculation in calculations.query_calculations(dumps(selection)):
        last_modified = pending_calculation.last_modified()
        if (server_now - last_modified) > reset_delta:
            _log(config, "Resetting Job: {:s}".format(str(pending_calculation.id())))
            pending_calculation.set_status(db.Status.NEW)
            pending_calculation.set_executor("")


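# Illustration (assumed values) of the staleness criterion used above: a pending
# calculation is reset once the difference between the database server time and its
# last modification exceeds reset_delta.
#
#     from datetime import datetime, timedelta
#
#     reset_delta = timedelta(seconds=7200)              # hypothetical job_reset_time_in_s
#     server_now = datetime(2024, 1, 1, 12, 0, 0)        # hypothetical server time
#     last_modified = datetime(2024, 1, 1, 9, 0, 0)      # hypothetical last touch
#     assert (server_now - last_modified) > reset_delta  # -> this job would be reset

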
def check_setup(config: Configuration) -> Dict[str, str]:
    """
    Checks if all the programs are correctly installed or reachable.

    Parameters
    ----------
    config : scine_puffin.config.Configuration
        The current configuration of the Puffin.

    Returns
    -------
    available_jobs : Dict[str, str]
        A dictionary mapping each available job name to the name of the class
        implementing it.
    """
    scine_database = util.find_spec("scine_database")
    if scine_database is None:
        print("Missing SCINE Database backend, please bootstrap Puffin.")
        sys.exit(1)
    scine_utilities = util.find_spec("scine_utilities")
    if scine_utilities is None:
        print("Missing SCINE Utilities, please bootstrap Puffin.")
        sys.exit(1)
    scine_readuct = util.find_spec("scine_readuct")
    if scine_readuct is None:
        print("SCINE ReaDuct is not available for Puffin. Note that this will disable nearly all exploration jobs.")

    # Generate the list of available programs
    available_programs = []
    for program_name, settings in config.programs().items():
        if settings["available"]:
            available_programs.append(program_name)
    if scine_readuct is None and "readuct" in available_programs:
        raise RuntimeError("SCINE ReaDuct was not found by Puffin but is set as available in the run configuration.\n"
                           "Please make sure that SCINE ReaDuct is installed properly, bootstrap Puffin, or disable\n"
                           "SCINE ReaDuct in the run configuration.")

    # Initialize all available programs
    for program_name in available_programs:
        class_name = "".join([s.capitalize() for s in program_name.split("_")])
        module = import_module("scine_puffin.programs." + program_name)
        class_ = getattr(module, class_name)
        class_.initialize()

    # Gather list of all jobs
    all_jobs = []
    import scine_puffin.jobs
    for path in scine_puffin.jobs.__path__:
        for _, dirs, files in os.walk(path):
            for name in files:
                if name.endswith(".py") and name != "__init__.py" and "templates" in dirs:
                    all_jobs.append(name[:-3])

    # Generate list of jobs for which the required programs are present
    available_jobs = {}
    for job in all_jobs:
        class_name = "".join([s.capitalize() for s in job.split("_")])
        module = import_module("scine_puffin.jobs." + job)
        class_ = getattr(module, class_name)
        required_programs = class_.required_programs()
        for program in required_programs:
            if program not in available_programs:
                break
        else:
            available_jobs[job] = class_name

    # Output results
    print("")
    print("Available Resources:")
    print(" {:18} {:6d}.000".format("Threads:", config["resources"]["cores"]))
    print(" {:18} {:10.3f} GB".format("Total RAM:", config["resources"]["memory"]))
    print(" {:18} {:10.3f} GB".format("RAM/Thread:", config["resources"]["memory"] / config["resources"]["cores"]))
    print(" {:18} {:10.3f} GB".format("Total Disk Space:", config["resources"]["disk"]))
    print(
        " {:18} {:10.3f} GB".format(
            "Disk Space/Thread:",
            config["resources"]["disk"] / config["resources"]["cores"],
        )
    )
    print("")
    print("Available Programs:")
    for available_program in available_programs:
        print(" - " + available_program)
    print("")
    print("Accepting Jobs:")
    for job_name in available_jobs:
        print(" - " + job_name)
    print("")

    return available_jobs


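# Sketch of the job-name-to-class-name convention used in check_setup() and
# _loop_impl(): snake_case module names map to CamelCase class names. The job name
# below is only an illustrative example.
#
#     job = "scine_single_point"   # hypothetical job order
#     class_name = "".join([s.capitalize() for s in job.split("_")])
#     # class_name == "ScineSinglePoint"

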
def _loop_impl(
    config: Configuration, available_jobs: dict, JOB=None, CURRENT_DB=None
) -> None:
    """
    The actual loop, executing jobs and handling all calculation related operations.

    Parameters
    ----------
    config : scine_puffin.config.Configuration
        The current configuration of the Puffin.
    available_jobs : dict
        A dictionary of all jobs that are available to this Puffin.
    JOB : multiprocessing.Array
        Possibly a shared array of chars (string) to share the current job's ID
        with external code. Default: ``None``
    CURRENT_DB : multiprocessing.Array
        The name of the current database, used to sync the two threads in case of
        multi-database usage of a single Puffin.
    """
    import scine_database as db

    # Connect to database
    manager = db.Manager()
    slow_connect(manager, config)

    # Initialize loop variables
    sleep = timedelta(seconds=config["daemon"]["cycle_time_in_s"])
    last_cycle = datetime.now()
    job_list = list(available_jobs.keys())
    program_list = ["any"]
    version_list = []
    for program_name, settings in config.programs().items():
        if settings["available"]:
            program_list.append(program_name)
            version_list.append(program_name + settings["version"])

    # Initialize cache for failure checks
    previously_failed_job_count = 0
    previously_failed_jobs: List[db.ID] = []
    previous_dbs: List[str] = []
    n_jobs_run = 0
    while True:
        # Stop the loop if a stop file has been written
        stop_file = config["daemon"]["stop"]
        if os.path.isfile(stop_file):
            if config.daemon()["remove_stop_file"]:
                try:
                    os.remove(stop_file)
                except FileNotFoundError:
                    pass
            _log(config, "Detected stop file " + stop_file + " and stopped puffin.")
            break

        # Wait if needed
        loop_time = datetime.now() - last_cycle
        if loop_time < sleep:
            time.sleep(int(round((sleep - loop_time).total_seconds())))
        last_cycle = datetime.now()

        # Verify that the database is still listening
        while not manager.is_connected():
            # Retry connection until killed
            time.sleep(10)
            try:
                manager.disconnect()
                manager.connect()
            except BaseException as e:
                _log(config, "Failed to connect to database with error " + str(e) + ". Keep trying to connect.")

        # ===================
        #   Job procurement
        # ===================
        # use ',' as db separator
        db_names = config["database"]["name"].split(",")
        for db_name in db_names:
            # Switch to requested DB
            manager.set_database_name(db_name)
            if CURRENT_DB is not None:
                CURRENT_DB.value = db_name.encode("utf-8")
            # Get calculations collection from current DB
            if not manager.has_collection("calculations"):
                # This DB is not initialized, skip to next one
                continue
            collection = manager.get_collection("calculations")
            selection = {
                "$and": [
                    {"status": "new"},
                    {"job.cores": {"$lte": int(config["resources"]["cores"])}},
                    {"job.disk": {"$lte": float(config["resources"]["disk"])}},
                    {"job.memory": {"$lte": float(config["resources"]["memory"])}},
                    {"job.order": {"$in": job_list}},
                    {"model.program": {"$in": program_list}}
                    # { '$or' : [
                    #     {'model.version' : { '$eq' : 'any'} },
                    #     {'model.program + model.version' : { '$in' : version_list} }
                    # ]}
                ]
            }
            this_puffin_id = config["daemon"]["uuid"]
            update = {"$set": {"status": "pending", "executor": this_puffin_id}}
            # sort for first priority and then most expensive calculation with cores, then memory, then disk
            sort = {'priority': 1, 'job.cores': -1, 'job.memory': -1, 'job.disk': -1}
            calculation = collection.get_and_update_one_calculation(dumps(selection), dumps(update), dumps(sort))
            calculation.link(collection)
            if calculation.has_id():
                # we found a calculation
                time_waited = 0.0
                executor = calculation.get_executor()
                while executor.strip() == "" and time_waited < 120.0:
                    # if the update step has not been properly processed yet, wait
                    time.sleep(0.1)
                    time_waited += 0.1
                    executor = calculation.get_executor()
                if time_waited >= 120.0:
                    # Job acquisition timed out, continue
                    continue
                if executor != this_puffin_id:
                    message = 'Wanted to do calculation {:s} with puffin {:s}, but puffin {:s} already wrote into ' \
                              'calculation in the mean time, we got a problem with atomic operations'.format(
                                  str(calculation.id()), this_puffin_id, executor)
                    _log(config, message)
                    _fail_calculation(calculation, config, message, datetime.now())
                    return  # kill puffin since it would do a pointless calculation
                # touch and thus update the timestamp
                calculation.touch()
                # Leave db loop if calculation was found
                break
        # Skip job execution if no calculation found
        else:
            continue

        # =================
        #   Job execution
        # =================
        # Log the job id
        if JOB is not None:
            JOB.value = calculation.id().string().encode("utf-8")

        # Load requested job
        job_name = calculation.get_job().order
        try:
            class_name = available_jobs[job_name]
        except BaseException as e:
            raise KeyError("Missing Job in list of possible jobs.\n"
                           + "Dev-Note: This error should not be reachable.") from e
        module = import_module("scine_puffin.jobs." + job_name)
        class_ = getattr(module, class_name)
        SUCCESS: Any = multiprocessing.Value('i', False)  # Create value in shared memory. Use int for bool flag

        # Run the job in a third process
        JOB_PROCESS = multiprocessing.Process(
            target=_job_execution,
            args=(),
            kwargs={
                "config": config,
                "job_class": class_,
                "manager": manager,
                "calculation": calculation,
                "SUCCESS": SUCCESS,
            },
        )
        JOB_PROCESS.start()
        start = datetime.now()  # in case we need to abort the job, we can set a runtime
        # monitor job for memory usage and wait for finishing
        if bool(config['daemon']['enforce_memory_limit']):
            process = psutil.Process(JOB_PROCESS.pid)
            mem_limit = float(config['resources']['memory'])
            while JOB_PROCESS.is_alive():
                time.sleep(1.0)
                try:
                    memory = process.memory_info().rss
                    children = process.children(recursive=True)
                except BaseException:
                    # process is likely already finished, simply check if still alive
                    continue
                for child in children:
                    try:
                        memory += child.memory_info().rss
                    except BaseException:
                        # child finished between gathering it and trying to access its info
                        # hence its memory usage should be 0 anyway, continue with next child
                        pass
                # make comparison in GB
                if memory / (1024 ** 3) > mem_limit:
                    # we have exceeded memory limit, kill job process and fail calculation with error
                    children = process.children(recursive=True)  # ensure we get the latest possible
                    for child in children:
                        child.kill()
                    process.kill()
                    SUCCESS = None
                    _fail_calculation(calculation, config, "ERROR: Calculation exceeded given memory limit", start)
                    _log(config, "Stopping calculation {:s} because of exceeded memory limit of {:f} GB.".format(
                        str(calculation.id()), mem_limit))
        else:
            # do not enforce, just wait
            JOB_PROCESS.join()

        if calculation.get_status() == db.Status.PENDING:
            _fail_calculation(calculation, config,
                              "ERROR: Puffin did not end this calculation properly.\n"
                              "Most likely a process during the job killed the puffin with "
                              "an illegal instruction.", start)
            _log(config, "Calculation {:s} was not properly ended and its status was set to failed."
                 .format(str(calculation.id())))

        # accounting of maximum number of jobs and maximum serial job fails
        n_jobs_run += 1
        max_n_jobs = config["daemon"]["max_number_of_jobs"]
        if 0 < max_n_jobs <= n_jobs_run:
            _log(config, "Stopping Puffin due to maximum number of jobs ({:d}) being reached".format(max_n_jobs))
            break
        success = False if SUCCESS is None else bool(SUCCESS.value)
        if success:
            previously_failed_job_count = 0
            previously_failed_jobs = []
            previous_dbs = []
        elif CURRENT_DB is not None:
            previously_failed_job_count += 1
            previously_failed_jobs.append(calculation.id())
            previous_dbs.append(CURRENT_DB.value.decode("utf-8"))
        if JOB is not None:
            JOB.value = "".encode("utf-8")

        # Check for repeated job failures and take action
        if previously_failed_job_count >= config["daemon"]["repeated_failure_stop"]:
            # Reset previous jobs
            for idx, pdb in zip(previously_failed_jobs, previous_dbs):
                manager.set_database_name(pdb)
                calculations = manager.get_collection("calculations")
                calculation = db.Calculation(idx, calculations)
                calculation.set_status(db.Status.NEW)
                calculation.set_raw_output("")
                comment = calculation.get_comment()
                calculation.set_comment("Calculation has been reset.\nComment of previous run: " + comment)
            # Log and exit
            _log(config, "Stopping Puffin due to {:d} consecutive failed jobs".format(previously_failed_job_count))
            break


def _job_execution(config: Configuration, job_class: type, manager, calculation, SUCCESS=None) -> None:
    """
    Runs the job in a separate process to guard against SegFaults and to enforce the memory limit.
    """
    job = job_class()
    _log(config, "Processing Job: {:s}".format(str(calculation.id())))

    # Prepare job directory and start timer
    start = datetime.now()
    job.prepare(config["daemon"]["job_dir"], calculation.id())

    # Initialize programs that need initialization
    for program_name, settings in config.programs().items():
        if settings["available"]:
            # Initialize all available programs
            class_name = "".join([s.capitalize() for s in program_name.split("_")])
            module = import_module("scine_puffin.programs." + program_name)
            class_ = getattr(module, class_name)
            class_.initialize()

    # Run job
    success = job.run(manager, calculation, config)

    # we already write a runtime in case puffin fails during copying operations
    # this avoids that a completed calculation is missing a runtime
    # the runtime is set again after copying to include this additional time
    prelim_end = datetime.now()
    calculation.set_runtime((prelim_end - start).total_seconds())

    # Archive if requested and job was successful
    archive = config["daemon"]["archive_dir"]
    if success and archive:
        try:
            job.archive(archive)
        except OSError:
            _log(config, "Failed to archive job: {:s} with error {:s}".format(
                str(calculation.id()), traceback.format_exc()))

    # Archive error if requested and job has failed
    error = config["daemon"]["error_dir"]
    if not success and error:
        try:
            job.archive(error)
        except OSError:
            _log(config, "Failed to archive job: {:s} with error {:s}".format(
                str(calculation.id()), traceback.format_exc()))

    # End timer
    end = datetime.now()
    calculation.set_runtime((end - start).total_seconds())

    # Update job info with actual system specs
    _update_job_specs(calculation, config)

    # Clean job remains
    job.clear()

    # only now set shared variable to signal whether job was success or not
    # if this process dies it is assumed to be False
    SUCCESS.value = success


def _fail_calculation(calculation, config: Configuration, comment_to_add: str, start: datetime) -> None:
    import scine_database as db
    calculation.set_status(db.Status.FAILED)
    _update_job_specs(calculation, config)
    comment = calculation.get_comment()
    comment += "\n" + comment_to_add
    calculation.set_comment(comment)
    calculation.set_runtime((datetime.now() - start).total_seconds())


def _update_job_specs(calculation, config: Configuration) -> None:
    db_job = calculation.get_job()
    db_job.cores = int(config["resources"]["cores"])
    db_job.disk = float(config["resources"]["disk"])
    db_job.memory = float(config["resources"]["memory"])
    calculation.set_job(db_job)