# Source code for mchammer.ensembles.wang_landau_ensemble

"""Definition of the Wang-Landau algorithm class."""

import random

from collections import OrderedDict
from typing import BinaryIO, Dict, List, TextIO, Union

import numpy as np

from ase import Atoms
from pandas import DataFrame

from .. import WangLandauDataContainer
from ..calculators.base_calculator import BaseCalculator
from .thermodynamic_base_ensemble import BaseEnsemble

[docs]class WangLandauEnsemble(BaseEnsemble):
"""Instances of this class allow one to sample a system using the
Wang-Landau (WL) algorithm, see Phys. Rev. Lett. **86**, 2050
(2001) [WanLan01a]_. The WL algorithm enables one to acquire the
density of states (DOS) as a function of energy, from which one
can readily calculate many thermodynamic observables as a function
of temperature. To this end, the WL algorithm accumulates both the
microcanonical entropy :math:S(E) and a histogram :math:H(E)
on an energy grid with a predefined spacing (energy_spacing).

The algorithm is initialized as follows.

#. Generate an initial configuration.
#. Initialize counters for the microcanonical entropy
:math:S(E) and the histogram :math:H(E) to zero.
#. Set the fill factor :math:f=1.

It then proceeds as follows.

#. Propose a new configuration (see trial_move).
#. Accept or reject the new configuration with probability

.. math::

P = \\min \\{ 1, \\, \\exp [ S(E_\\mathrm{new}) - S(E_\\mathrm{cur}) ] \\},

where :math:E_\\mathrm{cur} and :math:E_\\mathrm{new} are the
energies of the current and new configurations, respectively.
#. Update the microcanonical entropy :math:S(E)\\leftarrow S(E) + f
and histogram :math:H(E) \\leftarrow H(E) + 1 where
:math:E is the energy of the system at the end of the move.
#. Check the flatness of the histogram :math:H(E). If
:math:H(E) > \\chi \\langle H(E)\\rangle\\,\\forall E reset the histogram
:math:H(E) = 0 and reduce the fill factor :math:f \\leftarrow f / 2.
The parameter :math:\\chi is set via flatness_limit.
#. If :math:f is smaller than fill_factor_limit terminate

The microcanonical entropy :math:S(E) and the histogram along
with related information are written to the data container every
time :math:f is updated. Using the density :math:\\rho(E) = \\exp S(E)
one can then readily compute various thermodynamic quantities,
including, e.g., the average energy:

.. math::

\\left<E\\right> = \\frac{\\sum_E E \\rho(E) \\exp(-E / k_B T)}{
\\sum_E \\rho(E) \\exp(-E / k_B T)}

Parameters
----------
structure : :class:Atoms <ase.Atoms>
atomic configuration to be used in the Wang-Landau simulation;
also defines the initial occupation vector
calculator : :class:BaseCalculator <mchammer.calculators.ClusterExpansionCalculator>
calculator to be used for calculating potential changes
trial_move : str
One can choose between two different trial moves for
generating new configurations. In a 'swap' move two sites are
selected and their occupations are swapped; in a 'flip' move
one site is selected and its occupation is flipped to a
different species. While 'swap' moves conserve the
concentrations of the species in the system, 'flip' moves
allow one in principle to sample the full composition space.
energy_spacing : float
defines the bin size of the energy grid on which the microcanonical
entropy :math:S(E), and thus the density :math:\\exp S(E), is
evaluated; the spacing should be small enough to capture the features
of the density of states; too small values will, however, render the
convergence very tedious if not impossible
energy_limit_left : float
defines the lower limit of the energy range within which the
microcanonical entropy :math:S(E) will be sampled. By default
(None) no limit is imposed. Setting limits can be useful if only a
part of the density of states is required.
energy_limit_right : float
defines the upper limit of the energy range within which the
microcanonical entropy :math:S(E) will be sampled. By default
(None) no limit is imposed. Setting limits can be useful if only a
part of the density of states is required.
fill_factor_limit : float
If the fill_factor :math:f falls below this value, the
WL sampling loop is terminated.
flatness_check_interval : int
For computational efficiency the flatness condition is only
evaluated every flatness_check_interval-th trial step. By
default (None) flatness_check_interval is set to 1000
times the number of sites in structure, i.e. 1000 Monte
Carlo sweeps.
flatness_limit : float
The histogram :math:H(E) is deemed sufficiently flat if
:math:H(E) > \\chi \\left<H(E)\\right>\\,\\forall
E. flatness_limit sets the parameter :math:\\chi.
user_tag : str
human-readable tag for ensemble [default: None]
dc_filename : str
name of file the data container associated with the ensemble
will be written to; if the file exists it will be read, the
data container will be appended, and the file will be
updated/overwritten
random_seed : int
seed for the random number generator used in the Monte Carlo
simulation
ensemble_data_write_interval : int
interval at which data is written to the data container; this
includes for example the current value of the calculator
(i.e. usually the energy) as well as ensembles specific fields
such as temperature or the number of atoms of different species
data_container_write_period : float
period in units of seconds at which the data container is
written to file; writing periodically to file provides both
a way to examine the progress of the simulation and to back up
the data [default: np.inf]
trajectory_write_interval : int
interval at which the current occupation vector of the atomic
configuration is written to the data container.
sublattice_probabilities : List[float]
probability for picking a sublattice when doing a random swap.
The list must contain as many elements as there are sublattices
and it needs to sum up to 1.

Example
-------
The following snippet illustrates how to carry out a Wang-Landau
simulation. For the purpose of demonstration, the parameters of
the cluster expansion are set to obtain a two-dimensional square
Ising model, one of the systems studied in the original work by
Wang and Landau::

>>> from ase import Atoms
>>> from icet import ClusterExpansion, ClusterSpace
>>> from mchammer.calculators import ClusterExpansionCalculator

>>> # prepare cluster expansion
>>> prim = Atoms('Au', positions=[[0, 0, 0]], cell=[1, 1, 10], pbc=True)
>>> cs = ClusterSpace(prim, cutoffs=[1.1], chemical_symbols=['Ag', 'Au'])
>>> ce = ClusterExpansion(cs, [0, 0, 2])

>>> # prepare initial configuration
>>> structure = prim.repeat((4, 4, 1))
>>> for k in range(8):
...     structure[k].symbol = 'Ag'

>>> # set up and run Wang-Landau simulation
>>> calculator = ClusterExpansionCalculator(structure, ce)
>>> mc = WangLandauEnsemble(structure=structure,
...                         calculator=calculator,
...                         energy_spacing=1,
...                         dc_filename='ising_2d_run.dc')
>>> mc.run(number_of_trial_steps=len(structure)*1000)  # in practice one requires more steps

"""

