Source code for mchammer.data_containers.wang_landau_data_container

"""Definition of the Wang-Landau data container class."""

from warnings import warn
from collections import Counter, OrderedDict
from typing import Any, BinaryIO, Dict, List, Optional, TextIO, Tuple, Union

import numpy as np
import pandas as pd

from ase.units import kB
from ase import Atoms
from pandas import DataFrame, concat as pd_concat

from icet import ClusterSpace
from .base_data_container import BaseDataContainer


[docs]class WangLandauDataContainer(BaseDataContainer): """ Data container for storing information concerned with :ref:`Wang-Landau simulation <wang_landau_ensemble>` performed with mchammer. Parameters ---------- structure : ase.Atoms reference atomic structure associated with the data container ensemble_parameters : dict parameters associated with the underlying ensemble metadata : dict metadata associated with the data container """ def _update_last_state(self, last_step: int, occupations: List[int], accepted_trials: int, random_state: tuple, fill_factor: float, fill_factor_history: Dict[int, float], entropy_history: Dict[int, Dict[int, float]], histogram=Dict[int, int], entropy=Dict[int, float]): """Updates last state of the Wang-Landau simulation. Parameters ---------- last_step last trial step occupations occupation vector observed during the last trial step accepted_trial number of current accepted trial steps random_state tuple representing the last state of the random generator fill_factor fill factor of Wang-Landau algorithm fill_factor_history evolution of the fill factor of Wang-Landau algorithm (key=MC trial step, value=fill factor) entropy_history evolution of the (relative) entropy accumulated during Wang-Landau simulation (key=MC trial step, value=(key=bin, value=entropy)) histogram histogram of states visited during Wang-Landau simulation entropy (relative) entropy accumulated during Wang-Landau simulation """ super()._update_last_state( last_step=last_step, occupations=occupations, accepted_trials=accepted_trials, random_state=random_state) self._last_state['fill_factor'] = fill_factor self._last_state['fill_factor_history'] = fill_factor_history self._last_state['entropy_history'] = entropy_history self._last_state['histogram'] = histogram self._last_state['entropy'] = entropy @property def fill_factor(self) -> float: """ final value of the fill factor in the Wang-Landau algorithm """ return float(self._last_state['fill_factor']) @property def fill_factor_history(self) -> DataFrame: """ evolution of the fill factor in the Wang-Landau algorithm """ return DataFrame({'mctrial': list(self._last_state['fill_factor_history'].keys()), 'fill_factor': list(self._last_state['fill_factor_history'].values())})
[docs] def get(self, *tags: str, fill_factor_limit: float = None) \ -> Union[np.ndarray, List[Atoms], Tuple[np.ndarray, List[Atoms]]]: """Returns the accumulated data for the requested observables, including configurations stored in the data container. The latter can be achieved by including 'trajectory' as one of the tags. Parameters ---------- tags names of the requested properties fill_factor_limit return data recorded up to the point when the specified fill factor limit was reached, or ``None`` if the entropy history is empty or the last fill factor is above the limit; otherwise return all data Raises ------ ValueError if tags is empty ValueError if observables are requested that are not in data container Examples -------- Below the `get` method is illustrated but first we require a data container. >>> from ase import Atoms >>> from icet import ClusterExpansion, ClusterSpace >>> from mchammer.calculators import ClusterExpansionCalculator >>> from mchammer.ensembles import WangLandauEnsemble >>> # prepare cluster expansion >>> prim = Atoms('Au', positions=[[0, 0, 0]], cell=[1, 1, 10], pbc=True) >>> cs = ClusterSpace(prim, cutoffs=[1.1], chemical_symbols=['Ag', 'Au']) >>> ce = ClusterExpansion(cs, [0, 0, 2]) >>> # prepare initial configuration >>> structure = prim.repeat((4, 4, 1)) >>> for k in range(8): ... structure[k].symbol = 'Ag' >>> # set up and run Wang-Landau simulation >>> calculator = ClusterExpansionCalculator(structure, ce) >>> mc = WangLandauEnsemble(structure=structure, ... calculator=calculator, ... energy_spacing=1, ... dc_filename='ising_2d_run.dc', ... fill_factor_limit=0.3) >>> mc.run(number_of_trial_steps=len(structure)*3000) # in practice one requires more steps We can now access the data container by reading it from file by using the `read` method. For the purpose of this example, however, we access the data container associated with the ensemble directly. >>> dc = mc.data_container The following lines illustrate how to use the `get` method for extracting data from the data container. >>> # obtain all values of the potential represented by >>> # the cluster expansion and the MC trial step along the >>> # trajectory >>> import matplotlib.pyplot as plt >>> s, p = dc.get('mctrial', 'potential') >>> _ = plt.plot(s, p) >>> # as above but this time only included data recorded up to >>> # the point when the fill factor reached below 0.6 >>> s, p = dc.get('mctrial', 'potential', fill_factor_limit=0.6) >>> _ = plt.plot(s, p) >>> plt.show(block=False) >>> # obtain configurations along the trajectory along with >>> # their potential >>> p, confs = dc.get('potential', 'trajectory') """ if len(tags) == 0: raise TypeError('Missing tags argument') local_tags = ['occupations' if tag == 'trajectory' else tag for tag in tags] for tag in local_tags: if tag in 'mctrial': continue if tag not in self.observables: raise ValueError('No observable named {} in data container'.format(tag)) # collect data mctrials = [row_dict['mctrial'] for row_dict in self._data_list] data = pd.DataFrame.from_records(self._data_list, index=mctrials, columns=local_tags) if fill_factor_limit is not None: # only include data for fill factors up to the limit df_ffh = self.fill_factor_history.astype( {'mctrial': np.int64, 'fill_factor': np.float64}) mctrial_last = df_ffh.loc[ df_ffh.fill_factor <= fill_factor_limit].mctrial.min() data = data.loc[data.index <= mctrial_last] data.dropna(inplace=True) # handling of trajectory def occupation_to_atoms(occupation): structure = self.structure.copy() structure.numbers = occupation return structure data_list = [] for tag in local_tags: if tag == 'occupations': traj = [occupation_to_atoms(o) for o in data['occupations']] data_list.append(traj) else: data_list.append(data[tag].values) if len(data_list) > 1: return tuple(data_list) else: return data_list[0]
[docs] def get_entropy(self, fill_factor_limit: float = None) -> DataFrame: """Returns the (relative) entropy from this data container accumulated during a :ref:`Wang-Landau simulation <wang_landau_ensemble>`. Returns ``None`` if the data container does not contain the required information. Parameters ---------- fill_factor_limit return the entropy recorded up to the point when the specified fill factor limit was reached, or ``None`` if the entropy history is empty or the last fill factor is above the limit; otherwise return the entropy for the last state """ if 'entropy' not in self._last_state: warn('There is no entropy information in the data container.') return None entropy = self._last_state['entropy'] if fill_factor_limit is not None: if 'entropy_history' not in self._last_state or \ len(self._last_state['entropy_history']) == 0: warn('The entropy history is empty.') return None if self._last_state['fill_factor'] > fill_factor_limit: warn('The last fill factor {} is higher than the limit' ' {}.'.format(self.fill_factor, fill_factor_limit)) return None for step, fill_factor in self._last_state['fill_factor_history'].items(): if fill_factor <= fill_factor_limit: entropy = self._last_state['entropy_history'][step] break # compile entropy into DataFrame energy_spacing = self.ensemble_parameters['energy_spacing'] df = DataFrame(data={'energy': energy_spacing * np.array(list(entropy.keys())), 'entropy': np.array(list(entropy.values()))}, index=list(entropy.keys())) # shift entropy for numerical stability df['entropy'] -= np.min(df['entropy']) return df
[docs] def get_histogram(self) -> DataFrame: """Returns the histogram from this data container accumulated since the last update of the fill factor. Returns ``None`` if the data container does not contain the required information. """ if 'histogram' not in self._last_state: return None # compile histogram into DataFrame histogram = self._last_state['histogram'] energy_spacing = self.ensemble_parameters['energy_spacing'] df = DataFrame(data={'energy': energy_spacing * np.array(list(histogram.keys())), 'histogram': np.array(list(histogram.values()))}, index=list(histogram.keys())) return df
[docs] @classmethod # todo: cls and the return should be type hinted as BaseDataContainer. # Unfortunately, this requires from __future__ import annotations, which # in turn requires Python 3.8. def read(cls, infile: Union[str, BinaryIO, TextIO], old_format: bool = False): """Reads data container from file. Parameters ---------- infile file from which to read old_format If true use old json format to read runtime data; default to false Raises ------ FileNotFoundError if file is not found (str) ValueError if file is of incorrect type (not a tarball) """ dc = super(WangLandauDataContainer, cls).read(infile=infile, old_format=old_format) for tag, value in dc._last_state.items(): if tag in ['histogram', 'entropy', 'fill_factor_history', 'entropy_history']: # the following accounts for the fact that the keys of dicts # are converted to str when writing to json and have to # converted back into numerical values dc._last_state[tag] = {} for key, val in value.items(): if isinstance(val, dict): val = {int(k): v for k, v in val.items()} dc._last_state[tag][int(key)] = val return dc
[docs]def get_density_of_states_wl(dcs: Union[WangLandauDataContainer, Dict[Any, WangLandauDataContainer]], fill_factor_limit: float = None) \ -> Tuple[DataFrame, dict]: """Returns a pandas DataFrame with the total density of states from a :ref:`Wang-Landau simulation <wang_landau_ensemble>`. If a dict of data containers is provided the function also returns a dictionary that contains the standard deviation between the entropy of neighboring data containers in the overlap region. These errors should be small compared to the variation of the entropy across each bin. The function can handle both a single data container and a dict thereof. In the latter case the data containers must cover a contiguous energy range and must at least partially overlap. Parameters ---------- dcs data container(s), from which to extract the density of states fill_factor_limit calculate the density of states using the entropy recorded up to the point when the specified fill factor limit was reached; otherwise return the density of states for the last state Raises ------ TypeError if dcs does not correspond to not a single (dictionary) of data container(s) from which the entropy can retrieved ValueError if the data container does not contain entropy information ValueError if a fill factor limit has been provided and the data container either does not contain information about the entropy history or if the last fill factor is higher than the specified limit ValueError if multiple data containers are provided and there are inconsistencies with regard to basic simulation parameters such as system size or energy spacing ValueError if multiple data containers are provided and there is at least one energy region without overlap """ # preparations if isinstance(dcs, WangLandauDataContainer): # fetch raw entropy data from data container df = dcs.get_entropy(fill_factor_limit) if df is None: raise ValueError('Entropy information could not be retrieved from' ' the data container {}.'.format(dcs)) errors = None if len(dcs.fill_factor_history) == 0 or dcs.fill_factor > 1e-4: warn('The data container appears to contain data from an' ' underconverged Wang-Landau simulation.') elif isinstance(dcs, dict) and isinstance(dcs[next(iter(dcs))], WangLandauDataContainer): # minimal consistency checks tags = list(dcs.keys()) tagref = tags[0] dcref = dcs[tagref] for tag in tags: dc = dcs[tag] if len(dc.structure) != len(dcref.structure): raise ValueError('Number of atoms differs between data containers ({}: {}, {}: {})' .format(tagref, dcref.ensemble_parameters['n_atoms'], tag, dc.ensemble_parameters['n_atoms'])) for param in ['energy_spacing', 'trial_move']: if dc.ensemble_parameters[param] != dcref.ensemble_parameters[param]: raise ValueError('{} differs between data containers ({}: {}, {}: {})' .format(param, tagref, dcref.ensemble_parameters['n_atoms'], tag, dc.ensemble_parameters['n_atoms'])) if len(dc.fill_factor_history) == 0 or dc.fill_factor > 1e-4: warn('Data container {} appears to contain data from an' ' underconverged Wang-Landau simulation.'.format(tag)) # fetch raw entropy data from data containers entropies = {} for tag, dc in dcs.items(): entropies[tag] = dc.get_entropy(fill_factor_limit) if entropies[tag] is None: raise ValueError('Entropy information could not be retrieved' ' from the data container {}.'.format(dc)) # sort entropies by energy entropies = OrderedDict(sorted(entropies.items(), key=lambda row: row[1].energy.iloc[0])) # line up entropy data errors = {} tags = list(entropies.keys()) for tag1, tag2 in zip(tags[:-1], tags[1:]): df1 = entropies[tag1] df2 = entropies[tag2] if all(df2.energy.isin(df1.energy)): warn('Window {} is a subset of {}'.format(tag2, tag1)) left_lim = np.min(df2.energy) right_lim = np.max(df1.energy) if left_lim >= right_lim: raise ValueError('No overlap in the energy range {}...{}.\n' .format(right_lim, left_lim) + ' The closest data containers have tags "{}" and "{}".' .format(tag1, tag2)) df1_ = df1[(df1.energy >= left_lim) & (df1.energy <= right_lim)] df2_ = df2[(df2.energy >= left_lim) & (df2.energy <= right_lim)] offset = (df2_.entropy - df1_.entropy).mean() errors['{}-{}'.format(tag1, tag2)] = (df2_.entropy - df1_.entropy).std() entropies[tag2].entropy = entropies[tag2].entropy - offset # compile entropy over the entire energy range data: Dict[float, float] = {} indices = {} counts = Counter() for df in entropies.values(): for index, en, ent in zip(df.index, df.energy, df.entropy): data[en] = data.get(en, 0) + ent counts[en] += 1 indices[en] = index for en in data: data[en] = data[en] / counts[en] # center entropy to prevent possible numerical issues entmin = np.min(list(data.values())) df = DataFrame(data={'energy': np.array(list(data.keys())), 'entropy': np.array(np.array(list(data.values()))) - entmin}, index=list(indices.values())) else: raise TypeError('dcs ({}) must be a data container with entropy data' ' or be a list of data containers' .format(type(dcs))) # density of states S_max = df.entropy.max() df['density'] = np.exp(df.entropy - S_max) / np.sum(np.exp(df.entropy - S_max)) return df, errors
def _extract_filter_data(dc: BaseDataContainer, columns_to_keep: List[str], fill_factor_limit: float = None) -> DataFrame: """ Extract data from a data container and filter the content. Parameters ---------- dc data container, from which to extract the data columns_to_keep list of requested properties fill_factor_limit only include data recorded up to the point when the specified fill factor limit was reached when computing averages; otherwise include all data """ df = dc.data if fill_factor_limit is not None: # only include data for fill factors up to the limit df_ffh = dc.fill_factor_history.astype( {'mctrial': np.int64, 'fill_factor': np.float64}) mctrial_last = df_ffh.loc[ df_ffh.fill_factor <= fill_factor_limit].mctrial.min() df = df.loc[df.mctrial <= mctrial_last] return df.filter(columns_to_keep)
[docs]def get_average_observables_wl(dcs: Union[WangLandauDataContainer, Dict[Any, WangLandauDataContainer]], temperatures: List[float], observables: List[str] = None, boltzmann_constant: float = kB, fill_factor_limit: float = None) -> DataFrame: """Returns the average and the standard deviation of the energy from a :ref:`Wang-Landau simulation <wang_landau_ensemble>` for the temperatures specified. If the ``observables`` keyword argument is specified the function will also return the mean and standard deviation of the specified observables. Parameters ---------- dcs data container(s), from which to extract density of states as well as observables temperatures temperatures, at which to compute the averages observables observables, for which to compute averages; the observables must refer to fields in the data container boltzmann_constant Boltzmann constant :math:`k_B` in appropriate units, i.e. units that are consistent with the underlying cluster expansion and the temperature units [default: eV/K] fill_factor_limit use data recorded up to the point when the specified fill factor limit was reached when computing averages; otherwise use data for the last state Raises ------ ValueError if the data container(s) do(es) not contain entropy data from Wang-Landau simulation ValueError if data container(s) do(es) not contain requested observable """ def check_observables(dc: WangLandauDataContainer, observables: Optional[List[str]]) -> None: """ Helper function that checks that observables are available in data frame. """ if observables is None: return for obs in observables: if obs not in dc.data.columns: raise ValueError('Observable ({}) not in data container.\n' 'Available observables: {}'.format(obs, dc.data.columns)) # preparation of observables columns_to_keep = ['potential', 'density'] if observables is not None: columns_to_keep.extend(observables) # check that observables are available in data container # and prepare comprehensive data frame with relevant information if isinstance(dcs, WangLandauDataContainer): check_observables(dcs, observables) df_combined = _extract_filter_data(dcs, columns_to_keep, fill_factor_limit) dcref = dcs elif isinstance(dcs, dict) and isinstance(dcs[next(iter(dcs))], WangLandauDataContainer): dfs = [] for dc in dcs.values(): check_observables(dc, observables) dfs.append(_extract_filter_data(dc, columns_to_keep, fill_factor_limit)) df_combined = pd_concat([df for df in dfs], ignore_index=True) dcref = list(dcs.values())[0] else: raise TypeError('dcs ({}) must be a data container with entropy data' ' or be a list of data containers' .format(type(dcs))) # fetch entropy and density of states from data container(s) df_density, _ = get_density_of_states_wl(dcs, fill_factor_limit) # compute density for each row in data container if observable averages # are to be computed if observables is not None: energy_spacing = dcref.ensemble_parameters['energy_spacing'] # NOTE: we rely on the indices of the df_density DataFrame to # correspond to the energy scale! This is expected to be handled in # the get_density_of_states function. bins = list(np.array(np.round(df_combined.potential / energy_spacing), dtype=int)) data_density = [dens / bins.count(k) for k, dens in df_density.density[bins].items()] enref = np.min(df_density.energy) averages = [] for temperature in temperatures: # mean and standard deviation of energy boltz = np.exp(- (df_density.energy - enref) / temperature / boltzmann_constant) sumint = np.sum(df_density.density * boltz) en_mean = np.sum(df_density.energy * df_density.density * boltz) / sumint en_std = np.sum(df_density.energy ** 2 * df_density.density * boltz) / sumint en_std = np.sqrt(en_std - en_mean ** 2) record = {'temperature': temperature, 'potential_mean': en_mean, 'potential_std': en_std} # mean and standard deviation of other observables if observables is not None: boltz = np.exp(- (df_combined.potential - enref) / temperature / boltzmann_constant) sumint = np.sum(data_density * boltz) for obs in observables: obs_mean = np.sum(data_density * boltz * df_combined[obs]) / sumint obs_std = np.sum(data_density * boltz * df_combined[obs] ** 2) / sumint obs_std = np.sqrt(obs_std - obs_mean ** 2) record['{}_mean'.format(obs)] = obs_mean record['{}_std'.format(obs)] = obs_std averages.append(record) return DataFrame.from_dict(averages)
[docs]def get_average_cluster_vectors_wl(dcs: Union[WangLandauDataContainer, dict], cluster_space: ClusterSpace, temperatures: List[float], boltzmann_constant: float = kB, fill_factor_limit: float = None) -> DataFrame: """Returns the average cluster vectors from a :ref:`Wang-Landau simulation <wang_landau_ensemble>` for the temperatures specified. Parameters ---------- dcs data container(s), from which to extract density of states as well as observables cluster_space cluster space to use for calculation of cluster vectors temperatures temperatures, at which to compute the averages boltzmann_constant Boltzmann constant :math:`k_B` in appropriate units, i.e. units that are consistent with the underlying cluster expansion and the temperature units [default: eV/K] fill_factor_limit use data recorded up to the point when the specified fill factor limit was reached when computing the average cluster vectors; otherwise use data for the last state Raises ------ ValueError if the data container(s) do(es) not contain entropy data from Wang-Landau simulation """ # fetch potential and structures if isinstance(dcs, WangLandauDataContainer): potential, trajectory = dcs.get('potential', 'trajectory', fill_factor_limit=fill_factor_limit) energy_spacing = dcs.ensemble_parameters['energy_spacing'] elif isinstance(dcs, dict) and isinstance(dcs[next(iter(dcs))], WangLandauDataContainer): potential, trajectory = [], [] for dc in dcs.values(): p, t = dc.get('potential', 'trajectory', fill_factor_limit=fill_factor_limit) potential.extend(p) trajectory.extend(t) energy_spacing = list(dcs.values())[0].ensemble_parameters['energy_spacing'] potential = np.array(potential) else: raise TypeError('dcs ({}) must be a data container with entropy data' ' or be a list of data containers' .format(type(dcs))) # fetch entropy and density of states from data container(s) df_density, _ = get_density_of_states_wl(dcs, fill_factor_limit) # compute weighted density and cluster vector for each bin in energy # range; the weighted density is the total density divided by the number # of structures that fall in the respective bin # NOTE: the following code relies on the indices of the df_density # DataFrame to correspond to the energy scale. This is expected to be # handled in the get_density_of_states function. cvs = [] weighted_density = [] bins = list(np.array(np.round(potential / energy_spacing), dtype=int)) for k, structure in zip(bins, trajectory): cvs.append(cluster_space.get_cluster_vector(structure)) weighted_density.append(df_density.density[k] / bins.count(k)) # compute mean and standard deviation (std) of temperature weighted # cluster vector averages = [] enref = np.min(potential) for temperature in temperatures: boltz = np.exp(- (potential - enref) / temperature / boltzmann_constant) sumint = np.sum(weighted_density * boltz) cv_mean = np.array([np.sum(weighted_density * boltz * cv) / sumint for cv in np.transpose(cvs)]) cv_std = np.array([np.sum(weighted_density * boltz * cv ** 2) / sumint for cv in np.transpose(cvs)]) cv_std = np.sqrt(cv_std - cv_mean ** 2) record = {'temperature': temperature, 'cv_mean': cv_mean, 'cv_std': cv_std} averages.append(record) return DataFrame.from_dict(averages)