# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""
The setup for a simulation with the aTTN or TTN network bringing together all
pieces, e.g., the model, and the observables.
"""
# pylint: disable=too-many-lines, too-many-locals
import hashlib
import json
import logging
import multiprocessing as mprc
import os
import os.path
import shutil
import sys
from copy import deepcopy
from pathlib import Path
from time import sleep
from warnings import warn
import h5py
import numpy as np
from qtealeaves import __version__
from qtealeaves.abstracttns.abstract_tn import _AbstractTN
from qtealeaves.emulator.attn_simulator import ATTN
from qtealeaves.emulator.lptn_simulator import LPTN
from qtealeaves.emulator.state_simulator import StateVector
from qtealeaves.modeling import LocalTerm
from qtealeaves.tensors import TensorBackend
from qtealeaves.tooling import QteaJsonEncoder, QTeaLeavesError
from qtealeaves.tooling.parameterized import _ParameterizedClass
from .ed_simulation import run_ed_simulation
from .observables_processing import (
read_hdf5_recursive,
reshape_observable_timewise,
write_hdf5_recursive,
)
from .tn_simulation import TN_TYPE, run_tn_simulation
# Public API of this module.
__all__ = ["QuantumGreenTeaSimulation", "ATTNSimulation", "DynamicsQuench"]
# Module-level logger following the one-logger-per-module convention.
logger = logging.getLogger(__name__)
# tn_type values served by the python exact-diagonalization backend.
PYTHON_ED_BACKENDS = [0]
# tn_type values served by the python tensor-network backends.
PYTHON_TN_BACKENDS = [1, 2, 3, 4, 5, 6, 7, 8, 9]
# All tn_type values handled by a python backend.
PYTHON_BACKENDS = PYTHON_ED_BACKENDS + PYTHON_TN_BACKENDS
class DisentanglerPositionException(Exception):
    """
    Raised when aTTN disentanglers are placed on an invalid position.

    This exception originates from the `check_disentangler_position`
    function.
    """
# pylint: disable-next=too-many-instance-attributes
[docs]
class QuantumGreenTeaSimulation(_ParameterizedClass):
"""
Simulation class containing the model, the operators, the convergence
settings, and the observables.
**Arguments**
model : instance of :class:`QuantumModel`
Defines the model, i.e., the Hamiltonian of the system.
operators : instance of :class:`TNOperators`
Defines the operators used in the model and the observables.
convergence : instance of :class:`TNConvergenceParameters`
Specifies the convergence parameters for the simulation,
for example the bond dimension.
observables : instance of :class:`TNObservables`
Defines which observables to measure in the simulation. For details,
check the specific observable class.
folder_name_output : str, or callable; optional
Specifies the folder where the output is stored. Will be created
if not existing. Path can contain subfolders and can be parametrized via
`params` dictionary.
Default to `output`.
tn_type : int | type, optional
Specifies which tensor network ansatz is used. The options are `5` for TTN,
`6` for MPS, `7` for aTTN, `8` for TTOs, `9` for LPTNs. `0` is
reserved for ED simulations.
Passing a type, allows to pass custom TN classes inherited from the
Quantum TEA :class:`_AbstractTN`; this supports the DMRG kind of working
flow, but has its limits when encountering more tricky parts like
observables or excited state searches.
Default to `6`.
py_tensor_backend : :class:`TensorBackend`
Setup for tensor backend in python - here define which tensor class,
device, and dtype to use. For now, has to be compatible with the
`tensor_backend` option of simulation configuration.
Default to `None` (resulting in `TensorBackend()`).
mpo_mode : int, optional
Choosing the MPO representation inside the simulation, where
the value `-1` enables an auto-selection (falls to iTPO).
The other values are TPO via iTPO (`0`), iTPO (`1`), iTPO with
update (`2`, for quenches), iTPO with compression (`3`), and sparse
MPO (`10`, partially enabled), indexed sparse MPO (`11`, partially enabled)
Default to `-1`.
disentangler : None or np.ndarray, optional
If the aTTN is used, the disentangler positions can be passed
as a 2d numpy array with n rows and 2 columns, where n is the
number of disentanglers. The indices must be passed as python
indices starting from 0 in the mapped 1d system. The other option is
to set it to None, in that case the disentangler positions are automatically
selected if the tensor network is aTTN.
Default to None.
has_log_file : bool, optional
Flag if log file should be created instead of writing
the output to standard out. If False, all output will
go to standard out.
Default to False (no log file)
store_checkpoints : bool, optional
If True, the state will be written to a file after each
sweep in the ground state search and after each time step
in a time evolution. Ground state searches have the option
to write checkpoints also after each tensor optimization,
which the algorithm can decide on without control by the
user. If `True`, the `delete_existing_folder` argument in
``QuantumGreenTeaSimulation.run()`` must be `False`.
Default to True.
tensor_backend : None, optional
(Deprecated argument. Any input will be ignored.)
Default to `None`.
**Reserved keywords of the simulation dictionary**
The following keywords cannot be used for adapting simulations to the
needs by the user, but have a predefined meaning. Additional predefined
keywords may be imported, e.g., when using a specific model or a specific
set of operators.
* `continue_file` : load file at the beginning of the simulation.
* `use_mpo_from_continue_file` : bool (Python only). Use MPO stored
in continue file.
* `seed` : seed for random number generators.
* `SymmetrySector`
* `SymmetryGenerators`
* `SymmetryTypes`
* `Quenches`
* `ed_sparse` : bool, use in exact state emulator to
switch between sparse and dense Hamiltonian.
* `sweep_order` : defines the sweep order for the statics (python only)
* `exclude_from_hash` : dictionary entries that will be excluded from hash.
* `num_excited_states` : number of excited states to search for.
* `start_dynamics_from` : which state to use for the dynamics.
**Details**
1) Checkpoints vs continue files: checkpointing is used to restart the same
simulation. Continue files can be used to set the input state across
simulations, but the simulation will start always at the beginning with
this user-provided input state.
2) Hashing: hashes are generated to identify that a simulation setup did
not change. For example, changes in a simulation which uses checkpoints
can be detected this way.
"""
# pylint: disable-next=too-many-arguments
def __init__(
    self,
    model,
    operators,
    convergence,
    observables,
    folder_name_output="output",
    file_name_input="TTN.in",
    tn_type: int | type = 6,
    py_tensor_backend: TensorBackend | None = None,
    mpo_mode=-1,
    disentangler=None,
    has_log_file=False,
    store_checkpoints=True,
    tensor_backend=None,
):
    """Initialize the simulation setup; see the class docstring for the arguments."""
    if py_tensor_backend is None:
        # prevents dangerous default value similar to dict/list
        py_tensor_backend = TensorBackend()
    if tensor_backend is not None:
        # Bug fix: the two implicitly concatenated string literals were
        # missing a separating space ("py_tensor_backend.This parameter").
        warn(
            "tensor_backend is inferred by the py_tensor_backend. "
            "This parameter is deprecated and is ignored."
        )
    self.model = model
    self.operators = operators
    self.convergence = convergence
    self.observables = observables
    self.file_name_input = file_name_input
    self.folder_name_output = folder_name_output
    # Version tag of the input processor (kept for format compatibility).
    self.version_input_processor = 3
    self.tn_type = tn_type
    self.mpo_mode = mpo_mode
    self.disentangler = disentangler
    # Evaluated disentangler positions; filled later during the run.
    self.disentangler_eval = None
    self.has_log_file = has_log_file
    self.store_checkpoints = store_checkpoints
    self.py_tensor_backend = py_tensor_backend
[docs]
@staticmethod
def checkpoint_params_hash(params):
    """
    Generate a sha256 hash of a simulation's parameter dictionary to
    identify a checkpointed simulation.

    Keys listed in ``params["exclude_from_hash"]`` are left out of the
    hash, e.g., entries which cannot be serialized.
    """
    exclude_from_hash = params.get("exclude_from_hash", [])
    if exclude_from_hash:
        # Filter out the excluded keys before hashing (dict comprehension
        # instead of the previous manual loop).
        dict_for_hash = {
            key: value
            for key, value in params.items()
            if key not in exclude_from_hash
        }
    else:
        dict_for_hash = params
    # sort_keys guarantees a deterministic serialization independent of
    # the dictionary insertion order.
    params_str = json.dumps(dict_for_hash, sort_keys=True, cls=QteaJsonEncoder)
    return hashlib.sha256(params_str.encode("utf-8")).hexdigest()
[docs]
def get_groundstate_energy(self, params):
    """
    (DEPRECATED) Rudimentary way to extract the ground state energy via the
    convergence file.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.

    **Details**

    Use ``get_static_obs`` in future.
    """
    warn("`get_groundstate_energy` is deprecated.")
    num_trajectories = self.observables.get_num_trajectories(params=params)
    if num_trajectories > 1:
        raise QTeaLeavesError(
            "`get_groundstate_energy` not available for quantum trajectories."
        )
    folder_name_output = self.eval_str_param(self.folder_name_output, params)
    line = None
    # Keep only the last line of the log; the energy is its second column.
    # Fix: open with an explicit encoding instead of the locale default.
    # pylint: disable-next=no-member
    with open(
        os.path.join(folder_name_output, "convergence.log"),
        "r",
        encoding="utf-8",
    ) as fh:
        for line in fh:
            pass
    if line is None:
        raise QTeaLeavesError("Reading ground state energy from empty file.")
    return float(line.split()[1])
def _get_static_obs_from_raw_files(self, params: dict):
    """
    Return a dictionary with all observables of the static simulation,
    e.g., the ground state search. For evolutions in temperature,
    i.e., LPTNs, a list of dictionaries is returned.
    This function reads the raw files written during the simulation.
    It is meant to be used internally, use `get_static_obs` instead.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.
    """
    folder_name_output = self.eval_str_param(self.folder_name_output, params)
    num_traj = self.observables.get_num_trajectories(params=params)

    # utility to separate trajectories from other data
    def _separate_trajectories_inplace(res: dict):
        # Move the entries keyed by ("trajectory", k) into a nested
        # "trajectories" dict; no-op for single-trajectory simulations.
        if num_traj > 1:
            trajectories = {}
            for k in range(num_traj):
                trajectories[k] = res[("trajectory", k)]
                res.pop(("trajectory", k))
            res["trajectories"] = trajectories

    ansatz_cls = self.get_ansatz_cls(params)
    if issubclass(ansatz_cls, LPTN):
        # LPTN statics have multiple measurements in temperature
        static_obs_list = []
        max_iter = self.convergence.eval_numeric_param(
            self.convergence.max_iter, params
        )
        meas_freq = self.convergence.eval_numeric_param(
            self.convergence.measure_obs_every_n_iter, params
        )
        for ii in range(max_iter):
            if (ii + 1 != max_iter) and ((ii + 1) % meas_freq != 0):
                # no measurement took place
                continue
            # Files are numbered 1-based per iteration with measurement.
            fname = "static_obs_%08d.h5" % (ii + 1)
            obs_dict = self.observables.read(fname, folder_name_output, params)
            static_obs_list.append(obs_dict)
        return dict(enumerate(static_obs_list))

    # Simulations with one final measurement at the end of the statics
    results = {}
    results[0] = self.observables.read("static_obs.h5", folder_name_output, params)
    _separate_trajectories_inplace(results[0])
    num_excited_states = params.get("num_excited_states", 0)
    if num_excited_states > 0:
        # If we have excited states, read these as well.
        # In this case, the groundstate is stored with key 0.
        for excited_ii in range(1, params["num_excited_states"] + 1):
            excited_fname = f"static_obs_excited{excited_ii:03d}.h5"
            total_path = os.path.join(folder_name_output, excited_fname)
            if os.path.isfile(total_path):
                # One scenario is that the job fails after finding a few,
                # but not all excited states. We still want to be able to parse such results.
                results[excited_ii] = self.observables.read(
                    excited_fname, folder_name_output, params
                )
                _separate_trajectories_inplace(results[excited_ii])
    return results
# pylint: disable-next=too-many-branches
def _get_dynamic_obs_from_raw_files(self, params: dict):
    """
    Return dictionaries with all observables of the dynamic simulation,
    i.e., the dynamics. The first level of the dictionary is
    iterating over the quenches; the inner dictionaries collect data
    for each observable as time series.
    This function reads the raw files written during the simulation.
    It is meant to be used internally, use `get_dynamic_obs` instead.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.
    """
    folder_name_output = self.eval_str_param(self.folder_name_output, params)
    if "Quenches" not in params:
        # No dynamics were run for this simulation.
        return {}
    all_results = []
    idx = 0
    # Fix: drop the unused `enumerate` around the quench loop.
    for quench in params["Quenches"]:  # looping through quenches
        quench_results = []
        warning_count = 0
        # According to the convention in iter_params_dts we have
        # three cases
        #
        # * dt > 0: step belonging to the time evolution
        # * dt == 0: measurement
        # * dt < 0: skipped measurement (nothing to read)
        for dt in quench.iter_params_dts(params):
            if isinstance(dt, complex):
                # Evolution step for sure, no measurement
                continue
            if dt > 0.0:
                # Evolution step, no measurement
                continue
            # dt <= 0: counts towards the file index even for skipped
            # measurements so file numbering stays in sync.
            idx += 1
            if np.isclose(dt, 0.0):
                # Only dt == 0 are measurements by convention - time
                # evolution step treated with continue in 1st if-case
                file_name = "dyn_obs%08d_%08d.h5" % (1, idx)
                # Try to read, and warn if the file does not exist.
                # Do not warn more than three times for each quench.
                # Cannot test with a single `os.path.isfile` as
                # observables might be split into trajectories.
                try:
                    quench_results.append(
                        self.observables.read(file_name, folder_name_output, params)
                    )
                except FileNotFoundError:  # handle missing file
                    if warning_count < 3:
                        warn(
                            f"File {folder_name_output}/{file_name} not found. Skipping."
                        )
                    if warning_count == 3:
                        warn("Further warnings will be suppressed.")
                    warning_count += 1
                    # If the folder is missing append None
                    # instead of the result dictionary.
                    quench_results.append(None)
        # extract trajectories if existing
        trajectories = {}
        num_traj = self.observables.get_num_trajectories(params=params)
        if num_traj > 1:
            for traj_id in range(num_traj):
                traj_results = []
                for timestep_result in quench_results:
                    if timestep_result is None:
                        traj_results.append(None)
                    else:
                        traj_results.append(
                            timestep_result.get(("trajectory", traj_id), None)
                        )
                        if traj_results[-1] is None:
                            warn(f"Missing data for trajectory {traj_id} at time step.")
                        else:
                            timestep_result.pop(("trajectory", traj_id))
                # Reshape results to have time series per observable key
                trajectories[traj_id] = reshape_observable_timewise(traj_results)
                trajectories[traj_id]["seed"] = trajectories[traj_id]["seed"][0]
                trajectories[traj_id]["trajectory_id"] = traj_id
        # Reshape results to have time series per observable key
        # for all non-trajectory data
        qres = reshape_observable_timewise(quench_results)
        qres["trajectories"] = trajectories
        all_results.append(qres)
    return dict(enumerate(all_results))
[docs]
def get_static_obs_all(self, params: dict, use_collected_file: bool = True) -> dict:
    """
    Return a dictionary with all observables of the static simulation.
    The ground state and all excited states are returned as inner dictionaries
    with the state index as key (0 for ground state, 1 for first excited state, etc.).
    For evolutions in temperature (LPTNs) the index corresponds to temperatures.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.

    use_collected_file : bool, optional
        If True, read data from the HDF5 observables file. If the file is not
        found, it falls back to reading the raw file structure.
        Default to True.
    """
    if use_collected_file:
        out_folder = self.eval_str_param(self.folder_name_output, params)
        collected = os.path.join(out_folder, "observables.h5")
        if not os.path.isfile(collected):
            logger.debug(
                "Processed file %s not found, falling back to raw files.", collected
            )
        else:
            with h5py.File(collected, "r") as handle:
                return read_hdf5_recursive(handle["static"])
    # Fallback: reconstruct the results from the raw file hierarchy.
    return self._get_static_obs_from_raw_files(params)
[docs]
def get_static_obs(
    self, params: dict, index: int = 0, use_collected_file: bool = True
) -> dict:
    """
    Return a dictionary with the observables of the static simulation
    for a specific state index (0 for ground state, >0 for excited states).
    For evolutions in temperature (LPTNs) the index corresponds to temperatures.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.

    index : int, optional
        The index of the state to return. 0 for ground state,
        1 for first excited state, etc. For LPTNs, the integer
        index corresponds to temperature.
        Default to 0.

    use_collected_file : bool, optional
        If True, read data from the HDF5 observables file. If the file is not
        found, it falls back to reading the raw file structure.
        Default to True.

    **Raises**

    ValueError : for a negative index.
    QTeaLeavesError : if the requested index is not present.
    """
    if index < 0:
        # Raise instead of assert: asserts are stripped under `python -O`.
        raise ValueError("Index must be non-negative.")
    all_obs = self.get_static_obs_all(
        params=params, use_collected_file=use_collected_file
    )
    if index not in all_obs:
        raise QTeaLeavesError(
            f"Requested static observable index {index} not found."
        )
    return all_obs[index]
[docs]
def get_dynamic_obs_all(
    self, params: dict, use_collected_file: bool = True
) -> dict:
    """
    Return dictionaries with all observables of the dynamic simulation.
    The first level of the dictionary is iterating over the quenches;
    the inner dictionaries collect data for each observable as time series.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.

    use_collected_file : bool, optional
        If True, read data from the HDF5 observables file. If the file is not
        found, it falls back to reading the raw file structure.
        Default to True.
    """
    if use_collected_file:
        out_folder = self.eval_str_param(self.folder_name_output, params)
        collected = os.path.join(out_folder, "observables.h5")
        if not os.path.isfile(collected):
            logger.debug(
                "Processed file %s not found, falling back to raw files.", collected
            )
        else:
            with h5py.File(collected, "r") as handle:
                return read_hdf5_recursive(handle["dynamic"])
    # Fallback: reconstruct the results from the raw file hierarchy.
    return self._get_dynamic_obs_from_raw_files(params)
[docs]
def get_dynamic_obs(
    self, params: dict, quench_index: int = 0, use_collected_file: bool = True
) -> dict:
    """
    Return a dictionary with the observables of the dynamic simulation
    for a specific quench index. The output dictionary collects data
    for each observable as time series.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.

    quench_index : int, optional
        The index of the quench to return.
        Default to 0.

    use_collected_file : bool, optional
        If True, read data from the HDF5 observables file. If the file is not
        found, it falls back to reading the raw file structure.
        Default to True.

    **Raises**

    ValueError : for a negative quench index.
    QTeaLeavesError : if the requested quench index is not present.
    """
    if quench_index < 0:
        # Raise instead of assert: asserts are stripped under `python -O`.
        raise ValueError("Quench index must be non-negative.")
    all_obs = self.get_dynamic_obs_all(
        params=params, use_collected_file=use_collected_file
    )
    if quench_index not in all_obs:
        raise QTeaLeavesError(
            f"Requested dynamic observable index {quench_index} not found."
        )
    return all_obs[quench_index]
[docs]
def collect_observables(
    self,
    params: dict,
    output_file: None | str = None,
    delete_raw_files: bool = False,
    forbid_overwrite: bool = True,
) -> None:
    """
    Collect all observables into a single HDF5 file.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.

    output_file : None or str, optional
        If None, the output file is named `observables.h5`
        in the output folder. If a string is provided, it
        is used as the output filename.
        Default to None.

    delete_raw_files : bool, optional
        If True, delete the raw files after collection.
        Default to False.

    forbid_overwrite : bool, optional
        If True, and the output file already exists,
        the collection is skipped.
        Default to True.
    """
    folder_name_output = self.eval_str_param(self.folder_name_output, params)
    if output_file is None:
        filename = os.path.join(folder_name_output, "observables.h5")
    else:
        # Raise instead of assert: asserts are stripped under `python -O`.
        if not isinstance(output_file, str):
            raise TypeError("output_file must be a string")
        filename = output_file
    # handle existing file case
    if os.path.isfile(filename):
        if forbid_overwrite:
            # Bug fix: the warning never interpolated the filename; the
            # message contained a literal "(unknown)" placeholder.
            warn(f"Output file {filename} already exists. Skipping collection.")
            return
        os.remove(filename)
    current_sim_hash = self.checkpoint_params_hash(params)
    static_obs = self.get_static_obs_all(params, use_collected_file=False)
    dynamic_obs = self.get_dynamic_obs_all(params, use_collected_file=False)
    with h5py.File(filename, "w") as f:
        # writing metadata
        f.create_dataset(
            "tea-version", data=[str(__version__)], dtype=h5py.string_dtype()
        )
        f.create_dataset("sim_hash", data=current_sim_hash)
        # add static and dynamic observables
        write_hdf5_recursive(f.create_group("static"), static_obs)
        write_hdf5_recursive(f.create_group("dynamic"), dynamic_obs)
    if delete_raw_files:
        # delete raw files in simulation folder
        for fname in os.listdir(folder_name_output):
            # delete only h5 files that are not the collected one
            if fname != "observables.h5" and fname.endswith(".h5"):
                os.remove(os.path.join(folder_name_output, fname))
    return
[docs]
def extend_params(self, params):
    """
    Extend the parameters list and if the number of quantum trajectories
    is > 1, add to params the trajectory id and seed.

    **Arguments**

    params : dict or list of dicts
        The parameter dictionary or dictionaries, which is required
        to obtain the output folder.
    """
    # Normalize a single dictionary into a one-element list.
    param_list = [params] if isinstance(params, dict) else params
    extended = []
    for entry in param_list:
        n_traj = self.observables.get_num_trajectories(params=entry)
        if n_traj == 1:
            extended.append(entry)
            continue
        # One copy per trajectory, each with its own id and seed.
        for jj in range(n_traj):
            traj_entry = deepcopy(entry)
            traj_entry["trajectory_id"] = self.observables.get_id_trajectories(jj)
            traj_entry["seed"] = self.observables.get_seed_trajectories(traj_entry)
            extended.append(traj_entry)
    return extended
[docs]
def get_ansatz_cls(self, params: dict) -> type[_AbstractTN]:
    """Get and check the ansatz class.

    **Arguments**

    params : simulation dictionary.

    **Returns**

    ansatz_cls (type[_AbstractTN]) : resolves the ansatz class and checks if it is
    consistent with the simulation, e.g., number of excited states. It returns the
    type itself, not an instance. (Also allowed to return StateVector
    for compatibility.)

    **Raises**

    TypeError, QTeaLeavesError, NotImplementedError
    """
    tn_type = self.eval_param(self.tn_type, params)
    if tn_type in TN_TYPE:
        # TTN, MPS, ATTN or TTO simulation; covers integer values except 0
        ansatz_cls = TN_TYPE[tn_type]
    elif tn_type == 0:
        # ED case is not covered in TN_TYPE, because that is only for
        # python TN backends
        ansatz_cls = StateVector
    elif isinstance(tn_type, type):
        # Custom classes defined by user from scratch, from our classes
        ansatz_cls = tn_type
        if issubclass(ansatz_cls, StateVector):
            # We want to allow this, although return type will be wrong
            pass
        elif not issubclass(ansatz_cls, _AbstractTN):
            raise TypeError("Custom class has to inherit from _AbstractTN.")
    elif tn_type in (tn_as_str := [elem.__name__ for elem in TN_TYPE.values()]):
        # Covers definitions via string
        ansatz_cls = list(TN_TYPE.values())[tn_as_str.index(tn_type)]
    else:
        raise QTeaLeavesError(f"The tn_type {tn_type} is not available.")
    # Run check on number of excited states immediately as well
    num_excited_states = params.get("num_excited_states", 0)
    if (num_excited_states > 0) and (ansatz_cls.projector_attr() is None):
        # Bug fix: the concatenated message read "notimplemented";
        # a separating space was missing.
        raise NotImplementedError(
            f"Found {num_excited_states=}. Excited state search is not "
            + f"implemented for {ansatz_cls.extension}."
        )
    return ansatz_cls
[docs]
def run(
    self,
    params,
    delete_existing_folder=False,
    nthreads=1,
    collect_observables=True,
):
    """
    Run a simulation or multiple simulations possibly via threading.

    **Arguments**

    params : dict or list of dicts
        The parameter dictionary or dictionaries, which is required
        to obtain the output folder.
        If using DenseMPO for simulation, it must be passed in the
        parameters dictionary with key "dense_mpo", adding also
        "exclude_from_hash": ["dense_mpo"] to avoid issues when writing
        to file.

    delete_existing_folder : bool, optional
        If flag is True, existing folder with potential results
        will be overwritten without warning. If False, an error
        is raised if the folders already exist.
        If `True`, `store_checkpoints` must be `False`.
        Default to False

    nthreads : int, optional
        If the number of threads is one, we launch serial
        simulations. If greater than one, multiple threads
        are started to run simulations in parallel.
        Default to 1.

    collect_observables : bool, optional
        If True, collect observables at the end of the run.
        The raw file hierarchy is not removed after collection.
        Default is True.

    **Details**

    We auto-select backends depending on the `tn_type` specified
    for the simulation.

    If you run the threaded version, set the OMP_NUM_THREADS
    and MKL_NUM_THREADS (executable compiled with ifort)
    accordingly. The number of threads defined here times
    the OMP_NUM_THREADS should not exceed the number of your
    processors.

    If running the TTN time evolution, workflow is the following:

    #1 Loading the initial TTN state :
    This can be either a random TTN, or the TTN read from a
    user-set file.

    #2 Groundstate search to set the initial condition :
    There are two options - initial condition is the groundstate
    of the specified Hamiltonian, or initial condition is a TTN
    from file set by a user.
    The groundstate is found by iterative search, therefore if
    the initial condition is set by user, put `max_iter=0` in
    convergence parameters. Otherwise, set `max_iter>0`.

    #3 Perform a time evolution starting from the obtained
    initial condition.
    """
    # create a new list for the parameter to add different
    # entries in the case of quantum trajectories
    params_ext = self.extend_params(params=params)
    # Check that we don't accidentally delete the current or
    # parent directory
    if delete_existing_folder:
        for elem in params_ext:
            folder_name_output = self.observables.get_folder_trajectories(
                self.folder_name_output, elem
            )
            # pylint: disable-next=no-member
            current_path = Path(os.getcwd())
            # pylint: disable-next=no-member
            output_path = Path(os.path.abspath(folder_name_output))
            out_folder_danger = []
            # True if out folder is the same as the current folder
            out_folder_danger.append(output_path == current_path)
            # True if out folder is a parent folder of the current folder
            out_folder_danger.append(output_path in current_path.parents)
            out_folder_is_dangerous = any(out_folder_danger)
            if out_folder_is_dangerous:
                raise ValueError(
                    "Preventing the deleting of current/parent folder. "
                    "If delete_existing_folder is True, input or output "
                    "folders cannot be set to the current folder or any "
                    "parent folder."
                )
    # Now we have a list
    if nthreads == 1:
        # Serial execution, one simulation after the other.
        for elem in params_ext:
            self.run_single(elem, delete_existing_folder=delete_existing_folder)
    else:
        self.run_threaded(
            params_ext,
            delete_existing_folder=delete_existing_folder,
            nthreads=nthreads,
        )
    if collect_observables:
        if isinstance(params, dict):
            # convert params into a list
            params = [params]
        # Collect per original (non-trajectory-extended) parameter set.
        for elem in params:
            self.collect_observables(elem, delete_raw_files=False)
    return
[docs]
def status(self, params):
    """
    Returns tuple of three integers containing the number of simulations
    not started, number of simulation started but not finished, and the
    number of finished simulations.

    **Arguments**

    params : dict or list of dicts
        The parameter dictionary or dictionaries, which is required
        to obtain the output folder.
    """
    # Extend the parameter list for quantum trajectories so that each
    # trajectory counts as one simulation.
    params_ext = self.extend_params(params=params)
    num_unstarted = 0
    num_interrupted = 0
    num_finished = 0
    for elem in params_ext:
        folder = self.observables.get_folder_trajectories(
            self.folder_name_output, elem
        )
        # Marker files decide the state; checked from "most finished"
        # to "least started".
        if os.path.isfile(os.path.join(folder, "has_finished.json")):
            num_finished += 1
        elif os.path.isfile(os.path.join(folder, "has_dyn_checkpoints.txt")):
            # Dynamics started, but did not finish
            num_interrupted += 1
        elif os.path.isfile(os.path.join(folder, "has_stat_checkpoints.txt")):
            # Statics started, but did not finish
            num_interrupted += 1
        else:
            # Did not reach first checkpoint of statics, consider it as
            # simulation which did not start yet
            num_unstarted += 1
    return num_unstarted, num_interrupted, num_finished
# pylint: disable-next=too-many-statements, too-many-branches
[docs]
def run_single(self, params, delete_existing_folder=False):
    """
    Run simulation specified via the parameter dictionary.

    **Arguments**

    params : dict
        The parameter dictionary, which is required to obtain
        the output folder.

    delete_existing_folder : bool, optional
        If flag is True, existing folder with potential results
        will be overwritten without warning. If False, an error
        is raised if the folders already exist.
        If `True`, `store_checkpoints` must be `False`.
        Default to False

    **Details**

    We auto-select backends depending on the `tn_type` specified
    for the simulation.

    * Exact diagonalization
    * Python tensor network backend
    """
    ansatz_cls = self.get_ansatz_cls(params)
    if issubclass(ansatz_cls, ATTN):
        # Validate the disentangler positions before launching the actual
        # simulation; the positions are evaluated in the run itself to
        # avoid potential overwriting when running multiple simulations.
        _ = self.check_disentangler_position(
            params, self.eval_numeric_param(self.disentangler, params)
        )
    self.write_input_single(params, delete_existing_folder)
    if issubclass(ansatz_cls, StateVector):
        # Exact-diagonalization backend.
        run_ed_simulation(self, params)
    else:
        # Python tensor-network backend; `get_ansatz_cls` already ensured
        # this is a valid subclass.
        run_tn_simulation(self, params)
    return
[docs]
def run_threaded(self, params, delete_existing_folder=False, nthreads=4):
    """
    Run simulation specified via the parameter dictionary.

    **Arguments**

    params : list
        List of the parameter dictionaries, which is required to obtain
        the output folder.

    delete_existing_folder : bool, optional
        If flag is True, existing folder with potential results
        will be overwritten without warning. If False, an error
        is raised if the folders already exist.
        If `True`, `store_checkpoints` must be `False`.
        Default to False

    nthreads : int, optional
        Number of threads to start multiple simulations in
        parallel.
        Default to 4.
    """
    if nthreads == 1:
        # Route back to serial code ...
        self.run(
            params, delete_existing_folder=delete_existing_folder, nthreads=nthreads
        )
        return
    # number of threads should not exceed length of parameter list
    nthreads = min(nthreads, len(params))
    # Fill list with the first nthreads simulations
    # NOTE: despite the name, separate processes (multiprocessing) are
    # launched, not threads.
    job_list = []
    for ii in range(nthreads):
        next_args = (self, params[ii], delete_existing_folder)
        next_simulation = mprc.Process(target=_run_single_thread, args=next_args)
        next_simulation.start()
        job_list.append(next_simulation)
    # Replace jobs whenever one finishes ...
    next_check = -1
    ii = nthreads
    while ii < len(params):
        # Round-robin over the job slots looking for a finished process.
        for jj in range(nthreads):
            next_check += 1
            if next_check == nthreads:
                next_check = 0
            if not job_list[next_check].is_alive():
                # Slot is free: launch the next pending simulation in it.
                next_args = (self, params[ii], delete_existing_folder)
                next_simulation = mprc.Process(
                    target=_run_single_thread, args=next_args
                )
                next_simulation.start()
                job_list[next_check] = next_simulation
                ii += 1
                break
            if jj + 1 == nthreads:
                # None of the jobs was finished; wait for 10s
                sleep(10)
    # All jobs launched ... now order does not matter for join
    for ii in range(nthreads):
        job_list[ii].join()
    return
[docs]
def autoselect_disentangler(self, params):
    """
    Given a number of sites and Hamiltonian, the function
    automatically chooses the positions for aTTN disentanglers.

    The function iterates through all possible pairs and puts the disentanglers
    according to the following:

    - we prioritize the disentanglers which support the links of the highest
      bond dimension, i.e. the ones in the highest layers
    - we place disentanglers to a position only if it fully lays on the MPO term
      (this has proved to be a successful tactics)
    - we skip DE positions which violate the restrictions in
      `check_disentangler_position()` function
    - we skip DE positions which connect the links whose dimensions cannot
      grow more than maximal bond dimension (lower layers of the tree)

    Parameters
    ----------
    params : dict
        Dictionary with the simulation parameters.

    Return
    ------
    auto_dis : np.array
        Disentangler positions, one row of two site indices per
        disentangler.
    """
    # Total number of sites in the system
    ll_1d_mapped = self.model.get_number_of_sites(params)
    lattice_size = self.model.eval_lvals(params)
    bond_dim = self.eval_numeric_param(
        self.convergence.sim_params["max_bond_dimension"], params
    )
    # Local Hilbert-space dimension of one site (assumes all sites
    # share the first local link's dimension — TODO confirm for
    # mixed-dimension models).
    local_dim = int(self.operators.get_local_links(ll_1d_mapped, params)[0])

    # The array in which the DE positions will be added
    auto_dis = []

    # Collect the site tuples of all non-local Hamiltonian terms;
    # used below to check if there is an interaction term at the DE.
    interactions = []
    for term in self.model.hterms:
        if not isinstance(term, LocalTerm):
            all_interactions = term.get_interactions(lattice_size, params)
            for single_interaction in all_interactions:
                # single_interaction[1] holds the 1d site coordinates
                # of one interaction term.
                interactions.append(single_interaction[1])

    # we iterate over all the site pairs to find the best DE positions
    # to support the highest bond dim links, first loop over pairs at
    # different halves, then quarters, etc.
    sites_in_group = ll_1d_mapped // 2

    # loop until we reach the smallest division
    # pylint: disable-next=too-many-nested-blocks
    for _ in range(int(np.log2(ll_1d_mapped)) - 1):
        # no disentangler needed if maximal possible dimension of
        # connecting link is smaller than the bond_dim
        max_dim = local_dim**sites_in_group
        if max_dim < bond_dim:
            break

        # how many groups we have for a certain division. For example, if
        # the division is in quarters, there will be 4 groups.
        num_groups = ll_1d_mapped // sites_in_group
        for group in range(0, num_groups, 2):
            # We iterate over combinations of pairs where first and second site
            # belong to different groups.
            # Priority is given to the disentanglers connected to the sites
            # which are physically closer, i.e. cover more entanglement.
            for ii in range(sites_in_group - 1, -1, -1):
                for jj in range(sites_in_group):
                    # find the indices of sites within the first and second group
                    site1 = group * sites_in_group + ii
                    site2 = (group + 1) * sites_in_group + jj

                    # these are the indices of the sites of new potential disentangler
                    de_position = [site1, site2]

                    # check if disentangler lays fully on an interaction term
                    count = 0
                    for interaction in interactions:
                        if all(de_site in interaction for de_site in de_position):
                            count += 1
                    # if not, skip this term
                    if count < 1:
                        continue

                    # append the new disentangler to the disentangler list
                    # and keep it only if it is on a valid position
                    # (validity is decided together with all previously
                    # accepted disentanglers, hence the append/pop pattern).
                    auto_dis.append(de_position)
                    try:
                        _ = self.check_disentangler_position(params, auto_dis)
                    except DisentanglerPositionException:
                        auto_dis.pop()

        # refine: halve the group size and consider the next-lower layer
        sites_in_group = sites_in_group // 2

    return np.array(auto_dis)
# pylint: disable-next=too-many-statements, too-many-branches
[docs]
def check_disentangler_position(self, params, disentangler_pos):
    """
    This function checks if the input disentangler positions are valid
    and returns them. If the input positions are None, returns
    the autoselected ones.

    Arguments
    ---------

    params: dict
        Dictionary with the simulation parameters.

    disentangler_pos : list/array or None
        Disentangler positions. If list/array, each row
        represents one disentangler. If None, the positions are
        autoselected based on the Hamiltonian.

    Return
    ------

    disentangler_pos : list/array
        Disentangler positions - if input was None then
        returns the autoselected positions, if input was list
        then returns the array version which is needed for
        aTTN algorithms. Otherwise returns the input disentangler
        positions.

    Raises
    ------

    DisentanglerPositionException
        If any of the validity checks on the given positions fails.
    """
    if disentangler_pos is None:
        # Autoselection already produces only valid positions, so no
        # further checks are required here.
        disentangler_pos = self.autoselect_disentangler(params)
        return disentangler_pos

    ll_1d_mapped = self.model.get_number_of_sites(params)
    lattice_size = self.model.eval_lvals(params)

    if not isinstance(disentangler_pos, np.ndarray):
        # if list, convert to array
        if isinstance(disentangler_pos, list):
            disentangler_pos = np.array(disentangler_pos)
        else:
            raise DisentanglerPositionException(
                "Disentangler positions must be a list or a numpy array, not "
                f"{type(disentangler_pos)}."
            )

    # Shape checks: one row of exactly two site indices per disentangler.
    if len(disentangler_pos.shape) != 2:
        raise DisentanglerPositionException(
            "Disentangler positions must be rank-2, but has a shape "
            f"{disentangler_pos.shape}."
        )
    if disentangler_pos.shape[1] != 2:
        raise DisentanglerPositionException(
            "Disentangler positions must be 2 columns, but has a shape "
            f"{disentangler_pos.shape}."
        )

    # Range checks: all site indices inside the 1d-mapped lattice.
    if np.min(disentangler_pos) < 0:
        raise DisentanglerPositionException(
            "All disentangler positions must be greater than -1."
        )
    if np.max(disentangler_pos) >= ll_1d_mapped:
        raise DisentanglerPositionException(
            "All disentangler positions must be smaller than system size."
        )

    # An even site and its direct odd neighbor presumably share one
    # lowest-layer tensor of the binary tree, so such a pair would
    # attach both disentangler legs to a single tensor — TODO confirm.
    if any(
        (np.min(dis) % 2 == 0) and (np.max(dis) - np.min(dis) == 1)
        for dis in disentangler_pos
    ):
        raise DisentanglerPositionException(
            "Disentangler cannot be attached to only one tensor."
        )
    if any(dis[0] == dis[1] for dis in disentangler_pos):
        raise DisentanglerPositionException(
            "Disentangler cannot be placed on only one site."
        )

    # Check if MPO is connected to different disentanglers:
    # Create array with num_sites elements, assign each disentangler a value
    # and put these values to the corresponding disentangler positions.
    # The values are 1,2,3, ..., num_disentangler
    lattice_array = np.zeros(ll_1d_mapped, dtype=int)
    ind_count = 1
    for dis in disentangler_pos:
        if any(lattice_array[dis] > 0):
            raise DisentanglerPositionException(
                "A single physical link cannot have more than one"
                " disentangler attached to it."
            )
        lattice_array[dis] += ind_count
        ind_count += 1

    # Iterate through interaction terms within Hamiltonian and check if
    # two-body interaction MPO is connected to two different disentanglers.
    # We check if MPO is connected to different disentanglers by checking
    # the values on lattice_array on these sites - from the definition of
    # lattice_array above, there are two different disentanglers if the
    # lattice sites have a different value>0
    # pylint: disable-next=too-many-nested-blocks
    for term in self.model.hterms:
        if not isinstance(term, LocalTerm):
            all_interactions = term.get_interactions(lattice_size, params)
            for single_interaction in all_interactions:
                # single_interaction[1] contains the 1d coordinates of each
                # two-body term on the lattice
                one_term = lattice_array[single_interaction[1]]
                if len(np.unique(one_term[one_term > 0])) > 1:
                    raise DisentanglerPositionException(
                        f"MPO on sites {single_interaction[1]} connected to "
                        "different disentanglers."
                    )

                # If there are no multiple disentanglers on a single interaction
                # term, check also for the cases when disentangler position yields
                # a 4-link MPO, which creates an error on fortran side. No
                # further loop over the disentanglers is necessary due to this
                # fact!
                # disentanglers on 4-body terms are the candidates
                if len(one_term) > 3 and np.count_nonzero(one_term) >= 1:
                    # reorder the sites in interaction term
                    order = np.argsort(single_interaction[1])
                    one_term = one_term[order]
                    single_int = np.array(single_interaction[1])[order]

                    # index of disentangler in `one_term`
                    term_dis_ind = np.nonzero(one_term)[0]
                    # index of the disentangler in `disentangler_pos`
                    # (values in lattice_array are 1-based)
                    ind = np.unique(one_term[term_dis_ind])[0] - 1
                    # access the position of that disentangler in TTN
                    dis = disentangler_pos[ind]

                    # One can show that the 4-link MPO will appear if MPO sites
                    # on one side of the disentangler are in both L and R environment.
                    # We check this by defining the array `env` of size of the 1D mapped lattice
                    # we will fill it with different values according to whether they
                    # belong to the left or right environment: if the site
                    # belongs to left environment, the value is 2 and if it belongs to
                    # right environment, the value is 3.

                    # this part of the code finds the position of L and R
                    # environment: shrink the division until the two
                    # disentangler sites fall into different blocks
                    div = int(ll_1d_mapped / 2)
                    while dis[0] // div == dis[1] // div:
                        div = int(div / 2)
                    # cut is the most left site in the right environment area
                    cut = (dis[1] // div) * div

                    # mark the regions of different environments with
                    # different values
                    env = (
                        np.ones(ll_1d_mapped, dtype=int) * 2
                    )  # first initialize all as L
                    env[cut : cut + div] = 3  # the section of R
                    # (the reason for such separation is the positioning of the anchor
                    # point in the algorithm)

                    # The disentangler is on a valid position only if it does
                    # not divide the left or right environment into two pieces.
                    # In terms of this check, the above criterion is satisfied
                    # only if the sites in the `env`` array on the left and
                    # right of the disentangler surely belong to L and R, i.e.
                    # when abs(env[left site] - env[right site]) = 1 [from the values
                    # of `env` array in the code above].

                    # take only the part of `env` where the interaction term is, i.e.,
                    # convert something like 00022330000 for the whole lattice to
                    # 0223 for the site of the MPO.
                    env_term = env[single_int]

                    # iterate over the disentangler sites of one disentangler which are
                    # on this term.
                    for de_ind in term_dis_ind:
                        # flag if in bulk with respect to whole MPO (not left /
                        # right environment).
                        disentangler_in_bulk = de_ind not in (0, len(env_term) - 1)

                        # If not in bulk with respect to the whole MPO, it must
                        # be valid (also on the edge of left/right environment)
                        if not disentangler_in_bulk:
                            continue

                        # Now we have for sure a disentangler site in the bulk
                        # Does it cut any possible left/right environment into two?
                        # The sequences like 0223 are only one iff we are at the
                        # boundary between 2 and 3 which is a valid position.
                        if abs(env_term[de_ind - 1] - env_term[de_ind + 1]) != 1:
                            # NOTE(review): "4-linkID" in the message looks
                            # like a typo for "4-link" — string kept as-is.
                            raise DisentanglerPositionException(
                                "Disentangler configuration resulting in a 4-linkID MPO"
                            )

    return disentangler_pos
[docs]
class ATTNSimulation(QuantumGreenTeaSimulation):
    """
    Deprecated alias kept for backwards compatibility; use
    :class:`QuantumGreenTeaSimulation` instead.
    """

    def __init__(self, *args, **kwargs):
        warn("Deprecated ATTNSimulation; switch to QuantumGreenTeaSimulation.")
        # The old input-processor argument is no longer accepted by the
        # parent constructor; strip it here and restore it afterwards.
        version_input_processor = kwargs.pop("version_input_processor", None)
        if version_input_processor is not None:
            warn("Deprecated argument input processor.")
        super().__init__(*args, **kwargs)
        if version_input_processor is not None:
            self.version_input_processor = version_input_processor
def _run_single_thread(obj, params, delete_existing_folder):
"""
Internal function to launch a single thread of a threaded simulation.
**Arguments**
obj : instance of :class:``ATTNSimulation``.
Simulation class to be run.
params : dict
The parameter dictionary, which is required to obtain
the output folder.
delete_existing_folder : bool, optional
If flag is True, existing folder with potential results
will be overwritten without warning. If False, an error
is raised if the folders already exist.
If `True`, `store_checkpoints` must be `False`.
Default to False
"""
obj.run_single(params, delete_existing_folder=delete_existing_folder)
return
[docs]
class DynamicsQuench(dict):
"""
Describing a quench of parameters for the dynamics part of
a TN simulation. The parameters must be defined as a function
taking the time and the simulation dictionary as arguments.
**Arguments**
dt_grid : numpy.ndarray, str, callable
Contains the list of time steps, which can differ. Moreover,
it can be parameterized and be a callable or string.
time_evolution_mode : int, optional
Defines the time evolution mode.
0 : automatic selector
(Defaults to 1 for tensor networks.
For exact diagonalization,
defaults to 10 for systems smaller than 10 sites and 11 for others.)
1 : one-tensor TDVP
2 : two-tensor TDVP
3 : two-tensor TDVP (2nd order)
4 : single-tensor TDVP (2nd order)
5 : 9 steps of one-site and one step of two-site TDVP
10 : exact diagonalization with full Hamiltonian matrix generated
11 : exact diagonalization with Krylov
Not every network type might support all time evolution modes.
Default to 0.
measurement_period : int, str, callable, optional
Allows to define a measurement period every n time steps. It
considers only the time steps, not the time passed. There
is always a measurement at the end of the quench.
Default to 1.
oqs_mode : int, optional
Controls the open quantum system evolution, where no open
quantum is 0, quantum trajectories with jumps is 1,
quantum trajectories with norm only is 2, and density matrix
evolution (Lindblad master eq.) is 3.
Default to 0.
check_superfluous : bool, optional
If ``True``, a warning is raised whenever a dictionary key is
not in the required parameters.
Default to True
**Details**
Open systems implement the Lindblad equation as
.. math::
\\frac{d}{dt} \\rho = -i [H, \\rho]
+ \\sum \\gamma (L \\rho L^{\\dagger}
- \\frac{1}{2} \\{ L^{\\dagger} L, \\rho \\})
Complex time evolution is enabled if you pass the time step as complex dt
in the native python data `complex` to the dt-grid. Negative imaginary part
will lead to cooling, positive imaginary part results in heating and raises
a warning. Complex time evolution cannot be combined with open quantum systems.
The state will be automatically renormalized.
"""
# pylint: disable-next=too-many-arguments
def __init__(
    self,
    dt_grid,
    time_evolution_mode=0,
    measurement_period=1,
    oqs_mode=0,
    check_superfluous=True,
):
    """
    Store the quench settings; see the class docstring for the
    meaning of each argument. The dict part of this class holds the
    time-dependent parameter functions.
    """
    super().__init__()
    # Grid of time steps: ndarray, a string key into the parameter
    # dictionary, or a callable (resolved lazily via eval_numeric_param).
    self.dt_grid = dt_grid
    # Integer selector for the evolution algorithm (0 = automatic).
    self.time_evolution_mode = time_evolution_mode
    # Measure every n-th time step; int, string key, or callable.
    self.measurement_period = measurement_period
    # Open-quantum-system mode: 0 closed, 1/2 trajectories, 3 Lindblad.
    self.oqs_mode = oqs_mode
    # If True, warn about dict keys not listed in the required parameters.
    self.check_superfluous = check_superfluous
def __len__(self):
"""
Overload length, now defined via number of time steps.
"""
if isinstance(self.dt_grid, str) or hasattr(self.dt_grid, "__call__"):
raise QTeaLeavesError("Cannot support length for non-ndarray.")
return len(self.dt_grid)
[docs]
def get_length(self, params):
"""
Calculate the length of the time grid even for parameterized
grids.
**Arguments**
params : dict
The parameter dictionary, which will be passed to callables
and used to evaluate string parameters.
"""
dt_grid = self.eval_numeric_param(self.dt_grid, params)
return len(dt_grid)
[docs]
def get_dt_grid(self, params):
"""
Evaluate the dt grid and return it.
**Arguments**
params : dict
The parameter dictionary, which will be passed to callables
and used to evaluate string parameters.
"""
return self.eval_numeric_param(self.dt_grid, params)
[docs]
def eval_numeric_param(self, elem, params):
"""
Evaluate a numeric parameter which might be defined via the
parameter dictionary.
**Arguments**
elem : callable, string, or int/float
Defines the parameter either via a function which return
the value, a string being an entry in the parameter
dictionary, or directly as the numeric value.
params : dict
The parameter dictionary, which will be passed to callables
and used to evaluate string parameters.
"""
if isinstance(elem, list):
return [self.eval_numeric_param(subelem, params) for subelem in elem]
if hasattr(elem, "__call__"):
val = elem(params)
elif isinstance(elem, str):
val = params[elem]
else:
val = elem
return val
[docs]
def iter_params(self, required_params, params, time_at_start=0.0):
"""
Iterate over the all time steps and the required parameters
within. Function returns the time-dependent parameters
evaluated at mid-timestep. Moreover, we return the
time-dependent parameters at the end of each time step
to allow for measurements.
**Arguments**
required_params : OrderedDict
Keys are the required parameters, values are the
default values if no function is present.
params : dict
Simulation parameter dictionary.
time_at_start : float, optional
Time at the beginning of the quench, which allows
to take into account previous quenches.
Default to 0.0
"""
dt_grid = self.eval_numeric_param(self.dt_grid, params)
tnow = time_at_start
if self.check_superfluous:
for key in self:
if key not in required_params:
warn("Detected superfluous function `%s` in quench." % (key))
for dt_ii in dt_grid:
tmid = tnow + 0.5 * dt_ii
for key in required_params.keys():
if key in self:
yield self[key](tmid, params)
else:
yield required_params[key]
tnow += dt_ii
for key in required_params.keys():
if key in self:
yield self[key](tnow, params)
else:
yield required_params[key]
[docs]
def iter_params_dts(self, params):
"""
Iterate over parameters's time steps. Coupling with a time-step dt
greater than zero, are intended for the time evolution. Every second
parameter is 0 or -99 and indicating the values are at the end point
for the measurements, i.e., doing measurements (zero) or doing
no measurement (-99), respectively. Up to now, no other values
less or equal to zero are associated with some special meaning.
**Arguments**
params : dict
Simulation parameter dictionary.
"""
dt_grid = self.eval_numeric_param(self.dt_grid, params)
meas_period = self.eval_numeric_param(self.measurement_period, params)
for ii, dt in enumerate(dt_grid[:-1]):
yield dt
if ((ii + 1) % meas_period) == 0:
yield 0.0
else:
yield -99.0
yield dt_grid[-1]
yield 0.0
[docs]
def iter_dts(self):
"""
Generator yielding all the time steps dt within
this quench. It returns the delta, not the current
time in total.
"""
if isinstance(self.dt_grid, str) or hasattr(self.dt_grid, "__call__"):
raise QTeaLeavesError("Cannot support iter_dts for non-ndarray.")
yield from self.dt_grid