# -*- coding: utf-8 -*-
__copyright__ = """This file is part of SCINE Puffin.
This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory for Physical Chemistry, Reiher Group.
See LICENSE.txt for details
"""
import os
import sys
import time
import ctypes
import multiprocessing
import subprocess
import random
from datetime import datetime, timedelta
from importlib import import_module
from json import dumps
from .config import Configuration
# A global variable holding the process actually running jobs.
# This variable is used to be able to interact with (mainly to kill) said
# process even from outside the loop() function.
PROCESS = None
def slow_connect(manager, config: Configuration):
    """
    Connects the given Manager to the database referenced in the Configuration.
    This version of connecting tries 30 times to connect to the database.
    Each attempt is followed by a wait time of `1.0 + random([0.0, 1.0])` seconds in
    order to stagger connection attempts of multiple Puffin instances.

    Parameters
    ----------
    manager :: db.Manager (Scine::Database::Manager)
        The manager/database to connect.
    config :: puffin.config.Configuration
        The current configuration of the Puffin.
    """
    import scine_database as db
    credentials = db.Credentials(config['database']['ip'], config['database']['port'],
                                 config['database']['name'])
    # Optionally log the connection target before attempting to connect
    if config['daemon']['log']:
        with open(config['daemon']['log'], 'a') as f:
            f.write('{:s}: Connecting to: {:s}:{:d} {:s}\n'.format(
                str(datetime.utcnow()), config['database']['ip'], config['database']['port'],
                config['database']['name']))
    manager.set_credentials(credentials)
    # Allow 30 to 60 seconds to try and connect to the database
    for _ in range(30):
        try:
            manager.connect()
            break
        except Exception:
            # Randomized back-off staggers simultaneous Puffin start-ups
            time.sleep(1.0 + random.uniform(0, 1))
    else:
        # Final attempt: let any remaining connection error propagate
        manager.connect()
def loop(config: Configuration, available_jobs: dict):
    """
    The outer loop function.
    This function controls the forked actual loop function, which is implemented
    in _loop_impl(). The loop has an added timeout and also a 15 min ping is
    added showing that the runner is still alive.

    Parameters
    ----------
    config :: puffin.config.Configuration
        The current configuration of the Puffin.
    available_jobs :: dict
        A dictionary of all jobs that are available to this Puffin.
    """
    # Publish the worker process in the module-level PROCESS variable so that
    # code outside this function (e.g. signal handlers) can kill it.
    global PROCESS
    if config['daemon']['mode'] == 'debug':
        # Debug mode: run the loop in-process, no fork, then exit
        _loop_impl(config=config, available_jobs=available_jobs)
        sys.exit()
    # Generate shared variable holding the current job ID (empty = idle)
    # Shared variables have to be ctypes so this is a bit ugly
    JOB = multiprocessing.Array(ctypes.c_char, 200)
    JOB.value = ''.encode('utf-8')
    # Connect to database
    import scine_database as db
    manager = db.Manager()
    slow_connect(manager, config)
    # Run the loop in a second process
    PROCESS = multiprocessing.Process(target=_loop_impl, args=(), kwargs={
        'config': config,
        'available_jobs': available_jobs, 'JOB': JOB})
    PROCESS.start()
    calculations = manager.get_collection('calculations')
    # Check for timeout
    timeout = config['daemon']['timeout_in_h']
    touch_time = config['daemon']['touch_time_in_s']
    idle_timeout = config['daemon']['idle_timeout_in_h']
    last_time_with_a_job = time.time()
    start = time.time()
    last_touch = 0.0
    while PROCESS.is_alive():
        time.sleep(1.0)
        # Kill the puffin if it was idle for too long
        if JOB.value.decode('utf-8'):
            last_time_with_a_job = time.time()
        if idle_timeout > 0.0 and (time.time() - last_time_with_a_job) > idle_timeout * 3600.0:
            subprocess.run(["kill", "-9", str(PROCESS.pid)])
        # Kill job if it is out of time
        if timeout and (time.time() - start) > timeout * 3600.0:
            # But first reset the calculation so another Puffin can pick it up
            if JOB.value.decode('utf-8'):
                job = db.Calculation(db.ID(JOB.value.decode('utf-8')))
                job.link(calculations)
                job.set_status(db.Status.NEW)
                job.set_executor('')
            subprocess.run(["kill", "-9", str(PROCESS.pid)])
        # Touch current calculation every so often
        if (time.time() - last_touch) > touch_time:
            # Touch the own calculation
            last_touch = time.time()
            try:
                if JOB.value.decode('utf-8'):
                    job = db.Calculation(db.ID(JOB.value.decode('utf-8')))
                    job.link(calculations)
                    job.touch()
                    if config['daemon']['log']:
                        with open(config['daemon']['log'], 'a') as f:
                            f.write('{:s}: Touching Job: {:s}\n'.format(
                                str(datetime.utcnow()), job.id().string()))
                # Also check for dead jobs in the database: any 'pending'
                # calculation not touched for touch_time + 120 s is assumed
                # orphaned and is reset to 'new'.
                selection = {'status': {'$eq': 'pending'}}
                server_now = manager.server_time()
                for pending_calculation in calculations.iterate_calculations(dumps(selection)):
                    pending_calculation.link(calculations)
                    last_modified = pending_calculation.last_modified()
                    if (server_now - last_modified) > timedelta(seconds=(touch_time + 120)):
                        if config['daemon']['log']:
                            with open(config['daemon']['log'], 'a') as f:
                                f.write('{:s}: Resetting Job: {:s}\n'.format(
                                    str(datetime.utcnow()), pending_calculation.id().string()))
                        pending_calculation.set_status(db.Status.NEW)
                        pending_calculation.set_executor('')
            except Exception:
                # If it isn't possible to work with the database, kill the
                # job/loop and stop.
                subprocess.run(["kill", "-9", str(PROCESS.pid)])