def __init__(self,
structure: Atoms,
calculator: BaseCalculator,
energy_spacing: float,
energy_limit_left: float = None,
energy_limit_right: float = None,
trial_move: str = 'swap',
fill_factor_limit: float = 1e-6,
flatness_check_interval: int = None,
flatness_limit: float = 0.8,
user_tag: str = None,
dc_filename: str = None,
data_container: str = None,
random_seed: int = None,
data_container_write_period: float = 600,
ensemble_data_write_interval: int = None,
trajectory_write_interval: int = None,
sublattice_probabilities: List[float] = None) -> None:

# set trial move
if trial_move == 'swap':
self.do_move = self._do_swap
self._get_sublattice_probabilities = self._get_swap_sublattice_probabilities
elif trial_move == 'flip':
self.do_move = self._do_flip
self._get_sublattice_probabilities = self._get_flip_sublattice_probabilities
else:
raise ValueError('Invalid value for trial_move: {}.'
' Must be either "swap" or "flip".'.format(trial_move))

# set default values that are system dependent
if flatness_check_interval is None:
flatness_check_interval = len(structure) * 1e3

# parameters pertaining to construction of entropy and histogram
self._energy_spacing = energy_spacing
self._fill_factor_limit = fill_factor_limit
self._flatness_check_interval = flatness_check_interval
self._flatness_limit = flatness_limit

