Source code for scine_chemoton.gears.compound

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory of Physical Chemistry, Reiher Group.
See LICENSE.txt for details.
"""

# Standard library imports
from json import dumps
from typing import Union, Tuple, Dict, List
from warnings import warn

# Third party imports
import scine_database as db
import scine_utilities as utils
import scine_molassembler as masm

# Local application imports
from . import Gear
from ..utilities.queries import stop_on_timeout, calculation_exists_in_structure
from ..utilities.calculation_creation_helpers import finalize_calculation


[docs]class BasicAggregateHousekeeping(Gear): """ This Gear updates all relevant Structures stored in the database with bond orders and graph representations. This data is then used to sort each Structure into existing Compounds/Flasks or to create a new Compound/Flask if no appropriate one exists. Attributes ---------- options : BasicAggregateHousekeeping.Options The options for the BasicAggregateHousekeeping Gear. Notes ----- Checks for all 'user_optimized', 'minimum_optimized', and 'complex_optimized' Structures that do not have an Aggregate assigned. The Gear then generates bond orders and molecular graphs ('masm_cbor_graph', and 'masm_decision_list') if they are not yet present. Using the molecular graphs the Structures are then sorted into Compounds/Flasks. """ def __init__(self): super().__init__() self.options = self.Options() self._required_collections = ["calculations", "compounds", "flasks", "properties", "structures", "reactions"] self._compound_map: Dict[int, Dict[Tuple[int, int, int], List[db.ID]]] = dict() """ Caching map from n_atoms, n_molecules, charge, multiplicity to compound ids. """ self._flask_map: Dict[int, Dict[Tuple[int, int, int], List[db.ID]]] = dict() """ Caching map from n_atoms, n_molecules, charge, multiplicity to flask ids. """ self._unique_structures: Dict[str, List[Tuple[db.ID, str]]] = dict() """ Caching map from compound id to list of structure ids and decision lists. """
[docs] class Options: """ The options for the BasicAggregateHouseKeeping Gear. """ __slots__ = [ "cycle_time", "model", "bond_order_job", "bond_order_settings", "graph_job", "graph_settings", "exclude_misguided_conformer_optimizations" ] def __init__(self): self.cycle_time = 10 """ int The minimum number of seconds between two cycles of the Gear. Cycles are finished independent of this option, thus if a cycle takes longer than the cycle_time will effectively lead to longer cycle times and not cause multiple cycles of the same Gear. """ self.model: db.Model = db.Model("PM6", "PM6", "") """ db.Model (Scine::Database::Model) The Model used for the bond order calculations. The default is: PM6 using Sparrow. """ self.bond_order_job: db.Job = db.Job("scine_bond_orders") """ db.Job (Scine::Database::Calculation::Job) The Job used for the bond order calculations. The default is: the 'scine_bond_orders' order on a single core. """ self.bond_order_settings: utils.ValueCollection = utils.ValueCollection() """ utils.ValueCollection Additional settings passed to the bond order calculations. Empty by default. """ self.graph_job: db.Job = db.Job("graph") """ db.Job (Scine::Database::Calculation::Job) The Job used for the graph calculations. The default is: the 'graph' order on a single core. """ self.graph_settings: utils.ValueCollection = utils.ValueCollection() """ Dict[str, str] Additional settings passed to the graph calculations. Empty by default. """ self.exclude_misguided_conformer_optimizations: bool = True """ bool If true, no additional aggregate is created if the structure was generated only by geometry optimization. """
def _create_unique_structure_map(self): for compound in stop_on_timeout(self._compounds.iterate_all_compounds()): compound.link(self._compounds) structure_ids = compound.get_structures() key = compound.id().string() self._unique_structures[key] = [] for s_id in structure_ids: structure = db.Structure(s_id, self._structures) if structure.get_label() in [db.Label.DUPLICATE, db.Label.MINIMUM_GUESS]: continue self._unique_structures[key].append((s_id, structure.get_graph("masm_decision_list"))) def _add_to_unique_structures(self, structure: db.Structure): key = structure.get_aggregate().string() if key in self._unique_structures: self._unique_structures[key].append((structure.id(), structure.get_graph("masm_decision_list"))) else: self._unique_structures[key] = [(structure.id(), structure.get_graph("masm_decision_list"))] def _create_compound_map(self): for compound in stop_on_timeout(self._compounds.iterate_all_compounds()): compound.link(self._compounds) self._add_to_map(compound, self._compound_map) def _create_flask_map(self): for flask in stop_on_timeout(self._flasks.iterate_all_flasks()): flask.link(self._flasks) self._add_to_map(flask, self._flask_map) def _add_to_map(self, aggregate: Union[db.Compound, db.Flask], cache_map: Dict): n_atoms, n_molecules, charge, multiplicity, graph = self._get_aggregate_info(aggregate) key = (n_molecules, charge, multiplicity) if n_atoms in cache_map: if key in cache_map[n_atoms]: cache_map[n_atoms][key].append((aggregate.id(), graph)) return else: cache_map[n_atoms][key] = [(aggregate.id(), graph)] return cache_map[n_atoms] = {key: [(aggregate.id(), graph)]} def _get_aggregate_info(self, aggregate: Union[db.Compound, db.Flask]) -> Tuple[int, int, int, int, str]: # We could even think about adding the graph here too ... centroid = db.Structure(aggregate.get_centroid(), self._structures) graph = centroid.get_graph("masm_cbor_graph") return len(centroid.get_atoms()), len(graph.split(";")), centroid.get_charge(), centroid.get_multiplicity(),\ graph @staticmethod def _get_aggregate_from_map(n_atoms: int, n_molecules: int, charge: int, multiplicity: int, graph, cache_map: Dict) -> Union[db.ID, None]: if n_atoms in cache_map: key = (n_molecules, charge, multiplicity) if key in cache_map[n_atoms]: for a_id, ref_graph in cache_map[n_atoms][key]: if masm.JsonSerialization.equal_molecules(ref_graph, graph): return a_id return None def _loop_impl(self): if not self._compound_map: self._create_compound_map() if not self._flask_map: self._create_flask_map() if not self._unique_structures: self._create_unique_structure_map() self._check_for_aggregates() self._check_compounds_in_flasks() def _check_compounds_in_flasks(self): """ Check for Flasks without Compounds. Complete them if ALL sub-structures are assigned a Compound. """ # Check for flasks without compounds selection = { "compounds": {"$size": 0} } for flask in stop_on_timeout(self._flasks.iterate_flasks(dumps(selection))): flask.link(self._flasks) flask_centroid = db.Structure(flask.get_centroid(), self._structures) if not flask_centroid.has_graph("masm_cbor_graph") or not flask_centroid.has_graph("masm_idx_map"): warn(f"Centroid {str(flask_centroid.id())} of flask {str(flask.id())} is missing a graph") continue graphs = flask_centroid.get_graph("masm_cbor_graph").split(';') compounds = [] n_atoms_per_molecule = self.get_n_atoms(flask_centroid) n_graphs_missing = len(n_atoms_per_molecule) for i, n_atoms in enumerate(n_atoms_per_molecule): if n_atoms not in self._compound_map: break for key in self._compound_map[n_atoms]: compound_list = self._compound_map[n_atoms][key] for c_id, current in compound_list: # we are looping in reverse, so we can pop the element if identical while looping # we loop through because one compound could be part of flask multiple times # e.g. solute with 2 water molecules if masm.JsonSerialization.equal_molecules(graphs[i], current): compounds.append(c_id) n_graphs_missing -= 1 if n_graphs_missing == 0: flask.set_compounds(compounds) break def _check_for_aggregates(self): # Setup query for optimized structures without aggregate selection = { "$and": [ { "$or": [ {"label": "complex_optimized"}, {"label": "minimum_optimized"}, {"label": "user_optimized"}, ] }, {"aggregate": ""}, {"analysis_disabled": {"$ne": True}}, ] } # Loop over all results for structure in stop_on_timeout(self._structures.iterate_structures(dumps(selection))): structure.link(self._structures) if structure.has_graph("masm_cbor_graph"): # Check if graph exists graph = structure.get_graph("masm_cbor_graph") label = structure.get_label() if ";" in graph: if label != db.Label.COMPLEX_OPTIMIZED: warn(f"Structure '{str(structure.id())}' received incorrect label '{str(label)}', " f"according to its graph, it is actually a complex.") else: if label == db.Label.COMPLEX_OPTIMIZED: warn(f"Structure '{str(structure.id())}' received incorrect label '{str(label)}', " f"according to its graph, it is actually NOT a complex.") elif structure.has_properties("bond_orders"): self._setup_graph_job(structure.id()) continue else: self._setup_bond_order_job(structure.id()) continue # If a graph exists but no aggregate is registered, generate a new aggregate # or append the structure to an existing compound matching_aggregate = self._get_matching_aggregate(structure) if matching_aggregate is not None: structure.set_aggregate(matching_aggregate.id()) matching_aggregate.add_structure(structure.id()) # Check if duplicate in same aggregate duplicate = self._query_duplicate(structure, matching_aggregate) if duplicate is not None: structure.set_label(db.Label.DUPLICATE) structure.set_comment("Structure is a duplicate of {:s}.".format(str(duplicate.id()))) structure.set_as_duplicate_of(duplicate.id()) else: # Create a new aggregate but only if the structure is not a misguided conformer optimization # TODO this query may become a bottleneck for huge databases. # avoiding a query into the calculations would be much preferred selection = { "$and": [ {"results.structures": {"$oid": structure.id().string()}}, {"results.elementary_steps": {"$size": 0}}, ] } hit = None if self.options.exclude_misguided_conformer_optimizations: hit = self._calculations.get_one_calculation(dumps(selection)) if hit is not None: # Check if there is a calculation that generated the compound minimization = hit minimization.link(self._calculations) starting_structure = db.Structure(minimization.get_structures()[0], self._structures) if starting_structure.has_aggregate(): # If the starting structure has an aggregate, then the # result should also be part of the same aggregate structure.set_label(db.Label.IRRELEVANT) structure.set_comment( f"Result of misguided optimization not staying within the origin aggregate " f"{str(starting_structure.get_aggregate())}." ) continue if ";" in graph: flask = db.Flask(db.ID(), self._flasks) flask.create([structure.id()], []) flask.disable_exploration() structure.set_aggregate(flask.id()) self._add_to_map(flask, self._flask_map) else: compound = db.Compound(db.ID(), self._compounds) compound.create([structure.id()]) compound.disable_exploration() structure.set_aggregate(compound.id()) self._add_to_map(compound, self._compound_map) def _setup_bond_order_job(self, structure_id: db.ID) -> None: """ Checks for already existing bond_order job and if none present, sets up one for given structure """ if calculation_exists_in_structure(self.options.bond_order_job.order, [structure_id], self.options.model, self._structures, self._calculations): return # Setup calculation of a graph calculation = db.Calculation(db.ID(), self._calculations) calculation.create(self.options.model, self.options.bond_order_job, [structure_id]) if self.options.bond_order_settings: calculation.set_settings(self.options.bond_order_settings) finalize_calculation(calculation, self._structures) def _setup_graph_job(self, structure_id: db.ID) -> None: """ Checks for already existing graph job and if none present, sets up one for given structure """ if calculation_exists_in_structure(self.options.graph_job.order, [structure_id], self.options.model, self._structures, self._calculations): return # Setup calculation of a graph calculation = db.Calculation(db.ID(), self._calculations) calculation.create(self.options.model, self.options.graph_job, [structure_id]) if self.options.graph_settings: calculation.set_settings(self.options.graph_settings) finalize_calculation(calculation, self._structures) def _get_matching_aggregate(self, structure: db.Structure) -> Union[db.Compound, db.Flask, None]: """ Queries database for matching aggregate of the given structure based on: * graph (irrespective of order for flasks) * charge * multiplicity Returns None if no matching aggregate in DB yet """ graph = structure.get_graph("masm_cbor_graph") charge = structure.get_charge() multiplicity = structure.get_multiplicity() n_atoms = len(structure.get_atoms()) n_molecules = len(graph.split(";")) if n_molecules > 1: f_id = self._get_aggregate_from_map(n_atoms, n_molecules, charge, multiplicity, graph, self._flask_map) if f_id: return db.Flask(f_id, self._flasks) else: c_id = self._get_aggregate_from_map(n_atoms, n_molecules, charge, multiplicity, graph, self._compound_map) if c_id: return db.Compound(c_id, self._compounds) return None def _query_duplicate(self, structure: db.Structure, aggregate: Union[db.Compound, db.Flask]) \ -> Union[db.Structure, None]: """ Check based on decision lists if the given aggregate has a duplicate structure to the given structure NOTE: not implemented for flasks yet """ if isinstance(aggregate, db.Flask): # currently not possible return None decision_list = structure.get_graph("masm_decision_list") key = aggregate.id().string() if key in self._unique_structures: similar_structures = self._unique_structures[key] for sid, ref_decision_list in similar_structures: if not masm.JsonSerialization.equal_decision_lists(decision_list, ref_decision_list): continue similar_structure = db.Structure(sid, self._structures) if similar_structure.get_model() == structure.get_model(): return similar_structure self._add_to_unique_structures(structure) return None
[docs] @staticmethod def get_n_atoms(structure: db.Structure) -> List[int]: import ast idx_map = ast.literal_eval(structure.get_graph("masm_idx_map")) n_mols = 2 for iMol, _ in idx_map: n_mols = max(n_mols, iMol + 1) n_atoms_per_mol = [0 for _ in range(n_mols)] for iMol, _ in idx_map: n_atoms_per_mol[iMol] += 1 return n_atoms_per_mol