Source code for scine_chemoton.gears.rerun_calculations

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""

from json import dumps
from copy import deepcopy
from typing import List, Dict, Set, Tuple, Optional, Any

# Third party imports
import scine_database as db
from scine_database.queries import (
    model_query, get_calculation_id_from_structure, stop_on_timeout, query_calculation_in_id_set
)
import scine_utilities as utils

# Local application imports
from scine_chemoton.gears import Gear
from scine_chemoton.utilities.calculation_creation_helpers import finalize_calculation
from scine_chemoton.gears.network_refinement.enabling import EnableCalculationResults, PlaceHolderCalculationEnabling
from scine_chemoton.utilities.place_holder_model import (
    ModelNotSetError,
    construct_place_holder_model,
    PlaceHolderModelType
)


class RerunCalculations(Gear):
    """
    This gear re-starts (failed) calculations with different settings, a new model, or a different job based on
    calculations which were already run. The set of calculations to "re-run" can be characterized through
    their settings, the model, the job order, the resulting calculation status, and the comment.
    """

    # Prefix of the keys under which the IDs of re-run calculations are stored in the
    # restart information of the original calculation ("restart_ids_0", "restart_ids_1", ...).
    restart_info_key = "restart_ids"

    class Options(Gear.Options):
        """
        The options for the RerunCalculations Gear.
        """
        __slots__ = ("_parent", "old_job_settings", "new_job_settings", "old_status", "old_job", "new_job",
                     "change_model", "new_model", "comment_filter", "calculation_id_list",
                     "old_settings_to_remove", "legacy_existence_check")

        def __init__(self, parent: Optional[Any] = None) -> None:
            self._parent = parent  # best be first member to be set because of __setattr__
            super().__init__()
            self.old_job_settings: utils.ValueCollection = utils.ValueCollection()
            """
            utils.ValueCollection
                The settings of the original calculation. This dictionary does not have to be complete. It is
                only used to reduce the number of calculations re-run by the gear.
            """
            self.new_job_settings: utils.ValueCollection = utils.ValueCollection()
            """
            utils.ValueCollection
                New settings for the calculation. The settings from this dictionary are used to update the
                original calculations settings.
            """
            self.old_status: str = "failed"
            """
            str
                The calculation status of the calculations to be re-run.
            """
            self.old_job: db.Job = db.Job("scine_react_complex_nt2")
            """
            db.Job
                The original job of the calculations.
            """
            self.new_job: db.Job = db.Job("scine_react_complex_nt2")
            """
            db.Job
                The new job to re-run the calculations with.
            """
            self.change_model: bool = False
            """
            bool
                If true, the model for the newly set up calculation is updated.
            """
            self.new_model: db.Model = construct_place_holder_model()
            """
            db.Model
                The new calculation model. The keyword change_model must be set to True in order to set the
                new model.
            """
            self.comment_filter: List[str] = []
            """
            List[str]
                A list of comments that is used to further identify calculations that should be re-run.
                Example comments:
                * No more negative eigenvalues
                * TS has incorrect number of imaginary frequencies.
                * Self consistent charge iterator did not converge
            """
            self.calculation_id_list: List[db.ID] = []
            """
            List[db.ID]
                A list of calculation ids to consider for rerunning. If empty, all calculations are looped.
            """
            self.old_settings_to_remove: List[str] = []
            """
            List[str]
                A list of settings to remove from the original calculation settings.
            """
            self.legacy_existence_check = False
            """
            bool
                If True, the gear will check if the calculation already exists in the database without looking
                up the restart_information field. This is the old behavior of the gear and should only be used
                if the restart_information field is not available or re-runs have been carried out with the
                old gear.
            """

        def __setattr__(self, item, value):
            """
            Overwritten standard method to mark the cache as out of date if any option is changed.
            """
            super().__setattr__(item, value)
            if self._parent is not None:
                # Any changed option invalidates the list of calculations that were already considered.
                self._parent._recreate_cache = True
                self._parent._calculation_cache = list()

    options: Options

    def __init__(self) -> None:
        super().__init__()
        self.options = self.Options(parent=self)
        self._required_collections = ["calculations", "structures"]
        # IDs of all calculations that were already handed to _rerun_calculation.
        self._calculation_cache: List[db.ID] = list()
        # Maps a tuple of structure ID strings to the ID strings of the calculations set up for them.
        self._already_set_up_calculations: Dict[Tuple, Set[str]] = dict()
        self.result_enabling: EnableCalculationResults = PlaceHolderCalculationEnabling()
        """
        Optional[EnableCalculationResults]
            If this calculation result enabling policy is given, the result of an already existing calculation
            is enabled again (if disabled previously).
        """
        self.__have_printed_note: bool = False

    def _loop_impl(self):
        """
        Run one cycle of the gear: select the calculations to re-run and set up the new calculations.

        Raises
        ------
        ModelNotSetError
            If ``change_model`` is set, but ``new_model`` is still the place-holder model.
        """
        if self.options.change_model and isinstance(self.options.new_model, PlaceHolderModelType):
            raise ModelNotSetError("Specified to change the model, but have not specified the new model")
        if not self.options.legacy_existence_check and not self.__have_printed_note:
            print("Note: The calculation Re-Run gear will check the existence of old calculations by their restart\n"
                  "information. Note that this could lead to duplicated calculations if the already existing"
                  " calculation was\n"
                  "created without registering in the restart information, e.g., through another gear or an older\n"
                  "chemoton version. Use the option 'legacy_existence_check' to ensure that this cannot happen.")
            self.__have_printed_note = True
        if self._identical_calculation_characterization():
            return
        if not isinstance(self.result_enabling, PlaceHolderCalculationEnabling):
            self.result_enabling.initialize_collections(self._manager)

        cache_update: Dict[Tuple[str, ...], Set[str]] = {}
        # Accumulated over ALL calculations of this cycle; a single skipped calculation triggers the hint below.
        encountered_identical_settings = False
        if self.options.calculation_id_list:
            for calculation_id in self.options.calculation_id_list:
                if self.stop_at_next_break_point:
                    break
                old_calculation = db.Calculation(calculation_id, self._calculations)
                if self._rerun_calculation(old_calculation, cache_update):
                    encountered_identical_settings = True
        else:
            selection = dumps(self._get_calculation_selection())
            for old_calculation in stop_on_timeout(self._calculations.iterate_calculations(selection)):
                if self.stop_at_next_break_point:
                    break
                old_calculation.link(self._calculations)
                if self._rerun_calculation(old_calculation, cache_update):
                    encountered_identical_settings = True
        if encountered_identical_settings:
            print("Encountered identical settings when re-running calculations.\n"
                  "This is a sign that 1) the new and old settings and model are identical, which does not make sense,"
                  "\nor 2) the calculation with the new settings encountered the same problem (e.g. convergence issue) "
                  "as the old calculation.\n"
                  "Hence, the new settings could not solve the problem.\n"
                  "In either case this gear has not set up any new calculations."
                  )
        # Update the cache only after completing the cycle.
        self._update_already_set_up_cache(cache_update)

    def _rerun_calculation(self, old_calculation: db.Calculation, cache_update: Dict[Tuple[str, ...], Set[str]]) \
            -> bool:
        """
        Set up the re-run of a single calculation unless an equivalent calculation already exists.

        Parameters
        ----------
        old_calculation : db.Calculation
            The (linked) calculation to re-run.
        cache_update : Dict[Tuple[str, ...], Set[str]]
            The map collecting the calculations handled in this cycle. It is modified in place.

        Returns
        -------
        bool
            True if the calculation was skipped because the re-run would be identical to the original
            calculation (same settings, unchanged model, and same job order), False otherwise.
        """
        self._calculation_cache.append(old_calculation.id())
        # check if the comment given for the calculation corresponds to the problem that should be fixed.
        if not self._check_comment(old_calculation):
            return False
        # get old structures and settings
        old_structures = old_calculation.get_structures()
        old_settings = old_calculation.get_settings().as_dict()
        new_settings = self._build_new_settings(old_settings)
        # A re-run is only pointless if nothing changes at all. A different job alone is a valid reason to
        # re-run, hence the job order must be identical as well before the calculation is skipped.
        if not self.options.change_model and new_settings == old_settings \
                and old_calculation.get_job().order == self.options.new_job.order:
            return True
        auxiliaries = old_calculation.get_auxiliaries()
        model = self.options.new_model if self.options.change_model else self.options.model
        # check whether the new calculation exists already
        if self.options.legacy_existence_check:
            calculation_id = get_calculation_id_from_structure(self.options.new_job.order, old_structures, model,
                                                               self._structures, self._calculations, new_settings,
                                                               auxiliaries)
        else:
            # Only the calculations registered in the restart information are candidates.
            id_selection = {str(i) for i in self.get_restart_ids(old_calculation)}
            calculation_id = query_calculation_in_id_set(id_selection, len(old_structures), self._calculations,
                                                         old_structures, new_settings, auxiliaries,
                                                         self.options.new_job.order)
        if calculation_id is None:
            # create new calculation
            new_calculation = db.Calculation()
            new_calculation.link(self._calculations)
            new_calculation.create(model, self.options.new_job, old_structures)
            new_calculation.set_settings(utils.ValueCollection(new_settings))
            new_calculation.set_auxiliaries(auxiliaries)
            if not self.options.legacy_existence_check:
                # Register the new calculation under the next free restart key of the old calculation.
                counter = len(self.get_restart_ids(old_calculation))
                old_calculation.set_restart_information(f"{RerunCalculations.restart_info_key}_{counter}",
                                                        new_calculation.id())
            self._add_to_already_set_up_calculations(new_calculation, cache_update)
            finalize_calculation(new_calculation, self._structures)
        else:
            calculation = db.Calculation(calculation_id, self._calculations)  # type: ignore
            self._add_to_already_set_up_calculations(calculation, cache_update)
            if not isinstance(self.result_enabling, PlaceHolderCalculationEnabling):
                self.result_enabling.process(calculation)
        return False

    def _check_comment(self, old_calculation: db.Calculation) -> bool:
        """
        Check whether the comment of the calculation contains at least one entry of the comment filter.
        Returns True if no comment filter is set.
        """
        if not self.options.comment_filter:
            # no comments specified, we are not filtering based on comments
            return True
        comment = old_calculation.get_comment()
        return any(message in comment for message in self.options.comment_filter)

    def _build_new_settings(self, old_settings: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build the settings of the new calculation: a deep copy of the old settings without the entries listed
        in ``old_settings_to_remove`` and updated with ``new_job_settings``. The input is not modified.
        """
        new_settings = deepcopy(old_settings)
        for setting in self.options.old_settings_to_remove:
            new_settings.pop(setting, None)
        new_settings.update(self.options.new_job_settings)
        return new_settings

    def _get_calculation_selection(self):
        """
        Build the database query selecting the calculations to re-run. Calculations that were already
        considered by this gear (see the calculation cache) are excluded.
        """
        calc_id_str = [{"$oid": str_id.string()} for str_id in self._calculation_cache]
        selection = {
            "$and": [
                {"_id": {"$nin": calc_id_str}},
                {"status": str(self.options.old_status)},
                {"job.order": self.options.old_job.order},
                {"analysis_disabled": False},
            ]
            + self._expand_settings_query(self.options.old_job_settings)
            + model_query(self.options.model)
        }
        return selection

    @staticmethod
    def _expand_settings_query(settings: utils.ValueCollection) -> List[Dict[str, Any]]:
        """
        Translate the given settings into a list of query conditions of the form ``{"settings.<key>": value}``.
        """
        return [{"settings." + str(key): settings[key]} for key in settings.keys()]

    def _identical_calculation_characterization(self) -> bool:
        """
        Check whether the options describe a re-run that does not differ from the original calculations,
        i.e., no model change, no job change, no new or changed settings, and no removed settings that are
        part of ``old_job_settings``. A note is printed if this is the case.
        """
        if self.options.model != self.options.new_model and self.options.change_model:
            return False
        if self.options.old_job != self.options.new_job:
            return False
        for key in self.options.new_job_settings.keys():
            if key not in self.options.old_job_settings.keys():
                return False
            if self.options.old_job_settings[key] != self.options.new_job_settings[key]:  # type: ignore
                return False
        if self.options.old_settings_to_remove and any(setting in self.options.old_job_settings
                                                       for setting in self.options.old_settings_to_remove):
            return False
        print("The calculation rerun gear detected that it would set up identical calculations, e.g.,")
        print("the characterization of the new calculation does not change the characterization of the")
        print("original one! The gear will do nothing.")
        return True

    @staticmethod
    def _get_caching_key(structure_id_list: List[db.ID]) -> Tuple[str, ...]:
        """
        Return the cache key for a list of structure IDs: the tuple of their string representations.
        """
        return tuple(s_id.string() for s_id in structure_id_list)

    def _add_to_already_set_up_calculations(self, calculation: db.Calculation,
                                            caching_map: Dict[Tuple[str, ...], Set[str]]):
        """
        Register the ID of the calculation in the given map under the key of the calculation's structures.
        """
        key = self._get_caching_key(calculation.get_structures())
        caching_map.setdefault(key, set()).add(calculation.id().string())

    def _calculation_ids_already_set_up(self, structure_ids: List[db.ID]) -> Set[str]:
        """
        Return the ID strings of the calculations already set up for the given structures
        (an empty set if there are none).
        """
        key = self._get_caching_key(structure_ids)
        return self._already_set_up_calculations.get(key, set())

    def _update_already_set_up_cache(self, update: Dict[Tuple, Set[str]]):
        """
        Merge the calculations collected during one cycle into the cache of already set up calculations.
        """
        for key, calculation_ids in update.items():
            if key in self._already_set_up_calculations:
                self._already_set_up_calculations[key] = self._already_set_up_calculations[key].union(
                    calculation_ids)
            else:
                self._already_set_up_calculations[key] = calculation_ids

    @staticmethod
    def get_restart_ids(calculation: db.Calculation) -> List[db.ID]:
        """
        Collect the IDs of all re-runs registered in the restart information of the given calculation.

        The IDs are read from the consecutively numbered keys ``restart_ids_0``, ``restart_ids_1``, ...;
        the search stops at the first missing key.

        Parameters
        ----------
        calculation : db.Calculation
            The (linked) original calculation.

        Returns
        -------
        List[db.ID]
            The registered IDs in the order of their keys.
        """
        restart_info = calculation.get_restart_information()
        counter = 0
        ids: List[db.ID] = []
        while True:
            i = restart_info.get(f"{RerunCalculations.restart_info_key}_{counter}", None)
            if i is None:
                return ids
            ids.append(i)
            counter += 1