# energy window
self._bin_left = self._get_bin_index(energy_limit_left)
self._bin_right = self._get_bin_index(energy_limit_right)
if self._bin_left is not None and \
self._bin_right is not None and self._bin_left >= self._bin_right:
raise ValueError('Invalid energy window: left boundary ({}, {}) must be'
' smaller than right boundary ({}, {})'
.format(energy_limit_left, self._bin_left,
energy_limit_right, self._bin_right))
self._reached_energy_window = self._bin_left is None and self._bin_right is None

# ensemble parameters
self._ensemble_parameters = {}
self._ensemble_parameters['energy_spacing'] = energy_spacing
self._ensemble_parameters['trial_move'] = trial_move
self._ensemble_parameters['energy_limit_left'] = energy_limit_left
self._ensemble_parameters['energy_limit_right'] = energy_limit_right
# The following parameters are _intentionally excluded_ from
# the ensemble_parameters dict as it would prevent users from
# changing their values between restarts. The latter is advantageous
# as these runs can require restarts and possibly parameter adjustments
# to achieve convergence.
#  * fill_factor_limit
#  * flatness_check_interval
#  * flatness_limit

# add species count to ensemble parameters
symbols = set([symbol for sub in calculator.sublattices
for symbol in sub.chemical_symbols])
for symbol in symbols:
key = 'n_atoms_{}'.format(symbol)
count = structure.get_chemical_symbols().count(symbol)
self._ensemble_parameters[key] = count

# the constructor of the parent classes must be called *after*
# the ensemble_parameters dict has been populated
super().__init__(
structure=structure,
calculator=calculator,
user_tag=user_tag,
random_seed=random_seed,
dc_filename=dc_filename,
data_container=data_container,
data_container_class=WangLandauDataContainer,
data_container_write_period=data_container_write_period,
ensemble_data_write_interval=ensemble_data_write_interval,
trajectory_write_interval=trajectory_write_interval)

# handle probabilities for swaps on different sublattices
if sublattice_probabilities is None:
self._sublattice_probabilities = self._get_sublattice_probabilities()
else:
self._sublattice_probabilities = sublattice_probabilities

# initialize Wang-Landau algorithm; in the case of a restart
# these quantities are read from the data container file; the
# if-conditions prevent these values from being overwritten
self._converged = None
self._potential = self.calculator.calculate_total(
occupations=self.configuration.occupations)
if not hasattr(self, '_fill_factor'):
self._fill_factor = 1
if not hasattr(self, '_fill_factor_history'):
if self._reached_energy_window:
self._fill_factor_history = {self.step: self._fill_factor}
else:
self._fill_factor_history = {}
if not hasattr(self, '_histogram'):
self._histogram = {}
if not hasattr(self, '_entropy'):
self._entropy = {}

@property
def fill_factor(self) -> float:
""" current value of fill factor """
return self._fill_factor

@property
def fill_factor_history(self) -> Dict[int, float]:
"""evolution of the fill factor in the Wang-Landau algorithm (key=MC
trial step, value=fill factor)
"""
return self._fill_factor_history

@property
def converged(self) -> bool:
""" True if convergence has been achieved """
return self._converged

@property
def flatness_limit(self) -> float:
"""The histogram :math:H(E) is deemed sufficiently flat if
:math:H(E) > \\chi \\left<H(E)\\right>\\,\\forall
E where flatness_limit sets the parameter :math:\\chi.
"""
return self._flatness_limit

@flatness_limit.setter
def flatness_limit(self, new_value) -> None:
self._flatness_limit = new_value
self._converged = None

@property
def fill_factor_limit(self) -> float:
""" If the fill_factor :math:f falls below this value, the
Wang-Landau sampling loop is terminated. """
return self._fill_factor_limit

@fill_factor_limit.setter
def fill_factor_limit(self, new_value) -> None:
self._fill_factor_limit = new_value
self._converged = None

@property
def flatness_check_interval(self) -> int:
""" number of MC trial steps between checking the flatness
condition """
return self._flatness_check_interval

@flatness_check_interval.setter
def flatness_check_interval(self, new_value: int) -> None:
self._flatness_check_interval = new_value

[docs]    def get_entropy(self) -> DataFrame:
""" entropy accumulated during Wang-Landau simulation """
df = DataFrame(data={'energy': self._energy_spacing * np.array(list(self._entropy.keys())),
'entropy': np.array(list(self._entropy.values()))},
index=list(self._entropy.keys()))
df['entropy'] -= np.min(df['entropy'])
return df

