Source code for puffin.jobloop

# -*- coding: utf-8 -*-
__copyright__ = """This file is part of SCINE Puffin.
This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory for Physical Chemistry, Reiher Group.
See LICENSE.txt for details
"""

import os
import sys
import time
import ctypes
import multiprocessing
import subprocess
import random
from datetime import datetime, timedelta
from importlib import import_module
from json import dumps
from .config import Configuration

# A global variable holding the process actually running jobs.
# It allows interacting with (mainly killing) that process even from
# outside the loop() function.
PROCESS = None
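

# A minimal sketch (not part of the original module) of how external code
# could use this handle, e.g. from a daemon's shutdown path. The helper name
# is an illustrative assumption; nothing in this module registers or calls it.
def _terminate_job_process():
    """Terminate the forked job process if it is currently running."""
    if PROCESS is not None and PROCESS.is_alive():
        PROCESS.terminate()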


def slow_connect(manager, config: Configuration):
    """
    Connects the given Manager to the database referenced in the
    Configuration. This version of connecting tries 30 times to connect to
    the database. Each attempt is followed by a wait time of
    ``1.0 + random([0.0, 1.0])`` seconds in order to stagger connection
    attempts of multiple Puffin instances.

    Parameters
    ----------
    manager :: db.Manager (Scine::Database::Manager)
        The manager/database to connect.
    config :: puffin.config.Configuration
        The current configuration of the Puffin.
    """
    import scine_database as db
    credentials = db.Credentials(config['database']['ip'],
                                 config['database']['port'],
                                 config['database']['name'])
    if config['daemon']['log']:
        with open(config['daemon']['log'], 'a') as f:
            f.write('{:s}: Connecting to: {:s}:{:d} {:s}\n'.format(
                str(datetime.utcnow()),
                config['database']['ip'],
                config['database']['port'],
                config['database']['name']))
    manager.set_credentials(credentials)
    # Allow 30 to 60 seconds to try and connect to the database
    for _ in range(30):
        try:
            manager.connect()
            break
        except Exception:
            time.sleep(1.0 + random.uniform(0.0, 1.0))
    else:
        # Final attempt; let a failure propagate to the caller
        manager.connect()
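

# A minimal usage sketch, assuming a populated Configuration instance is
# already at hand (how it is loaded depends on puffin.config and is not shown
# here). The helper name is illustrative and not part of the original module.
def _example_connect(config: Configuration):
    """Create a database manager and connect it with staggered retries."""
    import scine_database as db
    manager = db.Manager()
    slow_connect(manager, config)
    return manager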


def loop(config: Configuration, available_jobs: dict):
    """
    The outer loop function.
    This function controls the forked actual loop function, which is
    implemented in _loop_impl(). The loop has an added timeout, and a
    15 min ping is added to show that the runner is still alive.

    Parameters
    ----------
    config :: puffin.config.Configuration
        The current configuration of the Puffin.
    available_jobs :: dict
        A dictionary of all jobs that are available to this Puffin.
    """
    global PROCESS
    if config['daemon']['mode'] == 'debug':
        _loop_impl(config=config, available_jobs=available_jobs)
        sys.exit()

    # Generate shared variable
    # Shared variables have to be ctypes, so this is a bit ugly
    JOB = multiprocessing.Array(ctypes.c_char, 200)
    JOB.value = ''.encode('utf-8')

    # Connect to database
    import scine_database as db
    manager = db.Manager()
    slow_connect(manager, config)

    # Run the loop in a second process
    PROCESS = multiprocessing.Process(
        target=_loop_impl,
        args=(),
        kwargs={'config': config,
                'available_jobs': available_jobs,
                'JOB': JOB}
    )
    PROCESS.start()
    calculations = manager.get_collection('calculations')

    # Check for timeout
    timeout = config['daemon']['timeout_in_h']
    touch_time = config['daemon']['touch_time_in_s']
    idle_timeout = config['daemon']['idle_timeout_in_h']
    last_time_with_a_job = time.time()
    start = time.time()
    last_touch = 0.0
    while PROCESS.is_alive():
        time.sleep(1.0)
        # Kill the Puffin if it was idle for too long
        if JOB.value.decode('utf-8'):
            last_time_with_a_job = time.time()
        if idle_timeout > 0.0 and (time.time() - last_time_with_a_job) > idle_timeout * 3600.0:
            subprocess.run(["kill", "-9", str(PROCESS.pid)])
        # Kill the job if it is out of time
        if timeout and (time.time() - start) > timeout * 3600.0:
            # But first reset the calculation
            if JOB.value.decode('utf-8'):
                job = db.Calculation(db.ID(JOB.value.decode('utf-8')))
                job.link(calculations)
                job.set_status(db.Status.NEW)
                job.set_executor('')
            subprocess.run(["kill", "-9", str(PROCESS.pid)])
        # Touch the own calculation every so often
        if (time.time() - last_touch) > touch_time:
            last_touch = time.time()
            try:
                if JOB.value.decode('utf-8'):
                    job = db.Calculation(db.ID(JOB.value.decode('utf-8')))
                    job.link(calculations)
                    job.touch()
                    if config['daemon']['log']:
                        with open(config['daemon']['log'], 'a') as f:
                            f.write('{:s}: Touching Job: {:s}\n'.format(
                                str(datetime.utcnow()), job.id().string()))
                # Also check for dead jobs in the database
                selection = {'status': {'$eq': 'pending'}}
                server_now = manager.server_time()
                for pending_calculation in calculations.iterate_calculations(dumps(selection)):
                    pending_calculation.link(calculations)
                    last_modified = pending_calculation.last_modified()
                    if (server_now - last_modified) > timedelta(seconds=(touch_time + 120)):
                        if config['daemon']['log']:
                            with open(config['daemon']['log'], 'a') as f:
                                f.write('{:s}: Resetting Job: {:s}\n'.format(
                                    str(datetime.utcnow()),
                                    pending_calculation.id().string()))
                        pending_calculation.set_status(db.Status.NEW)
                        pending_calculation.set_executor('')
            except Exception:
                # If it isn't possible to work with the database, kill the
                # job/loop and stop.
                subprocess.run(["kill", "-9", str(PROCESS.pid)])
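

# Hedged illustration of the staleness criterion used in loop() above: a
# 'pending' calculation is reset to 'new' once its last_modified timestamp
# (in server time) is older than touch_time plus a 120 s grace period. The
# helper name is an assumption for illustration only.
def _is_stale(server_now: datetime, last_modified: datetime, touch_time: float) -> bool:
    """Return True if a pending calculation has not been touched recently
    enough and should be considered dead."""
    return (server_now - last_modified) > timedelta(seconds=touch_time + 120.0)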


def check_setup(config: Configuration):
    """
    Checks if all the programs are correctly installed or reachable.

    Parameters
    ----------
    config :: puffin.config.Configuration
        The current configuration of the Puffin.
    """
    try:
        import scine_database
    except ImportError:
        print("Missing SCINE Database backend, please bootstrap Puffin.")
        sys.exit(1)
    try:
        import scine_utilities
    except ImportError:
        print("Missing SCINE Utilities, please bootstrap Puffin.")
        sys.exit(1)
    try:
        import scine_readuct
    except ImportError:
        print("Missing SCINE ReaDuct, please bootstrap Puffin.")
        sys.exit(1)

    # Generate the list of available programs
    available_programs = []
    for program_name, settings in config.programs().items():
        if settings['available']:
            available_programs.append(program_name)

    # Gather the list of all jobs
    all_jobs = []
    import puffin.jobs
    for path in puffin.jobs.__path__:
        for root, dirs, files in os.walk(path):
            for name in files:
                if name.endswith('.py') and name not in ('job.py', '__init__.py'):
                    all_jobs.append(name[:-3])

    # Generate the list of jobs for which the required programs are present
    available_jobs = {}
    for job in all_jobs:
        class_name = ''.join([s.capitalize() for s in job.split('_')])
        module = import_module('puffin.jobs.' + job)
        class_ = getattr(module, class_name)
        job_instance = class_()
        required_programs = job_instance.required_programs()
        for program in required_programs:
            if program not in available_programs:
                break
        else:
            available_jobs[job] = class_name

    # Output results
    print('')
    print('Available Resources:')
    print(' {:18} {:6d}.000'.format('Threads:', config['resources']['cores']))
    print(' {:18} {:10.3f} GB'.format('Total RAM:', config['resources']['memory']))
    print(' {:18} {:10.3f} GB'.format('RAM/Thread:', config['resources']['memory'] / config['resources']['cores']))
    print(' {:18} {:10.3f} GB'.format('Total Disk Space:', config['resources']['disk']))
    print(' {:18} {:10.3f} GB'.format('Disk Space/Thread:', config['resources']['disk'] / config['resources']['cores']))
    print('')
    print('Available Programs:')
    for available_program in available_programs:
        print(' - ' + available_program)
    print('')
    print('Accepting Jobs:')
    for job_name in available_jobs:
        print(' - ' + job_name)
    print('')
    return available_jobs
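

# Hedged example of the naming convention check_setup() relies on: a job
# module file in snake_case is expected to define a class whose name is the
# CamelCase version of the file name. The sample name 'scine_single_point'
# is an assumption for illustration.
def _class_name_for_job(job: str) -> str:
    """Map a job module name to its expected class name, e.g.
    'scine_single_point' -> 'ScineSinglePoint'."""
    return ''.join(part.capitalize() for part in job.split('_'))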


def _loop_impl(config: Configuration, available_jobs: dict, JOB=None):
    """
    The actual loop, executing jobs and handling all calculation related
    operations.

    Parameters
    ----------
    config :: puffin.config.Configuration
        The current configuration of the Puffin.
    available_jobs :: dict
        A dictionary of all jobs that are available to this Puffin.
    JOB :: multiprocessing.Array
        Possibly a shared array of chars (string) to share the current
        job's ID with external code.
        Default: ``None``
    """
    import scine_database as db
    # Connect to database
    manager = db.Manager()
    slow_connect(manager, config)
    collection = manager.get_collection('calculations')

    # Initialize loop variables
    sleep = config['daemon']['cycle_time_in_s']
    last_cycle = time.time()
    job_list = []
    for key in available_jobs.keys():
        job_list.append(key)
    program_list = ['any']
    version_list = []
    for program_name, settings in config.programs().items():
        if settings['available']:
            program_list.append(program_name)
            version_list.append(program_name + settings['version'])

    while True:
        # Stop the loop if a stop file has been written
        stop_file = config['daemon']['stop']
        if os.path.isfile(stop_file):
            os.remove(stop_file)
            break
        # Wait if needed
        now = time.time()
        if (now - last_cycle) < sleep:
            time.sleep(sleep - now + last_cycle)
        last_cycle = time.time()
        # Check for new calculations
        update = {'$set': {'status': 'pending',
                           'executor': config['daemon']['uuid']}}
        selection = {'$and': [
            {'status': {'$eq': 'new'}},
            {'job.cores': {'$lte': int(config['resources']['cores'])}},
            {'job.disk': {'$lte': float(config['resources']['disk'])}},
            {'job.memory': {'$lte': float(config['resources']['memory'])}},
            {'job.order': {'$in': job_list}},
            {'model.program': {'$in': program_list}}
            # {'$or': [
            #     {'model.version': {'$eq': 'any'}},
            #     {'model.program + model.version': {'$in': version_list}}
            # ]}
        ]}
        calculation = collection.get_and_update_one_calculation(
            dumps(selection), dumps(update))
        calculation.link(collection)
        # Continue if no jobs are found
        if not calculation.has_id():
            continue
        # Touch the calculation and thus update its timestamp
        calculation.touch()
        # Log the job id
        if JOB is not None:
            JOB.value = calculation.id().string().encode('utf-8')
        if config['daemon']['log']:
            with open(config['daemon']['log'], 'a') as f:
                f.write('{:s}: Processing Job: {:s}\n'.format(
                    str(datetime.utcnow()), calculation.id().string()))
        # Load the requested job
        job_name = calculation.get_job().order
        try:
            class_name = available_jobs[job_name]
        except KeyError:
            raise KeyError('Missing Job in list of possible jobs.\n'
                           + 'Dev-Note: This error should not be reachable.')
        module = import_module('puffin.jobs.' + job_name)
        class_ = getattr(module, class_name)
        job = class_()
        # Prepare the job directory and start the timer
        start = time.time()
        job.prepare(config['daemon']['job_dir'], calculation.id())
        # Run the job
        success = job.run(manager, calculation, config)
        # Archive if requested and the job was successful
        archive = config['daemon']['archive_dir']
        if success and archive:
            job.archive(archive)
        # Archive errors if requested and the job has failed
        error = config['daemon']['error_dir']
        if not success and error:
            job.archive(error)
        # End the timer
        end = time.time()
        calculation.set_runtime(end - start)
        # Update the job info with the actual system specs
        db_job = calculation.get_job()
        db_job.cores = int(config['resources']['cores'])
        db_job.disk = float(config['resources']['disk'])
        db_job.memory = float(config['resources']['memory'])
        calculation.set_job(db_job)
        # Clean up job remains
        job.clear()
        if JOB is not None:
            JOB.value = ''.encode('utf-8')
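

# A minimal sketch of the stop-file mechanism checked at the top of each
# cycle in _loop_impl(): creating the file at config['daemon']['stop'] asks
# a running Puffin to leave its job loop; the loop removes the file and
# breaks. The helper name is an illustrative assumption, not part of the
# original module.
def _request_stop(config: Configuration):
    """Ask a running Puffin to exit its job loop after the current cycle."""
    with open(config['daemon']['stop'], 'a'):
        pass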