#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
.. module:: queries
:synopsis: Collection of functions that help generate queries
based on Chemoton-specific objects.
"""
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory of Physical Chemistry, Reiher Group.
See LICENSE.txt for details.
"""
# Standard library imports
from collections import Counter
from json import dumps
from typing import Any, Dict, List, Optional, Union
# Third party imports
import scine_database as db
import scine_utilities as utils
class stop_on_timeout:
    """
    Iterator wrapper that ends a database loop quietly when the wrapped
    iterator fails with a socket error or a timeout.

    A ``RuntimeError`` whose message contains ``socket error or timeout`` is
    turned into a regular end of iteration. Any other ``RuntimeError`` is
    re-raised, and all other exception types pass through untouched.

    Parameters
    ----------
    loop :: Iterator
        The original iterator statement of a DB loop.

    Examples
    --------
    >>> def inner_loop():
    >>>     for i in range(10):
    >>>         if i % 2 == 0:
    >>>             yield i
    >>>         else:
    >>>             raise RuntimeError('socket error or timeout , Failed at ' + str(i))
    >>>
    >>> for i in stop_on_timeout(inner_loop()):
    >>>     print(i)
    """

    def __init__(self, loop):
        # The wrapped object is advanced with ``next`` and must therefore
        # already be an iterator.
        self.loop = loop

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self.loop)
        except (StopIteration, RuntimeError) as error:
            exhausted = isinstance(error, StopIteration)
            # Normal exhaustion and a cursor timeout both end the loop.
            if exhausted or "socket error or timeout" in str(error):
                raise StopIteration  # pylint: disable=raise-missing-from
            # Every other runtime error is a real problem for the caller.
            raise
def model_query(model: db.Model) -> List[dict]:
    """
    Translates a model into a list of query conditions. Every model field
    whose value is 'any' (compared case-insensitively) is left out of the
    query; each remaining field has to match its value exactly.

    Parameters
    ----------
    model :: Scine::Database::Model
        The model for which a query list will be generated.

    Returns
    -------
    query :: List[dict]
        One ``{'model.<field>': value}`` entry per constrained field.
        The list can be added to any '$and' or '$or' expression.

    Examples
    --------
    >>> selection = {'$and': [
    >>>     {'some': 'logic'},
    >>>     {'more': 'logic'}
    >>> ] + model_query(model)
    >>> }
    >>> for s in collection.iterate_structures(dumps(selection)):
    >>>     pass
    """
    field_names = (
        'spin_mode', 'basis_set', 'method', 'method_family', 'program', 'version', 'solvation', 'solvent',
        'embedding', 'periodic_boundaries', 'external_field', 'temperature', 'electronic_temperature',
    )
    # Lazily pair each field name with its current value on the model.
    field_values = ((name, getattr(model, name)) for name in field_names)
    return [{f"model.{name}": value} for name, value in field_values if value.lower() != "any"]
def identical_reaction(
    lhs_aggregates: List[db.ID], rhs_aggregates: List[db.ID], lhs_types: List[db.CompoundOrFlask],
    rhs_types: List[db.CompoundOrFlask], reactions: db.Collection
) -> Union[db.Reaction, None]:
    """
    Looks up a reaction that connects the same aggregates. A reaction stored
    in the opposite direction (sides swapped) counts as the same reaction.

    Parameters
    ----------
    lhs_aggregates :: List[db.ID]
        The ids of the aggregates of the left hand side
    rhs_aggregates :: List[db.ID]
        The ids of the aggregates of the right hand side
    lhs_types :: List[db.CompoundOrFlask]
        The types of the LHS aggregates, paired by position with ``lhs_aggregates``.
    rhs_types :: List[db.CompoundOrFlask]
        The types of the RHS aggregates, paired by position with ``rhs_aggregates``.
    reactions :: db.Collection (Scine::Database::Collection)
        The collection that is queried for reactions.

    Returns
    -------
    reaction :: Union[db.Reaction, None]
        The identical reaction or None if no identical reaction found in the collection
    """

    def _side_entries(aggregate_ids, aggregate_types):
        # One {"id", "type"} sub-document per aggregate of a reaction side.
        return [
            {
                "id": {"$oid": aggregate_id.string()},
                "type": "flask" if aggregate_type == db.CompoundOrFlask.FLASK else "compound",
            }
            for aggregate_id, aggregate_type in zip(aggregate_ids, aggregate_types)
        ]

    def _sides_match(lhs_entries, rhs_entries):
        # Both sides must have the given size and contain all given entries.
        return {
            "$and": [
                {"lhs": {"$size": len(lhs_entries), "$all": lhs_entries}},
                {"rhs": {"$size": len(rhs_entries), "$all": rhs_entries}},
            ]
        }

    lhs_list = _side_entries(lhs_aggregates, lhs_types)
    rhs_list = _side_entries(rhs_aggregates, rhs_types)
    # Reversed orientation first, then the orientation as given.
    selection = {"$or": [_sides_match(rhs_list, lhs_list), _sides_match(lhs_list, rhs_list)]}

    for candidate in reactions.query_reactions(dumps(selection)):
        # The query hits are confirmed by an exact multiset comparison of the
        # ids, again in both orientations.
        if _verify_identical_reaction(lhs_aggregates, rhs_aggregates, candidate):
            return candidate
        if _verify_identical_reaction(rhs_aggregates, lhs_aggregates, candidate):
            return candidate
    return None
