diff --git a/emmet-builders/emmet/builders/materials/electrodes.py b/emmet-builders/emmet/builders/materials/electrodes.py index 5d782d4d3c..9fcf809cd7 100644 --- a/emmet-builders/emmet/builders/materials/electrodes.py +++ b/emmet-builders/emmet/builders/materials/electrodes.py @@ -5,7 +5,7 @@ from functools import lru_cache from itertools import chain, groupby from pprint import pprint -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Union from maggma.builders import Builder, MapBuilder from maggma.stores import MongoStore @@ -18,300 +18,52 @@ from emmet.core.electrode import InsertionElectrodeDoc from emmet.core.structure_group import StructureGroupDoc +from emmet.core.migration import MigrationGraphDoc from emmet.core.utils import jsanitize __author__ = "Jimmy Shen" __email__ = "jmmshn@lbl.gov" -def s_hash(el): - return el.data["comp_delith"] - - -# MatDoc = namedtuple("MatDoc", ["material_id", "structure", "formula_pretty", "framework"]) - -REDOX_ELEMENTS = [ - "Ti", - "V", - "Cr", - "Mn", - "Fe", - "Co", - "Ni", - "Cu", - "Nb", - "Mo", - "Sn", - "Sb", - "W", - "Re", - "Bi", - "C", - "Hf", -] - -WORKING_IONS = ["Li", "Be", "Na", "Mg", "K", "Ca", "Rb", "Sr", "Cs", "Ba"] - -MAT_PROPS = ["structure", "material_id", "formula_pretty", "entries"] - -sg_fields = ["number", "hall_number", "international", "hall", "choice"] - - -def generic_groupby(list_in, comp=operator.eq): +class GroupedThermoDocsBuilder(Builder): """ - Group a list of unsortable objects - Args: - list_in: A list of generic objects - comp: (Default value = operator.eq) The comparator - Returns: - [int] list of labels for the input list + Used grouped ID to fetch entries from the thermo collection + This can be subclassed to accomplish more things with the entries """ - list_out = [None] * len(list_in) - label_num = 0 - for i1, ls1 in enumerate(list_out): - if ls1 is not None: - continue - list_out[i1] = label_num - for i2, ls2 in list(enumerate(list_out))[i1 + 1 :]: - if comp(list_in[i1], list_in[i2]): - if list_out[i2] is None: - list_out[i2] = list_out[i1] - else: - list_out[i1] = list_out[i2] - label_num -= 1 - label_num += 1 - return list_out - - -class StructureGroupBuilder(Builder): - def __init__( - self, - materials: MongoStore, - sgroups: MongoStore, - working_ion: str, - query: dict = None, - ltol: float = 0.2, - stol: float = 0.3, - angle_tol: float = 5.0, - check_newer: bool = True, - **kwargs, - ): - """ - Aggregate materials entries into sgroups that are topotactically similar to each other. - This is an incremental builder that makes ensures that each materials id belongs to one StructureGroupDoc document - Args: - materials (Store): Store of materials documents that contains the structures - sgroups (Store): Store of grouped material ids - query (dict): dictionary to limit materials to be analyzed --- - only applied to the materials when we need to group structures - the phase diagram is still constructed with the entire set - """ - self.materials = materials - self.sgroups = sgroups - self.working_ion = working_ion - self.query = query if query else {} - self.ltol = ltol - self.stol = stol - self.angle_tol = angle_tol - self.check_newer = check_newer - super().__init__(sources=[materials], targets=[sgroups], **kwargs) - - def prechunk(self, number_splits: int) -> Iterable[Dict]: - """ - TODO can implement this for distributed runs by adding filters - """ - pass - - def get_items(self): - """ - Summary of the steps: - - query the materials database for different chemical systems that satisfies the base query - "contains redox element and working ion" - - Get the full chemsys list of interest - - The main loop is over all these chemsys. within the main loop: - - get newest timestamp for the material documents (max_mat_time) - - get the oldest timestamp for the target documents (min_target_time) - - if min_target_time is < max_mat_time then nuke all the target documents - """ - other_wions = list(set(WORKING_IONS) - {self.working_ion}) - # All potentially interesting chemsys must contain the working ion - base_query = { - "$and": [ - self.query.copy(), - {"elements": {"$in": REDOX_ELEMENTS}}, - {"elements": {"$in": [self.working_ion]}}, - {"elements": {"$nin": other_wions}}, - ] - } - self.logger.debug(f"Initial Chemsys QUERY: {base_query}") - - # get a chemsys that only contains the working ion since the working ion - # must be present for there to be voltage steps - all_chemsys = self.materials.distinct("chemsys", criteria=base_query) - # Contains the working ion but not ONLY the working ion - all_chemsys = [ - *filter( - lambda x: self.working_ion in x and len(x) > 1, - [chemsys_.split("-") for chemsys_ in all_chemsys], - ) - ] - - self.logger.debug( - f"Performing initial checks on {len(all_chemsys)} chemical systems containing redox elements with or without the Working Ion." - ) - self.total = len(all_chemsys) - - for chemsys_l in all_chemsys: - chemsys = "-".join(sorted(chemsys_l)) - chemsys_wo = "-".join(sorted(set(chemsys_l) - {self.working_ion})) - chemsys_query = { - "$and": [ - {"chemsys": {"$in": [chemsys_wo, chemsys]}}, - self.query.copy(), - ] - } - self.logger.debug(f"QUERY: {chemsys_query}") - all_mats_in_chemsys = list( - self.materials.query( - criteria=chemsys_query, - properties=MAT_PROPS + [self.materials.last_updated_field], - ) - ) - self.logger.debug( - f"Found {len(all_mats_in_chemsys)} materials in {chemsys_wo}" - ) - if self.check_newer: - all_target_docs = list( - self.sgroups.query( - criteria={"chemsys": chemsys}, - properties=[ - "group_id", - self.sgroups.last_updated_field, - "material_ids", - ], - ) - ) - self.logger.debug( - f"Found {len(all_target_docs)} Grouped documents in {chemsys_wo}" - ) - - mat_times = [ - mat_doc[self.materials.last_updated_field] - for mat_doc in all_mats_in_chemsys - ] - max_mat_time = max(mat_times, default=datetime.min) - self.logger.debug( - f"The newest material doc was generated at {max_mat_time}." - ) - - target_times = [ - g_doc[self.materials.last_updated_field] - for g_doc in all_target_docs - ] - min_target_time = min(target_times, default=datetime.max) - self.logger.debug( - f"The newest GROUP doc was generated at {min_target_time}." - ) - - mat_ids = set( - [mat_doc["material_id"] for mat_doc in all_mats_in_chemsys] - ) - - # If any material id is missing or if any material id has been updated - target_ids = set() - for g_doc in all_target_docs: - target_ids |= set(g_doc["material_ids"]) - - self.logger.debug( - f"There are {len(mat_ids)} material ids in the source database vs {len(target_ids)} in the target database." - ) - if mat_ids == target_ids and max_mat_time < min_target_time: - self.logger.info(f"Skipping chemsys {chemsys}.") - yield None - elif len(target_ids) == 0: - self.logger.info( - f"No documents in chemsys {chemsys} in the target database." - ) - yield {"chemsys": chemsys, "materials": all_mats_in_chemsys} - else: - self.logger.info( - f"Nuking all {len(target_ids)} documents in chemsys {chemsys} in the target database." - ) - self._remove_targets(list(target_ids)) - yield {"chemsys": chemsys, "materials": all_mats_in_chemsys} - else: - yield {"chemsys": chemsys, "materials": all_mats_in_chemsys} - - def update_targets(self, items: List): - items = list(filter(None, chain.from_iterable(items))) - if len(items) > 0: - self.logger.info("Updating {} sgroups documents".format(len(items))) - for struct_group_dict in items: - struct_group_dict[self.sgroups.last_updated_field] = datetime.utcnow() - self.sgroups.update(docs=items, key=["group_id"]) - else: - self.logger.info("No items to update") - - def _entry_from_mat_doc(self, mdoc): - # Note since we are just structure grouping we don't need to be careful with energy or correction - # All of the energy analysis is left to other builders - d_ = { - "entry_id": mdoc["material_id"], - "structure": mdoc["structure"], - "energy": -math.inf, - "correction": -math.inf, - } - return ComputedStructureEntry.from_dict(d_) - - def process_item(self, item: Any) -> Any: - if item is None: - return None - entries = [*map(self._entry_from_mat_doc, item["materials"])] - s_groups = StructureGroupDoc.from_ungrouped_structure_entries( - entries=entries, - ignored_species=[self.working_ion], - ltol=self.ltol, - stol=self.stol, - angle_tol=self.angle_tol, - ) - return [sg.dict() for sg in s_groups] - - def _remove_targets(self, rm_ids): - self.sgroups.remove_docs({"material_ids": {"$in": rm_ids}}) - -class InsertionElectrodeBuilder(Builder): def __init__( self, grouped_materials: MongoStore, thermo: MongoStore, - insertion_electrode: MongoStore, + target: MongoStore, query: dict = None, **kwargs, ): + """ + Group ThermoDocuments together + Args: + grouped_materials: + thermo: The thermo collection, documents are retrieved with the "material_ids" field + target: The target collection the key from the grouped_materials collection is mapped directly here + query: The query to be performed on the grouped_materials collection + **kwargs: + """ self.grouped_materials = grouped_materials - self.insertion_electrode = insertion_electrode self.thermo = thermo + self.target = target self.query = query if query else {} super().__init__( sources=[self.grouped_materials, self.thermo], - targets=[self.insertion_electrode], + targets=[self.target], **kwargs, ) def get_items(self): """ - Get items + Retrieve the thermo documents """ - @lru_cache(1000) - def get_working_ion_entry(working_ion): - with self.thermo as store: - working_ion_docs = [*store.query({"chemsys": working_ion})] - best_wion = min(working_ion_docs, key=lambda x: x["energy_per_atom"]) - return best_wion - def get_thermo_docs(mat_ids): self.logger.debug( f"Looking for {len(mat_ids)} material_id in the Thermo DB." @@ -344,42 +96,58 @@ def get_thermo_docs(mat_ids): "validated for the materials builder." ) return None - - # if len(item["ignored_species"]) != 1: - # raise ValueError( - # "Insertion electrode can only be defined for one working ion species" - # ) - return thermo_docs - # return { - # "group_id": item["group_id"], - # "working_ion_doc": working_ion_doc, - # "working_ion": item["ignored_species"][0], - # "thermo_docs": thermo_docs, - # } q_ = {"$and": [self.query, {"has_distinct_compositions": True}]} self.total = self.grouped_materials.count(q_) for group_doc in self.grouped_materials.query(q_): - working_ion_doc = get_working_ion_entry(group_doc["ignored_species"][0]) - thermo_docs = get_thermo_docs(group_doc["material_ids"]) - if thermo_docs: - yield { - "group_id": group_doc["group_id"], - "working_ion_doc": working_ion_doc, - "working_ion": group_doc["ignored_species"][0], - "thermo_docs": thermo_docs, - } - else: - yield None + group_doc["thermo_docs"] = get_thermo_docs(group_doc["material_ids"]) + yield group_doc def process_item(self, item) -> Dict: + return item + + def update_targets(self, items: List): + items = list(filter(None, items)) + if len(items) > 0: + self.logger.info("Updating {} documents".format(len(items))) + for struct_group_dict in items: + struct_group_dict[ + self.grouped_materials.last_updated_field + ] = datetime.utcnow() + self.target.update(docs=items, key=self.grouped_materials.key) + else: + self.logger.info("No items to update") + + +class InsertionElectrodeBuilder(GroupedThermoDocsBuilder): + def get_items(self): + """ + Additional fields: + - working_ion: the name of the working ion + - working_ion_doc: the materials document for the working ion + """ + + @lru_cache(1000) + def get_working_ion_entry(working_ion): + with self.thermo as store: + working_ion_docs = [*store.query({"chemsys": working_ion})] + best_wion = min(working_ion_docs, key=lambda x: x["energy_per_atom"]) + return best_wion + + for item in super().get_items(): + item["working_ion_doc"] = get_working_ion_entry(item["ignored_species"][0]) + item["working_ion"] = item["ignored_species"][0] + yield item + + def process_item(self, item) -> Union[Dict, None]: """ - Add volume information to each entry to create the insertion electrode document - Add the host structure """ - if item is None: + if item["thermo_docs"] is None: return None + self.logger.debug( f"Working on {item['group_id']} with {len(item['thermo_docs'])}" ) @@ -429,3 +197,32 @@ def update_targets(self, items: List): self.insertion_electrode.update(docs=items, key=["battery_id"]) else: self.logger.info("No items to update") + + +class MigrationGraphBuilder(InsertionElectrodeBuilder): + def process_item(self, item) -> Dict: + if item["thermo_docs"] is None: + return None + + self.logger.debug( + f"Working on {item['group_id']} with {len(item['thermo_docs'])}" + ) + + entries = [ + tdoc_["entries"][tdoc_["energy_type"]] for tdoc_ in item["thermo_docs"] + ] + entries = list(map(ComputedStructureEntry.from_dict, entries)) + + working_ion_entry = ComputedEntry.from_dict( + item["working_ion_doc"]["entries"][item["working_ion_doc"]["energy_type"]] + ) + working_ion = working_ion_entry.composition.reduced_formula + + struct = MigrationGraph.get_structure_from_entries(entries=entries, migrating_ion_entry=working_ion_entry) + + mg_doc = MigrationGraphDoc.from_entries(entries=entries, working_ion_entry=working_ion_entry, ltol=item['ltol'], + stol=item['stol'], + angle_tol=item['angle_tol'], + symprec=item['symprec']) + d = mg_doc.dict() + return jsanitize(d) diff --git a/emmet-builders/emmet/builders/materials/structure_group.py b/emmet-builders/emmet/builders/materials/structure_group.py new file mode 100644 index 0000000000..6734a0d719 --- /dev/null +++ b/emmet-builders/emmet/builders/materials/structure_group.py @@ -0,0 +1,260 @@ +import math +import operator +from collections import namedtuple +from datetime import datetime +from functools import lru_cache +from itertools import chain, groupby +from pprint import pprint +from typing import Any, Dict, Iterable, List, Optional + +from maggma.builders import Builder, MapBuilder +from maggma.stores import MongoStore +from monty.json import MontyEncoder +from numpy import unique +from pymatgen.analysis.structure_matcher import ElementComparator, StructureMatcher +from pymatgen.apps.battery.insertion_battery import InsertionElectrode +from pymatgen.core import Composition, Structure +from pymatgen.entries.computed_entries import ComputedEntry, ComputedStructureEntry + +from emmet.core.electrode import InsertionElectrodeDoc +from emmet.core.structure_group import StructureGroupDoc +from emmet.core.utils import jsanitize + +__author__ = "Jimmy Shen" +__email__ = "jmmshn@lbl.gov" + + +def s_hash(el): + return el.data["comp_delith"] + + +# MatDoc = namedtuple("MatDoc", ["material_id", "structure", "formula_pretty", "framework"]) + +MAT_PROPS = ["structure", "material_id", "formula_pretty", "entries"] + +sg_fields = ["number", "hall_number", "international", "hall", "choice"] + + +def generic_groupby(list_in, comp=operator.eq): + """ + Group a list of unsortable objects + Args: + list_in: A list of generic objects + comp: (Default value = operator.eq) The comparator + Returns: + [int] list of labels for the input list + """ + list_out = [None] * len(list_in) + label_num = 0 + for i1, ls1 in enumerate(list_out): + if ls1 is not None: + continue + list_out[i1] = label_num + for i2, ls2 in list(enumerate(list_out))[i1 + 1 :]: + if comp(list_in[i1], list_in[i2]): + if list_out[i2] is None: + list_out[i2] = list_out[i1] + else: + list_out[i1] = list_out[i2] + label_num -= 1 + label_num += 1 + return list_out + + +class StructureGroupBuilder(Builder): + def __init__( + self, + materials: MongoStore, + sgroups: MongoStore, + working_ion: str, + query: dict = None, + ltol: float = 0.2, + stol: float = 0.3, + angle_tol: float = 5.0, + check_newer: bool = True, + settings: Optional[EmmetBuildSettings] = None, + **kwargs, + ): + """ + Aggregate materials entries into sgroups that are topotactically similar to each other. + This is an incremental builder that makes ensures that each materials id belongs to one StructureGroupDoc document + Args: + materials (Store): Store of materials documents that contains the structures + sgroups (Store): Store of grouped material ids + query (dict): dictionary to limit materials to be analyzed --- + only applied to the materials when we need to group structures + the phase diagram is still constructed with the entire set + """ + self.materials = materials + self.sgroups = sgroups + self.working_ion = working_ion + self.query = query if query else {} + self.ltol = ltol + self.stol = stol + self.angle_tol = angle_tol + self.check_newer = check_newer + self.settings = EmmetBuildSettings.autoload(settings) + super().__init__(sources=[materials], targets=[sgroups], **kwargs) + + def prechunk(self, number_splits: int) -> Iterable[Dict]: + """ + TODO can implement this for distributed runs by adding filters + """ + pass + + def get_items(self): + """ + Summary of the steps: + - query the materials database for different chemical systems that satisfies the base query + "contains redox element and working ion" + - Get the full chemsys list of interest + - The main loop is over all these chemsys. within the main loop: + - get newest timestamp for the material documents (max_mat_time) + - get the oldest timestamp for the target documents (min_target_time) + - if min_target_time is < max_mat_time then nuke all the target documents + """ + other_wions = list(set(self.settings.SGROUP_WORKING_IONS) - {self.working_ion}) + # All potentially interesting chemsys must contain the working ion + base_query = { + "$and": [ + self.query.copy(), + {"elements": {"$in": self.settings.SGROUP_REDOX_ELEMENTS}}, + {"elements": {"$in": [self.working_ion]}}, + {"elements": {"$nin": other_wions}}, + ] + } + self.logger.debug(f"Initial Chemsys QUERY: {base_query}") + + # get a chemsys that only contains the working ion since the working ion + # must be present for there to be voltage steps + all_chemsys = self.materials.distinct("chemsys", criteria=base_query) + # Contains the working ion but not ONLY the working ion + all_chemsys = [ + *filter( + lambda x: self.working_ion in x and len(x) > 1, + [chemsys_.split("-") for chemsys_ in all_chemsys], + ) + ] + + self.logger.debug( + f"Performing initial checks on {len(all_chemsys)} chemical systems containing redox elements with or without the Working Ion." + ) + self.total = len(all_chemsys) + + for chemsys_l in all_chemsys: + chemsys = "-".join(sorted(chemsys_l)) + chemsys_wo = "-".join(sorted(set(chemsys_l) - {self.working_ion})) + chemsys_query = { + "$and": [ + {"chemsys": {"$in": [chemsys_wo, chemsys]}}, + self.query.copy(), + ] + } + self.logger.debug(f"QUERY: {chemsys_query}") + all_mats_in_chemsys = list( + self.materials.query( + criteria=chemsys_query, + properties=MAT_PROPS + [self.materials.last_updated_field], + ) + ) + self.logger.debug( + f"Found {len(all_mats_in_chemsys)} materials in {chemsys_wo}" + ) + if self.check_newer: + all_target_docs = list( + self.sgroups.query( + criteria={"chemsys": chemsys}, + properties=[ + "group_id", + self.sgroups.last_updated_field, + "material_ids", + ], + ) + ) + self.logger.debug( + f"Found {len(all_target_docs)} Grouped documents in {chemsys_wo}" + ) + + mat_times = [ + mat_doc[self.materials.last_updated_field] + for mat_doc in all_mats_in_chemsys + ] + max_mat_time = max(mat_times, default=datetime.min) + self.logger.debug( + f"The newest material doc was generated at {max_mat_time}." + ) + + target_times = [ + g_doc[self.materials.last_updated_field] + for g_doc in all_target_docs + ] + min_target_time = min(target_times, default=datetime.max) + self.logger.debug( + f"The newest GROUP doc was generated at {min_target_time}." + ) + + mat_ids = set( + [mat_doc["material_id"] for mat_doc in all_mats_in_chemsys] + ) + + # If any material id is missing or if any material id has been updated + target_ids = set() + for g_doc in all_target_docs: + target_ids |= set(g_doc["material_ids"]) + + self.logger.debug( + f"There are {len(mat_ids)} material ids in the source database vs {len(target_ids)} in the target database." + ) + if mat_ids == target_ids and max_mat_time < min_target_time: + self.logger.info(f"Skipping chemsys {chemsys}.") + yield None + elif len(target_ids) == 0: + self.logger.info( + f"No documents in chemsys {chemsys} in the target database." + ) + yield {"chemsys": chemsys, "materials": all_mats_in_chemsys} + else: + self.logger.info( + f"Nuking all {len(target_ids)} documents in chemsys {chemsys} in the target database." + ) + self._remove_targets(list(target_ids)) + yield {"chemsys": chemsys, "materials": all_mats_in_chemsys} + else: + yield {"chemsys": chemsys, "materials": all_mats_in_chemsys} + + def update_targets(self, items: List): + items = list(filter(None, chain.from_iterable(items))) + if len(items) > 0: + self.logger.info("Updating {} sgroups documents".format(len(items))) + for struct_group_dict in items: + struct_group_dict[self.sgroups.last_updated_field] = datetime.utcnow() + self.sgroups.update(docs=items, key=["group_id"]) + else: + self.logger.info("No items to update") + + def _entry_from_mat_doc(self, mdoc): + # Note since we are just structure grouping we don't need to be careful with energy or correction + # All of the energy analysis is left to other builders + d_ = { + "entry_id": mdoc["material_id"], + "structure": mdoc["structure"], + "energy": -math.inf, + "correction": -math.inf, + } + return ComputedStructureEntry.from_dict(d_) + + def process_item(self, item: Any) -> Any: + if item is None: + return None + entries = [*map(self._entry_from_mat_doc, item["materials"])] + s_groups = StructureGroupDoc.from_ungrouped_structure_entries( + entries=entries, + ignored_species=[self.working_ion], + ltol=self.ltol, + stol=self.stol, + angle_tol=self.angle_tol, + ) + return [sg.dict() for sg in s_groups] + + def _remove_targets(self, rm_ids): + self.sgroups.remove_docs({"material_ids": {"$in": rm_ids}}) diff --git a/emmet-builders/emmet/builders/settings.py b/emmet-builders/emmet/builders/settings.py index 72aa1cc6d1..70d92fcade 100644 --- a/emmet-builders/emmet/builders/settings.py +++ b/emmet-builders/emmet/builders/settings.py @@ -32,6 +32,36 @@ class EmmetBuildSettings(EmmetSettings): description="Allowed task_types to build materials from", ) + SGROUP_WORKING_IONS: List[str] = Field( + ["Li", "Be", "Na", "Mg", "K", "Ca", "Rb", "Sr", "Cs", "Ba"], + description="Working ions names, all groups will consist of formulas " + "with and without the working ion." + ) + + SGROUP_REDOX_ELEMENTS: List[str] = Field( + [ + "Ti", + "V", + "Cr", + "Mn", + "Fe", + "Co", + "Ni", + "Cu", + "Nb", + "Mo", + "Sn", + "Sb", + "W", + "Re", + "Bi", + "C", + "Hf", + ], + description="Redox element names, all groups will consist of formulas " + "that must contain redox element." + ) + DEFAULT_REFERENCE: str = Field( "@article{Jain2013,\nauthor = {Jain, Anubhav and Ong, Shyue Ping and " "Hautier, Geoffroy and Chen, Wei and Richards, William Davidson and " @@ -59,3 +89,4 @@ class EmmetBuildSettings(EmmetSettings): ), description="Default History for provenance ", ) + diff --git a/emmet-core/emmet/core/migration.py b/emmet-core/emmet/core/migration.py new file mode 100644 index 0000000000..9ecf91577e --- /dev/null +++ b/emmet-core/emmet/core/migration.py @@ -0,0 +1,212 @@ +import logging +import math +import operator +from datetime import datetime +from itertools import groupby +from typing import Iterable, List, Union + +from monty.json import MontyDecoder +from pydantic import BaseModel, Field, validator +from pymatgen.analysis.diffusion.neb.full_path_mapper import MigrationGraph +from pymatgen.analysis.graphs import StructureGraph +from pymatgen.analysis.structure_matcher import ElementComparator, StructureMatcher +from pymatgen.core import Composition, Structure +from pymatgen.entries.computed_entries import ComputedEntry, ComputedStructureEntry +from pymatgen.symmetry.analyzer import SpacegroupAnalyzer + +from emmet.core.structure_group import StructureGroupDoc + +__author__ = "Jimmy Shen" +__email__ = "jmmshn@gmail.com" + +logger = logging.getLogger(__name__) + + +class MigrationGraphDoc(BaseModel): + """ + Migration Graph + """ + + battery_id: str = Field( + None, + description="The id for this migration graph document, shared with " + "insertion electrode since the same kind of structure " + "grouping is performed.", + ) + + # host_structure: Structure = Field( + # None, + # description="Host structure (structure without the working ion)", + # ) + # + # migration_graph: StructureGraph = Field( + # None, + # description="The StructureGraph object that contains all of the migration sites" + # ) + # + # framework: Composition = Field( + # None, + # description="The chemical compositions of the host framework", + # ) + # + # elements: List[Element] = Field( + # None, + # description="The atomic species contained in the host structure (not including the working ion).", + # ) + # + # nelements: int = Field( + # None, + # description="The number of elements in the material (not including the working ion).", + # ) + # + # chemsys: str = Field( + # None, + # description="The chemical system the host lattice belongs to (not including the working ion)", + # ) + + found_path: bool = Field( + None, description="True, if an intercalating path is found." + ) + + ltol: float = Field( + None, description="Lattice length tolerance parameter for the StructureMatcher." + ) + + stol: float = Field( + None, description="site position tolerance parameter for the StructureMatcher." + ) + + angle_tol: float = Field( + None, description="Bond angle tolerance parameter for the StructureMatcher." + ) + + symprec: float = Field(None, description="SPGLIB tolerance parameter.") + + migration_graph_object: MigrationGraph = Field( + None, description="The migration pathway object forom " "pymatgen-diffussion." + ) + + barrier: float = Field( + None, + description="The highest energy difference along the path with the " + "lowest cumulative absolute energy difference.", + ) + + last_updated: datetime = Field( + None, + description="Timestamp when this document was built.", + ) + + # Make sure that the datetime field is properly formatted + @validator("last_updated", pre=True) + def last_updated_dict_ok(cls, v): + return MontyDecoder().process_decoded(v) + + @classmethod + def from_entries( + cls, + entries: List[ComputedStructureEntry], + working_ion_entry: ComputedEntry, + ltol: float, + stol: float, + angle_tol: float, + symprec: float, + min_distance_cutoff: float = 5.0, + max_distance_cutoff: float = 10.0, + **kwargs, + ) -> Union["MigrationGraphDoc", None]: + """ + Parse a list of entries and construct the migration graph. + The tolerances must be explicitly provided. + Args: + entries: A list of entries that is already grouped together. + working_ion_entry: Computed entry containing the metallic phase of the working ion. + ltol: length tolerance parameter + stol: site tolerance parameter + angle_tol: angular tolerance parameter + symprec: SPGLIB tolerance parameter + min_distance_cutoff: The initial guess for the bonding distance, if no intercalation pathways are found, + the threshold will be increased by 1 Angstrom until max_distance_cutoff + max_distance_cutoff: The maximum we allowed to increase the distance cutoff to look for + intercalation pathways + kwargs: Additional kwargs to help search and filter, should be taken directly from electrode document. + + Returns: + A MigrationGraphDocument + """ + migrating_species = working_ion_entry.composition.reduced_formula + cur_id = kwargs.get("battery_id", "MISSING battery_id") + + slist = MigrationGraph.get_structure_from_entries( + entries=entries, + migrating_ion_entry=working_ion_entry, + ltol=ltol, + stol=stol, + angle_tol=angle_tol, + symprec=symprec, + ) + + if len(slist) == 0: + logger.warning( + f"No structure with meta-stable sites could be generate for id: [{cur_id}]" + ) + return None + + struct = slist[0] + d_cut = min_distance_cutoff + mg = None + while d_cut <= max_distance_cutoff: + mg = MigrationGraph.with_distance( + structure=struct, + migrating_specie=migrating_species, + max_distance=d_cut, + symprec=0.01, + ) + mg.assign_cost_to_graph() + u, path_hops = next(mg.get_path()) + if len(path_hops) != 0: + break + d_cut += 1.0 + + if mg is None: + logger.warning(f"No Migration graph could be generate for id: [{cur_id}]") + return None + + # adding the energy difference + for lab, d in mg.unique_hops.items(): + e_u = mg.only_sites.sites[d["iindex"]].properties["insertion_energy"] + e_v = mg.only_sites.sites[d["eindex"]].properties["insertion_energy"] + ediff = abs(e_u - e_v) + mg.add_data_to_similar_edges( + target_label=d["hop_label"], data={"ediff": ediff} + ) + + mg.assign_cost_to_graph(cost_keys=["ediff"]) + lowest_cost, best_path = math.inf, [] + + for u, path in mg.get_path(): + cum_cost = sum([hop["cost"] for hop in path]) + if cum_cost < lowest_cost: + lowest_cost, best_path = cum_cost, path + + all_sites_along_path = set() + + for hop in best_path: + all_sites_along_path |= {hop["iindex"], hop["eindex"]} + + site_energies = [ + mg.only_sites.sites[ii_].properties["insertion_energy"] + for ii_ in all_sites_along_path + ] + barrier = max(site_energies) - min(site_energies) + + fields = { + "ltol": ltol, + "stol": stol, + "angle_tol": angle_tol, + "migration_graph_object": mg, + "barrier": barrier, + } + + fields.update(kwargs) + return cls(**fields) diff --git a/emmet-core/emmet/core/structure_group.py b/emmet-core/emmet/core/structure_group.py index 9d1cf16225..b162249d71 100644 --- a/emmet-core/emmet/core/structure_group.py +++ b/emmet-core/emmet/core/structure_group.py @@ -11,7 +11,8 @@ from pymatgen.entries.computed_entries import ComputedEntry, ComputedStructureEntry from pymatgen.symmetry.analyzer import SpacegroupAnalyzer -from emmet.core.mpid import MPID +__author__ = "Jimmy Shen" +__email__ = "jmmshn@gmail.com" logger = logging.getLogger(__name__) @@ -79,6 +80,18 @@ class StructureGroupDoc(BaseModel): "present the chemsys will also include the ignored species.", ) + ltol: float = Field( + None, description="Lattice length tolerance parameter for the StructureMatcher." + ) + + stol: float = Field( + None, description="site position tolerance parameter for the StructureMatcher." + ) + + angle_tol: float = Field( + None, description="Bond angle tolerance parameter for the StructureMatcher." + ) + last_updated: datetime = Field( None, description="Timestamp when this document was built.", @@ -94,6 +107,7 @@ def from_grouped_entries( cls, entries: List[Union[ComputedEntry, ComputedStructureEntry]], ignored_species: List[str], + **kwargs, ) -> "StructureGroupDoc": """ " Assuming a list of entries are already grouped together, create a StructureGroupDoc @@ -125,6 +139,7 @@ def from_grouped_entries( "chemsys": "-".join(sorted(all_atoms | set(ignored_species))), "has_distinct_compositions": len(all_comps) > 1, } + fields.update(kwargs) return cls(**fields) @@ -184,7 +199,11 @@ def from_ungrouped_structure_entries( ) for g in group_entries_with_structure_matcher(f_group_l, sm): struct_group = cls.from_grouped_entries( - g, ignored_species=ignored_species + g, + ignored_species=ignored_species, + ltol=ltol, + stol=stol, + angle_tol=angle_tol, ) cnt_ += len(struct_group.material_ids) results.append(struct_group) diff --git a/emmet-core/requirements.txt b/emmet-core/requirements.txt index fe014dee64..81f058e88d 100644 --- a/emmet-core/requirements.txt +++ b/emmet-core/requirements.txt @@ -1,4 +1,5 @@ pymatgen==2022.0.5 +pymatgen-analysis-diffusion==2021.4.29 monty==v2021.3.3 pydantic==1.8.1 pybtex==0.24.0 diff --git a/tests/emmet-core/test_migration.py b/tests/emmet-core/test_migration.py new file mode 100644 index 0000000000..ed966e3a70 --- /dev/null +++ b/tests/emmet-core/test_migration.py @@ -0,0 +1,57 @@ +import pytest +from monty.serialization import loadfn +from pymatgen.apps.battery.conversion_battery import ConversionElectrode +from pymatgen.apps.battery.insertion_battery import InsertionElectrode +from pymatgen.core import Composition +from pymatgen.entries.computed_entries import ComputedEntry + +from emmet.core.electrode import ( + ConversionElectrodeDoc, + ConversionVoltagePairDoc, + InsertionElectrodeDoc, + InsertionVoltagePairDoc, +) +from emmet.core.migration import MigrationGraphDoc + +__author__ = "Jimmy Shen" +__email__ = "jmmshn@gmail.com" + + +@pytest.fixture(scope="session") +def entries_lto(test_dir): + """ + Recycle the test cases from pymatgen + """ + entries = loadfn(test_dir / "LiTiO2_batt.json") + for itr, ient in enumerate(entries): + ient.entry_id = f"mp-{itr}" + return entries + + +@pytest.fixture(scope="session") +def insertion_elec(test_dir) -> InsertionElectrode: + """ + Insertion electrod object + """ + entry_Li = ComputedEntry("Li", -1.90753119) + entries_LTO = loadfn(test_dir / "LiTiO2_batt.json") + return InsertionElectrode.from_entries(entries_LTO, entry_Li) + + +def test_StructureGroupDoc_from_ungrouped_entries(insertion_elec: InsertionElectrode): + entries = insertion_elec.get_stable_entries() + entry_li = ComputedEntry("Li", -1.90753119) + dist_thresh = 4 + mg = MigrationGraphDoc.from_entries( + entries=entries, + working_ion_entry=entry_li, + ltol=0.4, + stol=0.6, + angle_tol=15, + symprec=0.1, + min_distance_cutoff=dist_thresh, + ) + + assert len(mg.migration_graph_object.only_sites) == 6 + for u, v, d in mg.migration_graph_object.m_graph.graph.edges(data=True): + assert d["hop_distance"] < dist_thresh