#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""
# Standard library imports
from json import dumps
from typing import Union, Tuple, Dict, List, Optional
from warnings import warn
from copy import copy
import numpy as np
# Third party imports
import scine_database as db
from scine_database.queries import (
stop_on_timeout,
calculation_exists_in_structure,
get_calculation_id_from_structure,
optimized_labels,
)
import scine_utilities as utils
import scine_molassembler as masm
# Local application imports
from . import Gear
from ..utilities.calculation_creation_helpers import finalize_calculation
from ..utilities.masm import mol_from_cbor, mols_from_properties
from .network_refinement.enabling import AggregateEnabling
from scine_chemoton.utilities.place_holder_model import (
construct_place_holder_model,
PlaceHolderModelType
)
class BasicAggregateHousekeeping(Gear):
    """
    This Gear updates all relevant Structures stored in the database with
    bond orders and graph representations.
    This data is then used to sort each Structure into existing Compounds/Flasks or to
    create a new Compound/Flask if no appropriate one exists.

    Attributes
    ----------
    options : BasicAggregateHousekeeping.Options
        The options for the BasicAggregateHousekeeping Gear.

    Notes
    -----
    Checks for all 'user_optimized', 'minimum_optimized', and 'complex_optimized' Structures
    that do not have an Aggregate assigned. The Gear then generates bond orders and molecular graphs
    ('masm_cbor_graph', and 'masm_decision_list') if they are not yet present.
    Using the molecular graphs the Structures are then sorted into Compounds/Flasks.
    """

    def __init__(self) -> None:
        super().__init__()
        self._required_collections = ["calculations", "compounds", "flasks",
                                      "properties", "structures", "reactions"]
        self._compound_map: Dict[str, Dict[Tuple[int, int, int], List[Tuple[db.ID, List[masm.Molecule]]]]] = dict()
        """
        Caching map from sum formula and (n_molecules, charge, multiplicity) to
        compound ids and their reference molecules.
        """
        self._flask_map: Dict[str, Dict[Tuple[int, int, int], List[Tuple[db.ID, List[masm.Molecule]]]]] = dict()
        """
        Caching map from sum formula and (n_molecules, charge, multiplicity) to
        flask ids and their reference molecules.
        """
        self._unique_structures: Dict[str, List[Tuple[db.ID, str]]] = dict()
        """
        Caching map from compound id to list of structure ids and decision lists.
        """

    class Options(Gear.Options):
        """
        The options for the BasicAggregateHousekeeping Gear.
        """
        __slots__ = [
            "bond_order_job",
            "bond_order_settings",
            "graph_job",
            "graph_settings",
            "exclude_misguided_conformer_optimizations",
            "aggregate_enabling"
        ]

        def __init__(self) -> None:
            super().__init__()
            self.bond_order_job: db.Job = db.Job("scine_bond_orders")
            """
            db.Job (Scine::Database::Calculation::Job)
                The Job used for the bond order calculations.
                The default is: the 'scine_bond_orders' order on a single core.
            """
            self.bond_order_settings: utils.ValueCollection = utils.ValueCollection()
            """
            utils.ValueCollection
                Additional settings passed to the bond order calculations.
                Empty by default.
            """
            self.graph_job: db.Job = db.Job("graph")
            """
            db.Job (Scine::Database::Calculation::Job)
                The Job used for the graph calculations.
                The default is: the 'graph' order on a single core.
            """
            self.graph_settings: utils.ValueCollection = utils.ValueCollection()
            """
            utils.ValueCollection
                Additional settings passed to the graph calculations.
                Empty by default.
            """
            self.exclude_misguided_conformer_optimizations: bool = True
            """
            bool
                If true, no additional aggregate is created if the structure was
                generated by a geometry optimization that did not stay within its
                origin aggregate; such structures are labeled irrelevant instead.
            """
            self.aggregate_enabling: AggregateEnabling = AggregateEnabling()
            """
            AggregateEnabling
                If a structure is added to an aggregate, the analysis of the
                corresponding aggregate is enabled according to this policy.
                The default policy does nothing.
            """

    options: Options

    def _create_unique_structure_map(self) -> None:
        """
        Build the cache mapping each compound id to the (structure id,
        decision list) pairs of its unique structures.

        Duplicates, minimum guesses, and structures without a decision list
        are skipped.
        """
        for compound in stop_on_timeout(self._compounds.iterate_all_compounds()):
            compound.link(self._compounds)
            key = compound.id().string()
            self._unique_structures[key] = []
            for s_id in compound.get_structures():
                structure = db.Structure(s_id, self._structures)
                if structure.get_label() in [db.Label.DUPLICATE, db.Label.MINIMUM_GUESS]:
                    continue
                if not structure.has_graph("masm_decision_list"):
                    continue
                self._unique_structures[key].append((s_id, structure.get_graph("masm_decision_list")))

    def _add_to_unique_structures(self, key: str, structure: db.Structure) -> None:
        """
        Register the structure's decision list under the given compound id key.

        Parameters
        ----------
        key : str
            The compound id (string form) the structure belongs to.
        structure : db.Structure
            The structure to register; must hold a 'masm_decision_list' graph.
        """
        entry = (structure.id(), structure.get_graph("masm_decision_list"))
        self._unique_structures.setdefault(key, []).append(entry)

    def _create_compound_map(self) -> None:
        """Populate the compound cache map from all compounds in the database."""
        for compound in stop_on_timeout(self._compounds.iterate_all_compounds()):
            compound.link(self._compounds)
            self._add_to_map(compound, self._compound_map)

    def _create_flask_map(self) -> None:
        """Populate the flask cache map from all flasks in the database."""
        for flask in stop_on_timeout(self._flasks.iterate_all_flasks()):
            flask.link(self._flasks)
            self._add_to_map(flask, self._flask_map)

    def _add_to_map(self, aggregate: Union[db.Compound, db.Flask], cache_map: Dict) -> None:
        """
        Insert an aggregate into the given cache map, keyed first by sum formula
        and then by (n_molecules, charge, multiplicity). No-op if already cached.
        """
        sum_formula, n_molecules, charge, multiplicity, mols = self._get_aggregate_info(aggregate)
        key = (n_molecules, charge, multiplicity)
        entry = (aggregate.id(), mols)
        if sum_formula not in cache_map:
            cache_map[sum_formula] = {key: [entry]}
            return
        bucket = cache_map[sum_formula]
        if key not in bucket:
            bucket[key] = [entry]
        elif entry not in bucket[key]:
            bucket[key].append(entry)

    def _get_sum_formula(self, mols: List[masm.Molecule]) -> str:
        """
        Build an order-independent sum formula string for a list of molecules.

        The per-molecule formula is taken from the last token of the graph's
        repr; the sorted formulas are joined with ','.
        """
        sum_formulas = sorted(repr(m.graph).split()[-1] for m in mols)
        return ','.join(sum_formulas)

    @staticmethod
    def _get_aggregate_from_map(sum_formula: str, n_molecules: int, charge: int, multiplicity: int,
                                mols: List[masm.Molecule], cache_map: Dict) -> Union[db.ID, None]:
        """
        Look up an aggregate whose reference molecules match the given ones.

        Molecules are matched as a multiset (order-independent); returns the
        aggregate id on the first full match, otherwise None.
        """
        if sum_formula in cache_map:
            key = (n_molecules, charge, multiplicity)
            if key in cache_map[sum_formula]:
                for a_id, ref_mols in cache_map[sum_formula][key]:
                    remaining_mols = copy(ref_mols)
                    for m in mols:
                        idx = remaining_mols.index(m) if m in remaining_mols else None
                        if idx is not None:
                            remaining_mols.pop(idx)
                        else:
                            break
                    else:
                        # All molecules were matched against this candidate.
                        return a_id
        return None

    def _check_graph(self, structure: db.Structure) -> Optional[str]:
        """
        Return the structure's cbor graph if present, otherwise set up the
        missing calculation (graph job if bond orders exist, else bond order
        job) and return None.

        Also warns if the complex/non-complex label contradicts the graph.
        """
        if structure.has_graph("masm_cbor_graph") and structure.has_graph("masm_decision_list"):
            graph = structure.get_graph("masm_cbor_graph")
            label = structure.get_label()
            # A ';' in the cbor graph separates multiple molecules, i.e. a complex.
            if ";" in graph:
                if label != db.Label.COMPLEX_OPTIMIZED and label != db.Label.USER_COMPLEX_OPTIMIZED:
                    warn(f"Structure '{str(structure.id())}' received incorrect label '{str(label)}', "
                         f"according to its graph, it is actually a complex.")
            else:
                if label == db.Label.COMPLEX_OPTIMIZED or label == db.Label.USER_COMPLEX_OPTIMIZED:
                    warn(f"Structure '{str(structure.id())}' received incorrect label '{str(label)}', "
                         f"according to its graph, it is actually NOT a complex.")
            return graph
        if structure.has_properties("bond_orders"):
            self._setup_graph_job(structure.id())
            return None
        self._setup_bond_order_job(structure.id())
        return None

    def _label_structure_as_duplicate(self, structure: db.Structure, duplicate_id: db.ID) -> None:
        """Mark the structure as a duplicate of the structure with duplicate_id."""
        structure.set_label(db.Label.DUPLICATE)
        structure.set_comment("Structure is a duplicate of {:s}.".format(str(duplicate_id)))
        structure.set_original(duplicate_id)

    def _check_for_misguided_conformer_optimization(self, structure: db.Structure) -> bool:
        """
        Check whether the structure is the product of an optimization that left
        its starting aggregate; if so, label it irrelevant.

        Returns
        -------
        bool
            True if the structure was labeled irrelevant.
        """
        # TODO this query may become a bottleneck for huge databases.
        # avoiding a query into the calculations would be much preferred
        selection = {
            "$and": [
                {"status": {"$in": ["complete", "failed", "analyzed"]}},
                {"results.structures": {"$oid": structure.id().string()}},
                {"results.elementary_steps": {"$size": 0}},
            ]
        }
        irrelevant_structure = False
        if self.options.exclude_misguided_conformer_optimizations:
            hit = self._calculations.get_one_calculation(dumps(selection))
            if hit is not None:
                # Check if there is a calculation that generated the compound
                hit.link(self._calculations)
                starting_structure = db.Structure(hit.get_structures()[0], self._structures)
                if starting_structure.has_aggregate():
                    # If the starting structure has an aggregate, then the
                    # result should also be part of the same aggregate
                    structure.set_label(db.Label.IRRELEVANT)
                    structure.set_comment(
                        f"Result of misguided optimization not staying within the origin aggregate "
                        f"{str(starting_structure.get_aggregate())}."
                    )
                    irrelevant_structure = True
        return irrelevant_structure

    def _store_as_compound(self, structure: db.Structure) -> None:
        """Create a new (exploration-disabled) compound holding the structure."""
        compound = db.Compound(db.ID(), self._compounds)
        compound.create([structure.id()], exploration_disabled=True)
        structure.set_aggregate(compound.id())
        self._add_to_map(compound, self._compound_map)

    def _store_as_flask(self, structure: db.Structure) -> None:
        """Create a new (exploration-disabled) flask holding the structure."""
        flask = db.Flask(db.ID(), self._flasks)
        flask.create([structure.id()], [], exploration_disabled=True)
        structure.set_aggregate(flask.id())
        self._add_to_map(flask, self._flask_map)

    def _loop_impl(self) -> None:
        """One gear iteration: (re)build caches if empty, then sort structures."""
        if not self._compound_map:
            self._create_compound_map()
        if not self._flask_map:
            self._create_flask_map()
        if not self._unique_structures:
            self._create_unique_structure_map()
        self._check_for_aggregates()

    def _check_for_aggregates(self) -> None:
        """
        Sort all optimized, aggregate-less structures into existing aggregates
        or create new Compounds/Flasks for them.
        """
        # Setup query for optimized structures without aggregate
        selection = {
            "$and": [
                {"label": {"$in": optimized_labels()}},
                {"aggregate": ""},
                {"analysis_disabled": {"$ne": True}},
            ]
        }
        for structure in stop_on_timeout(self._structures.iterate_structures(dumps(selection))):
            structure.link(self._structures)
            if self.stop_at_next_break_point:
                return
            # A missing graph means a job has been set up; revisit later.
            graph = self._check_graph(structure)
            if graph is None:
                continue
            # If a graph exists but no aggregate is registered, generate a new aggregate
            # or append the structure to an existing compound
            try:
                matching_aggregate = self._get_matching_aggregate(structure)
            except IndexError:
                print("Structure: " + structure.id().string() + " has invalid Molassembler serialization.")
                print("The structure is set as IRRELEVANT for further analysis.")
                structure.set_label(db.Label.IRRELEVANT)
                # Bug fix: without this continue, 'matching_aggregate' would be
                # unbound below and raise a NameError.
                continue
            if matching_aggregate is not None:
                # Check if duplicate in same aggregate
                duplicate = self._query_duplicate(structure, matching_aggregate)
                if duplicate is not None:
                    self._label_structure_as_duplicate(structure, duplicate.id())
                else:
                    # only add aggregate info for non-duplicates
                    structure.set_aggregate(matching_aggregate.id())
                    matching_aggregate.add_structure(structure.id())
                self.options.aggregate_enabling.process(matching_aggregate)
            else:
                # Create a new aggregate but only if the structure is not a misguided conformer optimization
                # TODO this query may become a bottleneck for huge databases.
                # avoiding a query into the calculations would be much preferred
                if self._check_for_misguided_conformer_optimization(structure):
                    continue
                if ";" in graph:
                    self._store_as_flask(structure)
                else:
                    self._store_as_compound(structure)

    def _setup_bond_order_job(self, structure_id: db.ID) -> None:
        """
        Checks for already existing bond_order job and if none present, sets up one for given structure
        """
        if calculation_exists_in_structure(self.options.bond_order_job.order, [structure_id], self.options.model,
                                           self._structures, self._calculations):
            return
        # Setup calculation of a graph
        calculation = db.Calculation(db.ID(), self._calculations)
        calculation.create(self.options.model, self.options.bond_order_job, [structure_id])
        if self.options.bond_order_settings:
            calculation.set_settings(self.options.bond_order_settings)
        finalize_calculation(calculation, self._structures)

    def _setup_graph_job(self, structure_id: db.ID) -> None:
        """
        Checks for already existing graph job and if none present, sets up one for given structure
        """
        if calculation_exists_in_structure(self.options.graph_job.order, [structure_id], self.options.model,
                                           self._structures, self._calculations):
            return
        # Setup calculation of a graph
        calculation = db.Calculation(db.ID(), self._calculations)
        calculation.create(self.options.model, self.options.graph_job, [structure_id])
        if self.options.graph_settings:
            calculation.set_settings(self.options.graph_settings)
        finalize_calculation(calculation, self._structures)

    def _get_aggregate_info(
        self, aggregate: Union[db.Compound, db.Flask]
    ) -> Tuple[str, int, int, int, List[masm.Molecule]]:
        """
        Gather identifying information about an aggregate from its centroid.

        Returns
        -------
        Tuple[str, int, int, int, List[masm.Molecule]]
            Sum formula, number of molecules, charge, multiplicity, and the
            deserialized molecules of the centroid structure.

        Raises
        ------
        RuntimeError
            If the graph can neither be deserialized nor recreated.
        """
        centroid = db.Structure(aggregate.get_centroid(), self._structures)
        try:
            graph = centroid.get_graph("masm_cbor_graph")
            mols = [mol_from_cbor(m) for m in graph.split(";")]
        except RuntimeError as e:
            # Fall back to recreating the molecules from stored properties.
            tmp = mols_from_properties(centroid, self._properties)
            if tmp is None:
                raise RuntimeError(
                    f'Error: Graph deserialization and recreation failed for {centroid.id().string()}'
                ) from e
            mols = tmp
        sum_formula = self._get_sum_formula(mols)
        # Bug fix: use len(mols) — 'graph' is unbound when the fallback branch
        # above was taken.
        return sum_formula, len(mols), centroid.get_charge(), centroid.get_multiplicity(), mols

    def _get_matching_aggregate(self, structure: db.Structure) -> Union[db.Compound, db.Flask, None]:
        """
        Queries database for matching aggregate of the given structure based on:
        * graph (irrespective of order for flasks)
        * charge
        * multiplicity
        Returns None if no matching aggregate in DB yet
        """
        try:
            graph = structure.get_graph("masm_cbor_graph")
            mols = [mol_from_cbor(m) for m in graph.split(";")]
        except RuntimeError as e:
            # Fall back to recreating the molecules from stored properties.
            tmp = mols_from_properties(structure, self._properties)
            if tmp is None:
                raise RuntimeError(
                    f'Error: Graph deserialization and recreation failed for {structure.id().string()}'
                ) from e
            mols = tmp
        charge = structure.get_charge()
        multiplicity = structure.get_multiplicity()
        sum_formula = self._get_sum_formula(mols)
        n_molecules = len(mols)
        if n_molecules > 1:
            # Multiple molecules -> the aggregate must be a flask.
            f_id = self._get_aggregate_from_map(
                sum_formula, n_molecules, charge, multiplicity, mols, self._flask_map
            )
            if f_id:
                return db.Flask(f_id, self._flasks)
        else:
            c_id = self._get_aggregate_from_map(
                sum_formula, n_molecules, charge, multiplicity, mols, self._compound_map
            )
            if c_id:
                return db.Compound(c_id, self._compounds)
        return None

    def _query_duplicate(self, structure: db.Structure, aggregate: Union[db.Compound, db.Flask]) \
            -> Union[db.Structure, None]:
        """
        Check based on decision lists if the given aggregate has a duplicate structure to the given structure
        NOTE: not implemented for flasks yet
        """
        if isinstance(aggregate, db.Flask):
            # currently not possible
            return None
        key = aggregate.id().string()
        if key in self._unique_structures:
            similar_structures = self._unique_structures[key]
            decision_list = structure.get_graph("masm_decision_list")
            model = structure.get_model()
            for sid, ref_decision_list in similar_structures:
                # Check that similar structure is not identical to the given ID
                if sid == structure.id():
                    continue
                if not masm.JsonSerialization.equal_decision_lists(decision_list, ref_decision_list):
                    continue
                similar_structure = db.Structure(sid, self._structures)
                # Only structures computed with the same model count as duplicates.
                if similar_structure.get_model() == model:
                    return similar_structure
        self._add_to_unique_structures(key, structure)
        return None