def _verify_identical_reaction(
    lhs_aggregates: List[db.ID], rhs_aggregates: List[db.ID], possible_reaction: db.Reaction
) -> bool:
    """
    Checks whether both sides of the given reaction hold exactly the given
    aggregate ids, including how often each id occurs and ignoring their order.
    Only the given orientation is tested.
    """

    def _id_multiset(ids):
        # Ids are compared through their string representation.
        return Counter([identifier.string() for identifier in ids])

    reactants = possible_reaction.get_reactants(db.Side.BOTH)
    stored_lhs = reactants[0]
    stored_rhs = reactants[1]
    if _id_multiset(lhs_aggregates) != _id_multiset(stored_lhs):
        return False
    return _id_multiset(rhs_aggregates) == _id_multiset(stored_rhs)
def stationary_points() -> dict:
    """
    Builds the query for stationary points, which are either

    1) structures labelled 'ts_optimized', or
    2) structures labelled 'minimum_optimized', 'user_optimized' or
       'complex_optimized' whose 'aggregate' field is not the empty string
       and whose 'exploration_disabled' field is not True.

    Returns
    -------
    selection :: dict
        The query dictionary, newly built on every call.
    """
    minimum_labels = ("minimum_optimized", "user_optimized", "complex_optimized")
    has_minimum_label = {"$or": [{"label": label} for label in minimum_labels]}
    linked_minimum = {
        "$and": [
            has_minimum_label,
            {"aggregate": {"$ne": ""}},
            {"exploration_disabled": {"$ne": True}},
        ]
    }
    return {"$or": [{"label": "ts_optimized"}, linked_minimum]}
def select_calculation_by_structures(job_order: str, structure_id_list: List[db.ID], model: db.Model) -> dict:
    """
    Builds a query for calculations that have the given job order, fit the
    given model and list the given structures, regardless of their order.

    Parameters
    ----------
    job_order : str
        The job order of the calculations to consider.
    structure_id_list : List[db.ID]
        The list of structure ids of interest.
    model : db.Model
        The model the calculations shall use.

    Returns
    -------
    dict
        The selection query dictionary.
    """
    oid_entries = []
    for structure_id in structure_id_list:
        oid_entries.append({"$oid": structure_id.string()})
    conditions = [
        {"job.order": job_order},
        # Same number of structures and every requested id present.
        {"structures": {"$size": len(oid_entries), "$all": oid_entries}},
    ]
    conditions.extend(model_query(model))
    return {"$and": conditions}
def calculation_exists_in_structure(job_order: str, structure_id_list: List[db.ID], model: db.Model,
                                    structures: db.Collection, calculations: db.Collection,
                                    settings: Optional[Union[utils.ValueCollection, Dict[str, Any]]] = None,
                                    auxiliaries: Optional[Dict[str, Any]] = None) -> bool:
    """
    Check if a calculation exists that corresponds to the given structures, model, settings, etc.

    All arguments are forwarded unchanged to ``get_calculation_id_from_structure``.

    Parameters
    ----------
    job_order : str
        The job order of the calculations to consider.
    structure_id_list : List[db.ID]
        The list of structure ids of interest.
    model : db.Model
        The model the calculations shall use.
    structures : db.Collection
        The structure collection.
    calculations : db.Collection
        The calculation collection.
    settings : dict or utils.ValueCollection (optional)
        The settings of the calculation.
    auxiliaries : dict (optional)
        The auxiliaries of the calculation.

    Returns
    -------
    True, if such a calculation exists. False, otherwise.
    """
    calculation_id = get_calculation_id_from_structure(job_order, structure_id_list, model, structures,
                                                       calculations, settings, auxiliaries)
    return calculation_id is not None