[docs]    def get_histogram(self) -> DataFrame:
""" histogram accumulated since last update of fill factor """
df = DataFrame(data={'energy': self._energy_spacing *
np.array(list(self._histogram.keys())),
'histogram': np.array(list(self._histogram.values()))},
index=list(self._histogram.keys()))
df['histogram'] -= np.min(df['histogram'])
return df

def _terminate_sampling(self) -> bool:
"""Returns True if the Wang-Landau algorithm has converged. This is
used in the run method implemented of BaseEnsemble to
evaluate whether the sampling loop should be terminated.
"""
return self._converged is True  # N.B.: self._converged can be None

def _restart_ensemble(self):
"""Restarts ensemble using the last state saved in the data container
file. Note that this method does _not_ use the last_state property of
the data container but rather uses the last data written the data frame.
"""
super()._restart_ensemble()
self._fill_factor = self.data_container._last_state['fill_factor']
self._fill_factor_history = self.data_container._last_state['fill_factor_history']
self._histogram = self.data_container._last_state['histogram']
self._entropy = self.data_container._last_state['entropy']

[docs]    def write_data_container(self, outfile: Union[str, BinaryIO, TextIO]):
"""Updates last state of the Wang-Landau simulation and
writes DataContainer to file.

Parameters
----------
outfile
file to which to write
"""
self._data_container._update_last_state(
last_step=self.step,
occupations=self.configuration.occupations.tolist(),
accepted_trials=self._accepted_trials,
random_state=random.getstate(),
fill_factor=self._fill_factor,
fill_factor_history=self._fill_factor_history,
histogram=OrderedDict(sorted(self._histogram.items())),
entropy=OrderedDict(sorted(self._entropy.items())))
self.data_container.write(outfile)

def _acceptance_condition(self, potential_diff: float) -> bool:
"""Evaluates Metropolis acceptance criterion.

Parameters
----------
potential_diff
change in the thermodynamic potential associated
with the trial step
"""

# acceptance/rejection step
bin_cur = self._get_bin_index(self._potential)
bin_new = self._get_bin_index(self._potential + potential_diff)
if self._allow_move(bin_cur, bin_new):
S_cur = self._entropy.get(bin_cur, 0)
S_new = self._entropy.get(bin_new, 0)
delta = np.exp(S_cur - S_new)
if delta >= 1 or delta >= self._next_random_number():
accept = True
self._potential += potential_diff
bin_cur = bin_new
else:
accept = False
else:
accept = False

if not self._reached_energy_window:
# check whether the target energy window has been reached
self._reached_energy_window = self._inside_energy_window(bin_cur)
# if the target window has been reached remove unused bins
# from histogram and entropy counters
if self._reached_energy_window:
self._fill_factor_history[self.step] = self._fill_factor
# flush data from data container except for initial step
self._data_container._data_list = [self._data_container._data_list]
self._entropy = {k: self._entropy[k]
for k in self._entropy if self._inside_energy_window(k)}
self._histogram = {k: self._histogram[k]
for k in self._histogram if self._inside_energy_window(k)}

# update histograms and entropy counters
self._update_entropy(bin_cur)

return accept

def _update_entropy(self, bin_cur: int) -> None:
"""Updates counters for histogram and entropy, checks histogram
flatness, and updates fill factor if indicated.
"""

# update histogram and entropy
self._entropy[bin_cur] = self._entropy.get(bin_cur, 0) + self._fill_factor
self._histogram[bin_cur] = self._histogram.get(bin_cur, 0) + 1

# check flatness of histogram
if self.step % self._flatness_check_interval == 0 and \
self.step > 0 and self._reached_energy_window:

# shift entropy counter in order to avoid overflow
entropy_ref = np.min(list(self._entropy.values()))
for k in self._entropy:
self._entropy[k] -= entropy_ref

histogram = np.array(list(self._histogram.values()))
limit = self._flatness_limit * np.average(histogram)
if np.all(histogram >= limit):