class ThermoAggregateHousekeeping(BasicAggregateHousekeeping):
    """
    This Gear updates all relevant Structures stored in the database with frequencies,
    bond orders and graph representations.
    This data is then used to sort each Structure into existing Compounds/Flasks or to
    create a new Compound/Flask if no appropriate one exists.
    To be sorted into an Aggregate, a Structure must have no imaginary frequencies
    above the given absolute frequency threshold.
    Otherwise, a validation job is setup or, if this has been attempted already,
    the exploration and analysis of this Structure are disabled.

    Attributes
    ----------
    options : ThermoAggregateHousekeeping.Options
        The options for the ThermoAggregateHousekeeping Gear.

    Notes
    -----
    Checks for all 'user_optimized', 'minimum_optimized', and 'complex_optimized' Structures
    that do not have an Aggregate assigned and are enabled for analysis.
    The Gear then generates bond orders and molecular graphs ('masm_cbor_graph', and 'masm_decision_list')
    if they are not yet present.
    Using the molecular graphs, the Structures are then sorted into Compounds/Flasks.
    """

    class Options(BasicAggregateHousekeeping.Options):
        """
        The options for the ThermoAggregateHouseKeeping Gear.
        """
        __slots__ = [
            "validation_job",
            "validation_settings",
            "structure_model",
            "absolute_frequency_threshold"
        ]

        def __init__(self) -> None:
            super().__init__()
            self.validation_job: db.Job = db.Job("scine_geometry_validation")
            """
            db.Job
                The Job used for the geometry validation calculations.
                The default is: the 'scine_geometry_validation' order on a single core.
            """
            self.validation_settings: utils.ValueCollection = utils.ValueCollection()
            """
            utils.ValueCollection
                Additional settings passed to the geometry validation order calculation.
                Empty by default.
            """
            self.structure_model: db.Model = construct_place_holder_model()
            """
            Optional[db.Model]
                Validation calculations are only started for structures with the given model.
                The default place-holder model disables this filter.
            """
            self.absolute_frequency_threshold: float = 50.0
            """
            float
                Abs Frequency threshold in cm^-1.
            """

    options: Options

    def _check_for_aggregates(self) -> None:
        """
        Sort all optimized, aggregate-less structures into aggregates, but only
        after they passed the frequency/validation check.
        """
        # Setup query for optimized structures without aggregate
        selection = {"$and": [
            {"label": {"$in": optimized_labels()}},
            {"aggregate": ""},
            {"analysis_disabled": {"$ne": True}},
        ]
        }
        for structure in stop_on_timeout(self._structures.iterate_structures(dumps(selection))):
            structure.link(self._structures)
            if self.stop_at_next_break_point:
                return
            # Structure model check; only applied if a real model was set.
            if not isinstance(self.options.structure_model, PlaceHolderModelType):
                if structure.get_model() != self.options.structure_model:
                    continue
            # A missing graph means a job has been set up; revisit later.
            graph = self._check_graph(structure)
            if graph is None:
                continue
            # If a graph exists but no aggregate is registered, generate a new aggregate
            # or append the structure to an existing compound
            try:
                matching_aggregate = self._get_matching_aggregate(structure)
            except IndexError:
                print("Structure: " + structure.id().string() + " has invalid Molassembler serialization.")
                print("The structure is set as IRRELEVANT for further analysis.")
                structure.set_label(db.Label.IRRELEVANT)
                # Bug fix: without this continue, 'matching_aggregate' would be
                # unbound below and raise a NameError.
                continue
            if matching_aggregate is not None:
                # Frequency check; skip the structure if a validation job was
                # set up or validation has already been attempted and failed.
                keep_going, valid_minimum = self._check_frequencies_and_validation_job(structure)
                if not keep_going:
                    continue
                # Check if duplicate in same aggregate
                duplicate = self._query_duplicate(structure, matching_aggregate)
                if duplicate is not None:
                    self._label_structure_as_duplicate(structure, duplicate.id())
                elif valid_minimum:
                    # only add aggregate info for non-duplicates
                    structure.set_aggregate(matching_aggregate.id())
                    matching_aggregate.add_structure(structure.id())
            else:
                # Frequency check; skip the structure if a validation job was
                # set up or validation has already been attempted and failed.
                keep_going, valid_minimum = self._check_frequencies_and_validation_job(structure)
                if not keep_going:
                    continue
                # Create a new aggregate but only if the structure is not a
                # misguided conformer optimization.
                if self._check_for_misguided_conformer_optimization(structure):
                    continue
                # Sort into the correct aggregate type, only for valid minima.
                if ";" in graph and valid_minimum:
                    self._store_as_flask(structure)
                elif valid_minimum:
                    self._store_as_compound(structure)

    def _check_frequencies_and_validation_job(self, structure: db.Structure) -> Tuple[bool, bool]:
        """
        Checks if structure is minimum, sets up a validation job if not setup yet and disables the structure,
        if it is not a valid minimum and a validation has been attempted already.
        The latter two should continue the loop.

        Parameters
        ----------
        structure : db.Structure
            The structure to be checked.

        Returns
        -------
        keep_going, valid_minimum : Tuple[bool, bool]
            keep_going indicates if the parent loop should continue or look at the next structure.
            valid_minimum indicates if the structure is a valid minimum.
        """
        valid_minimum = False
        keep_going = True
        if structure.has_properties("frequencies"):
            valid_minimum = self._is_valid_minium(structure)
        # A single atom is a valid minimum but has no frequencies.
        if len(structure.get_atoms()) == 1:
            valid_minimum = True
        elif not valid_minimum and len(structure.get_atoms()) > 1:
            # Not (yet) a confirmed minimum: try to set up a validation job.
            if self._setup_validation_job(structure.id()):
                # Validation pending; revisit the structure later.
                keep_going = False
            elif structure.has_properties("frequencies"):
                # Validation already ran through and the frequencies still
                # disqualify the structure: disable it permanently.
                structure.disable_analysis()
                structure.disable_exploration()
                structure.set_comment(
                    "Structure has imaginary frequencies larger than {:.0f} cm^-1".format(
                        self.options.absolute_frequency_threshold) +
                    "\nDisabled.")
                keep_going = False
        return keep_going, valid_minimum

    def _is_valid_minium(self, structure: db.Structure) -> bool:
        """
        Check whether the structure's lowest frequency qualifies it as a minimum.

        A structure fails only if its lowest frequency is negative (imaginary)
        and its magnitude exceeds the absolute frequency threshold.
        NOTE: method name keeps the historic 'minium' spelling for compatibility.

        Parameters
        ----------
        structure : db.Structure
            The structure to be checked; must have a 'frequencies' property.

        Returns
        -------
        bool
            True if the structure is a valid minimum.
        """
        frequencies = self._properties.get_vector_property(structure.get_property("frequencies"))
        frequencies.link(self._properties)
        min_frequency = np.min(frequencies.get_data()[0])
        # Threshold is given in cm^-1; convert it to the unit of the stored
        # frequencies (presumably atomic units of angular frequency — TODO confirm).
        threshold = self.options.absolute_frequency_threshold * \
            utils.HARTREE_PER_INVERSE_CENTIMETER / (2 * utils.PI)
        return not (min_frequency < 0.0 and abs(min_frequency) > threshold)

    def _setup_validation_job(self, structure_id: db.ID) -> bool:
        """
        Checks for already existing validation_order job and if none present, sets up one for given structure

        Returns
        -------
        bool
            True if a new validation job was set up or an existing one is still
            pending/running; False if a previous validation already finished
            (complete or failed), i.e. validation was attempted before.
        """
        val_calc_id = get_calculation_id_from_structure(self.options.validation_job.order, [structure_id],
                                                        self.options.model, self._structures, self._calculations)
        if val_calc_id is None:
            # No validation attempted yet: set one up.
            calculation = db.Calculation(db.ID(), self._calculations)
            calculation.create(self.options.model, self.options.validation_job, [structure_id])
            if self.options.validation_settings:
                calculation.set_settings(self.options.validation_settings)
            finalize_calculation(calculation, self._structures)
            return True
        val_calc = db.Calculation(val_calc_id, self._calculations)
        # A finished (complete or failed) validation means the attempt is over;
        # any other status means the job is still pending or running.
        return val_calc.get_status() not in [db.Status.COMPLETE, db.Status.FAILED]