def get_calculation_id_from_structure(job_order: str, structure_id_list: List[db.ID], model: db.Model,
                                      structures: db.Collection, calculations: db.Collection,
                                      settings: Optional[Union[utils.ValueCollection, Dict[str, Any]]] = None,
                                      auxiliaries: Optional[Dict[str, Any]] = None) -> Union[db.ID, None]:
    """
    Search for a calculation corresponding to the given settings. If the calculation is found, its ID is returned.

    The candidates are the calculations that every given structure reports
    for the job order and model. A candidate is accepted if it holds exactly
    the given structures (same ids with the same multiplicities, in any
    order) and, where given, equal settings and auxiliaries.

    Parameters
    ----------
    job_order : str
        The job order of the calculations to consider.
    structure_id_list : List[db.ID]
        The list of structure ids of interest.
    model : db.Model
        The model the calculations shall use.
    structures : db.Collection
        The structure collection.
    calculations : db.Collection
        The calculation collection.
    settings : dict or utils.ValueCollection (optional)
        The settings of the calculation.
    auxiliaries : dict (optional)
        The auxiliaries of the calculation.

    Returns
    -------
    Returns the calculation ID if found. Returns None if no calculation corresponds to the given specification.

    Raises
    ------
    TypeError
        If ``settings`` is neither None, a dict, nor a utils.ValueCollection.
    """
    if not structure_id_list:
        return None
    # settings type check, support both dict and ValueCollection and want ValueCollection for speed
    compare_settings = None
    if settings is not None:
        if isinstance(settings, utils.ValueCollection):
            compare_settings = settings
        elif isinstance(settings, dict):
            compare_settings = utils.ValueCollection(settings)
        else:
            raise TypeError(f"Gave incompatible type '{type(settings)}' to 'get_calculation_id_from_structure'")

    # Intersect the calculation ids known to each structure. Every structure
    # is queried once, and the search ends as soon as nothing is left.
    calc_id_set = None
    for s_id in structure_id_list:
        structure = db.Structure(s_id, structures)
        struc_set = {c_id.string() for c_id in structure.query_calculations(job_order, model, calculations)}
        calc_id_set = struc_set if calc_id_set is None else calc_id_set & struc_set
        if not calc_id_set:
            return None

    selection = {
        "$and": [{"_id": {"$in": [{"$oid": str_id} for str_id in calc_id_set]}}]
    }
    # Multiset of the requested ids, so that duplicated structures have to
    # appear equally often in the calculation.
    requested_structures = Counter(s_id.string() for s_id in structure_id_list)
    for calculation in stop_on_timeout(calculations.iterate_calculations(dumps(selection))):
        calculation.link(calculations)
        found_structures = Counter(s_id.string() for s_id in calculation.get_structures())
        if found_structures != requested_structures:
            continue
        if compare_settings is not None and compare_settings != calculation.get_settings():
            continue
        if auxiliaries is not None and auxiliaries != calculation.get_auxiliaries():
            continue
        return calculation.id()
    return None
def get_calculation_id(job_order: str, structure_id_list: List[db.ID], model: db.Model,
                       calculations: db.Collection,
                       settings: Optional[Union[utils.ValueCollection, Dict[str, Any]]] = None,
                       auxiliaries: Optional[Dict[str, Any]] = None) -> Union[db.ID, None]:
    """
    Search for a calculation with the given job order, structures and model
    using a single query on the calculation collection. If settings and/or
    auxiliaries are given, the hits are additionally compared against them.

    Parameters
    ----------
    job_order : str
        The job order of the calculations to consider.
    structure_id_list : List[db.ID]
        The list of structure ids of interest.
    model : db.Model
        The model the calculations shall use.
    calculations : db.Collection
        The calculation collection.
    settings : dict or utils.ValueCollection (optional)
        The settings of the calculation.
    auxiliaries : dict (optional)
        The auxiliaries of the calculation.

    Returns
    -------
    Returns the calculation ID if found. Returns None if the structure list is empty
    or no calculation corresponds to the given specification.

    Raises
    ------
    TypeError
        If ``settings`` is neither None, a dict, nor a utils.ValueCollection.
    """
    if len(structure_id_list) < 1:
        return None
    selection = select_calculation_by_structures(job_order, structure_id_list, model)

    # simple case of no required loop comparisons
    needs_comparison = settings is not None or auxiliaries is not None
    if not needs_comparison:
        first_hit = calculations.get_one_calculation(dumps(selection))
        return None if first_hit is None else first_hit.id()

    # settings type check, support both dict and ValueCollection and want ValueCollection for speed
    if settings is None:
        reference_settings = None
    elif isinstance(settings, utils.ValueCollection):
        reference_settings = settings
    elif isinstance(settings, dict):
        reference_settings = utils.ValueCollection(settings)
    else:
        raise TypeError(f"Gave incompatible type '{type(settings)}' to 'get_calculation_id'")

    for candidate in stop_on_timeout(calculations.iterate_calculations(dumps(selection))):
        candidate.link(calculations)
        if reference_settings is not None and reference_settings != candidate.get_settings():
            continue
        if auxiliaries is not None and auxiliaries != candidate.get_auxiliaries():
            continue
        return candidate.id()
    return None