def check_setup(config: Configuration):
    """
    Checks if all the programs are correctly installed or reachable.

    Parameters
    ----------
    config :: puffin.config.Configuration
        The current configuration of the Puffin.

    Returns
    -------
    available_jobs :: dict
        Mapping of job module name to job class name for every job whose
        required programs are all available on this Puffin.
    """
    # The SCINE core packages are mandatory; exit with an error if any is
    # missing (they are installed by the bootstrap step).
    try:
        import scine_database  # noqa: F401
    except ImportError:
        print("Missing SCINE Database backend, please bootstrap Puffin.")
        sys.exit(1)
    try:
        import scine_utilities  # noqa: F401
    except ImportError:
        print("Missing SCINE Utilities, please bootstrap Puffin.")
        sys.exit(1)
    try:
        import scine_readuct  # noqa: F401
    except ImportError:
        print("Missing SCINE ReaDuct, please bootstrap Puffin.")
        sys.exit(1)
    # Generate the list of available programs
    available_programs = []
    for program_name, settings in config.programs().items():
        if settings['available']:
            available_programs.append(program_name)
    # Gather list of all jobs by scanning the puffin.jobs package directory
    all_jobs = []
    import puffin.jobs
    for path in puffin.jobs.__path__:
        for root, dirs, files in os.walk(path):
            for name in files:
                if name.endswith('.py') and name not in ('job.py', '__init__.py'):
                    all_jobs.append(name[:-3])
    # Generate list of jobs for which the required programs are present
    available_jobs = {}
    for job in all_jobs:
        # Module name 'some_job' maps to class name 'SomeJob'
        class_name = ''.join(s.capitalize() for s in job.split("_"))
        module = import_module("puffin.jobs." + job)
        class_ = getattr(module, class_name)
        job_instance = class_()
        required_programs = job_instance.required_programs()
        for program in required_programs:
            if program not in available_programs:
                break
        else:
            # for/else: only reached if no required program was missing
            available_jobs[job] = class_name
    # Output results
    print('')
    print('Available Resources:')
    print(' {:18} {:6d}.000'.format('Threads:', config['resources']['cores']))
    print(' {:18} {:10.3f} GB'.format('Total RAM:', config['resources']['memory']))
    print(' {:18} {:10.3f} GB'.format('RAM/Thread:', config['resources']['memory']/config['resources']['cores']))
    print(' {:18} {:10.3f} GB'.format('Total Disk Space:', config['resources']['disk']))
    print(' {:18} {:10.3f} GB'.format('Disk Space/Thread:',
                                      config['resources']['disk']/config['resources']['cores']))
    print('')
    print('Available Programs:')
    for available_program in available_programs:
        print(' - '+available_program)
    print('')
    print('Accepting Jobs:')
    for job_name in available_jobs:
        print(' - '+job_name)
    print('')
    return available_jobs