# check whether the Wang-Landau algorithm has converged
self._converged = (self._fill_factor <= self._fill_factor_limit)
if not self._converged:
# update fill factor and reset histogram
self._fill_factor /= 2
self._fill_factor_history[self.step] = self._fill_factor
self._histogram = dict.fromkeys(self._histogram, 0)

def _get_bin_index(self, energy: float) -> int:
""" Returns bin index for histogram and entropy dictionaries. """
if energy is None or np.isnan(energy):
return None
return int(np.around(energy / self._energy_spacing))

def _allow_move(self, bin_cur: int, bin_new: int) -> bool:
"""Returns True if the current move is to be included in the
accumulation of histogram and entropy. This logic has been
moved into a separate function in order to enhance
"""
if self._bin_left is None and self._bin_right is None:
# no limits on energy window
return True
if self._bin_left is not None:
if bin_cur < self._bin_left:
# not yet in window (left limit)
return True
if bin_new < self._bin_left:
# imposing left limit
return False
if self._bin_right is not None:
if bin_cur > self._bin_right:
# not yet in window (right limit)
return True
if bin_new > self._bin_right:
# imposing right limit
return False
return True

def _inside_energy_window(self, bin_k: int) -> bool:
"""Returns True if bin_k is inside the energy window specified for
this simulation.
"""
if self._bin_left is not None and bin_k < self._bin_left:
return False
if self._bin_right is not None and bin_k > self._bin_right:
return False
return True

def _do_trial_step(self):
""" Carries out one Monte Carlo trial step. """
sublattice_index = self.get_random_sublattice_index(self._sublattice_probabilities)
return self.do_move(sublattice_index=sublattice_index)

def _do_swap(self, sublattice_index: int, allowed_species: List[int] = None) -> int:
"""Carries out a Monte Carlo trial that involves swapping the species
on two sites. This method has been copied from
ThermodynamicBaseEnsemble.

Parameters
---------
sublattice_index
the sublattice the swap will act on
allowed_species
list of atomic numbers for allowed species

Returns
-------
Returns 1 or 0 depending on if trial move was accepted or rejected
"""
sites, species = self.configuration.get_swapped_state(sublattice_index, allowed_species)
potential_diff = self._get_property_change(sites, species)
if self._acceptance_condition(potential_diff):
self.update_occupations(sites, species)
return 1
return 0

def _do_flip(self, sublattice_index: int, allowed_species: List[int] = None) -> int:
"""Carries out one Monte Carlo trial step that involves flipping the
species on one site. This method has been adapted from
ThermodynamicBaseEnsemble.

Parameters
---------
sublattice_index
the sublattice the flip will act on
allowed_species
list of atomic numbers for allowed species

Returns
-------
Returns 1 or 0 depending on if trial move was accepted or rejected

"""
index, species = self.configuration.get_flip_state(sublattice_index, allowed_species)
potential_diff = self._get_property_change([index], [species])
if self._acceptance_condition(potential_diff):
self.update_occupations([index], [species])
return 1
return 0

def _get_swap_sublattice_probabilities(self) -> List[float]:
"""Returns sublattice probabilities suitable for swaps. This method
has been copied without modification from ThermodynamicBaseEnsemble.
"""
sublattice_probabilities = []
for i, sl in enumerate(self.sublattices):
if self.configuration.is_swap_possible(i):
sublattice_probabilities.append(len(sl.indices))
else:
sublattice_probabilities.append(0)
norm = sum(sublattice_probabilities)
if norm == 0:
raise ValueError('No swaps are possible on any of the active sublattices.')
sublattice_probabilities = [p / norm for p in sublattice_probabilities]
return sublattice_probabilities

def _get_flip_sublattice_probabilities(self) -> List[float]:
"""Returns the default sublattice probability which is based on the
sizes of a sublattice. This method has been copied without
modification from ThermodynamicBaseEnsemble.
"""
sublattice_probabilities = []
for _, sl in enumerate(self.sublattices):
if len(sl.chemical_symbols) > 1:
sublattice_probabilities.append(len(sl.indices))
else:
sublattice_probabilities.append(0)
norm = sum(sublattice_probabilities)
sublattice_probabilities = [p / norm for p in sublattice_probabilities]
return sublattice_probabilities