def _loop_impl(config: Configuration, available_jobs: dict, JOB=None):
    """
    The actual loop, executing jobs and handling all calculation related
    operations.

    Parameters
    ----------
    config :: puffin.config.Configuration
        The current configuration of the Puffin.
    available_jobs :: dict
        A dictionary of all jobs that are available to this Puffin.
    JOB :: multiprocessing.Array
        Possibly a shared array of chars (string) to share the current jobs ID
        with external code. Default: ``None``
    """
    import scine_database as db
    # Connect to database
    manager = db.Manager()
    slow_connect(manager, config)
    collection = manager.get_collection('calculations')
    # Initialize loop variables
    sleep = config['daemon']['cycle_time_in_s']
    last_cycle = time.time()
    # Orders this Puffin accepts and programs it provides ('any' always matches)
    job_list = list(available_jobs.keys())
    program_list = ['any']
    version_list = []
    for program_name, settings in config.programs().items():
        if settings['available']:
            program_list.append(program_name)
            version_list.append(program_name + settings['version'])
    while True:
        # Stop the loop if a stop file has been written
        stop_file = config['daemon']['stop']
        if os.path.isfile(stop_file):
            os.remove(stop_file)
            break
        # Wait if needed so that iterations are at least `sleep` seconds apart
        now = time.time()
        if (now - last_cycle) < sleep:
            time.sleep(sleep - now + last_cycle)
        last_cycle = time.time()
        # Check for new calculations; the find-and-update is atomic, so one
        # calculation is claimed by exactly one Puffin.
        update = {'$set': {'status': 'pending', 'executor': config['daemon']['uuid']}}
        selection = {'$and': [
            {'status': {'$eq': 'new'}},
            {'job.cores': {'$lte': int(config['resources']['cores'])}},
            {'job.disk': {'$lte': float(config['resources']['disk'])}},
            {'job.memory': {'$lte': float(config['resources']['memory'])}},
            {'job.order': {'$in': job_list}},
            {'model.program': {'$in': program_list}}
            # { '$or' : [
            #     {'model.version' : { '$eq' : 'any'} },
            #     {'model.program + model.version' : { '$in' : version_list} }
            # ]}
        ]}
        calculation = collection.get_and_update_one_calculation(dumps(selection), dumps(update))
        calculation.link(collection)
        # Continue if no jobs are found
        if not calculation.has_id():
            continue
        # touch and thus update the timestamp otherwise
        calculation.touch()
        # Log the job id (shared with the outer watchdog process, if any)
        if JOB is not None:
            JOB.value = calculation.id().string().encode('utf-8')
        if config['daemon']['log']:
            with open(config['daemon']['log'], 'a') as f:
                f.write('{:s}: Processing Job: {:s}\n'.format(str(datetime.utcnow()),
                                                              calculation.id().string()))
        # Load requested job class from its module
        job_name = calculation.get_job().order
        try:
            class_name = available_jobs[job_name]
        except KeyError:
            # The selection query restricts orders to job_list, hence this
            # should never trigger.
            raise KeyError('Missing Job in list of possible jobs.\n' +
                           'Dev-Note: This error should not be reachable.')
        module = import_module("puffin.jobs." + job_name)
        class_ = getattr(module, class_name)
        job = class_()
        # Prepare job directory and start timer
        start = time.time()
        job.prepare(config['daemon']['job_dir'], calculation.id())
        # Run job
        success = job.run(manager, calculation, config)
        # Archive if requested and job was successful
        archive = config['daemon']['archive_dir']
        if success and archive:
            job.archive(archive)
        # Archive error if requested and job has failed
        error = config['daemon']['error_dir']
        if not success and error:
            job.archive(error)
        # End timer
        end = time.time()
        calculation.set_runtime(end - start)
        # Update job info with actual system specs
        db_job = calculation.get_job()
        db_job.cores = int(config['resources']['cores'])
        db_job.disk = float(config['resources']['disk'])
        db_job.memory = float(config['resources']['memory'])
        calculation.set_job(db_job)
        # Clean job remains and mark this Puffin as idle again
        job.clear()
        if JOB is not None:
            JOB.value = ''.encode('utf-8')