Source code for qtealeaves.observables.tnobservables

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Observable to recap all the observables that can be measured by the TN
"""

import os
import re
import warnings
from collections import OrderedDict
from collections.abc import Iterator, Sequence
from copy import deepcopy
from pathlib import Path
from typing import Any, Self, TypeAlias

import h5py
import numpy as np

from qtealeaves import __version__
from qtealeaves.abstracttns.abstract_tn import _AbstractTN
from qtealeaves.tooling import QTeaLeavesError
from qtealeaves.tooling.parameterized import _ParameterizedClass

from .bond_entropy import BondEntropy
from .correlation import Correlation
from .custom_correlation import CustomCorrelation
from .custom_function_obs import CustomFunction
from .generic_mpo import GenericMPO
from .local import Local
from .log_negativity import LogNegativity
from .overlap import Overlap
from .probabilities import Probabilities
from .projective import Projective
from .state2file import State2File
from .tensor_product import TensorProduct
from .timecorrelator import TimeCorr
from .tnobase import _TNObsBase
from .weighted_sum import WeightedSum

__all__ = ["TNObservables"]

PathLike: TypeAlias = str | bytes | os.PathLike


[docs] class TNObservables(_ParameterizedClass): """ Organization of all the measurements to be taken during a tensor network simulation. To add new observables in the container you should simply use the :code:`+=` operator as shown in the example. **Example** .. code-block:: python obs = TNObservables() obs += any_other_observable **Arguments** filename_observables : str Base filename of the definition with the observables inside the input folder and observables subfolder. A postfix might be appended upon need. The file extension will be chosen by the backend. folder_observables : str Subfolder for the observable input files inside the input folder. num_trajectories : int Total number of quantum trajectories. **Details** Up to now, the class organizes and accepts observables of the type :class:`Local`, :class:`Correlation`, :class:`Overlap`, :class:`State2File`, :class:`TensorProduct`, :class:`WeightedSum`, :class:`Projective`, :class:`Probabilities`, :class:`BondEntropy`, :class:`CustomCorrelation`, :class:`GenericMPO`, :class:`LogNegativity`, :class:`CustomFunction` """ def __init__( self, filename_observables: PathLike = "observables.in", folder_observables: PathLike = "observables", num_trajectories: int = 1, do_write_hdf5: bool = False, ) -> None: self.results_buffer: dict[str, Any] = {} self.filename_observables = filename_observables self.folder_observables = folder_observables self.num_trajectories = num_trajectories self.do_write_hdf5 = do_write_hdf5 self.obs_list: OrderedDict[str, _TNObsBase] = OrderedDict() elems: Sequence[type[_TNObsBase]] = [ Local, Correlation, Overlap, State2File, TensorProduct, WeightedSum, Projective, Probabilities, BondEntropy, TimeCorr, GenericMPO, CustomCorrelation, LogNegativity, CustomFunction, ] for elem in elems: obj = elem.empty() self.obs_list[repr(obj)] = obj
[docs] def get_num_trajectories(self, **kwargs: Any) -> int: """ Get number of quantum trajectories **Arguments** params : keyword argument A dictionary with parameters is accepted as a keyword argument. """ params = kwargs.get("params", {}) num_trajectories = self.eval_numeric_param(self.num_trajectories, params) # the first entry of the seed list should be <4096 if num_trajectories > 4095: raise QTeaLeavesError("Number of trajectory over the limit for the seed.") return num_trajectories # type: ignore[no-any-return]
[docs] def add_observable(self, obj: _TNObsBase) -> None: """ Add a specific observable to the measurements. The class of the observable added must match the predefined observables inside this class. """ if repr(obj) not in self.obs_list: raise QTeaLeavesError(f"Observables not of valid type: {repr(obj)}") self.obs_list[repr(obj)] += obj
def __iadd__(self, obj: Any) -> Self: """ Overwrite operator ``+=`` to simplify syntax. """ self.add_observable(obj) return self
[docs] @staticmethod def add_trajectories(all_results: dict, new: dict[str, Any]) -> dict: """ Add the observables for different quantum trajectories. **Arguments** all_results : dict Dictionary with observables. new : dict Dictionary with new observables to add to all_results. """ for name in ["energy", "norm"]: if name not in all_results: all_results[name] = new[name] else: all_results[name] += new[name] # `time` is the time of the quench, not the CPU time # Must be the same for all simulations if dynamics # Do not add, just pick if it is present if ("time" not in all_results) and ("time" in new): all_results["time"] = new["time"] return all_results
[docs] @staticmethod def avg_trajectories(all_results: dict, num_trajectories: int) -> dict: """ Get the average of quantum trajectories observables. **Arguments** all_results : dict Dictionary with observables. num_trajectories : int Total number of quantum trajectories. """ # skip time, already not added up since it is the time # in the simulation (must be same). for name in ["energy", "norm"]: all_results[name] /= num_trajectories return all_results
[docs] def collect_operators(self) -> list[tuple[str, str]]: """ Collect all operators with some additional information. This function is just collecting the information from all observables stored within this class. """ op_lst = [] for elem in self.obs_list: for op_tuple in self.obs_list[elem].collect_operators(): op_lst.append(op_tuple) return list(set(op_lst))
[docs] def write_results( self, filename: PathLike, params: dict[str, Any], state_ansatz: type[_AbstractTN], ) -> None: """ Write the complete measurement HDF5 file. The assumption is that the results buffer of each measurement was set with all the required results. **Arguments** filename : PathLike Target location of the file. params : dict Simulation parameters. state_ansatz : type[_AbstractTN] Label identifying the state ansatz currently in use. """ # We probably need params in the future - avoid unused argument error _ = len(params) filename = str(filename) with h5py.File(filename, "w") as fh: fh.create_dataset( "tea-version", data=[str(__version__)], dtype=h5py.string_dtype() ) # Energy, norm and runtime energy = self.results_buffer["energy"] if hasattr(energy, "__len__"): energy = energy[-1] # Resolve list via last element fh.create_dataset("energy", data=energy) fh.create_dataset("norm", data=self.results_buffer["norm"]) if "time" in self.results_buffer: fh.create_dataset("time", data=self.results_buffer["time"]) else: # nan is replacing -999e300, ignore type arg since h5py-stubs is wrong fh.create_dataset("time", data=np.nan) # type: ignore[arg-type] # reset results buffer self.results_buffer = {} for elem in self.obs_list: self.obs_list[elem].write_results(fh, state_ansatz=state_ansatz)
[docs] @staticmethod def read_cpu_time_from_log( filename_result: str, params: dict[str, Any] ) -> Iterator[tuple[str, float]]: """ Read the CPU time if it can be resolved via the log file. **Arguments** filename_result : str Filename to the output file including the path. This filename can vary for statics and dynamics and therefore has to be passed. Function assumes that the log-file is stored next to it. **Returns** Iterator returns key, value pair if available. """ regex = re.compile(r"CPU time:?\s*([\d\.]+)") log_file = Path(filename_result).parent / params.get("log_file", "sim.log") if not log_file.is_file(): return with log_file.open() as log_file: for line in reversed(log_file.readlines()): match = regex.search(line) if match: yield "cpu_time", float(match.group(1)) break
# pylint: disable-next=too-many-locals
[docs] def read_file(self, filename: str, params: dict[str, Any]) -> dict[str, Any]: """ Read all the results from a single file and store them in a dictionary. **Arguments** filename : str Filename to the output file including the path. This filename can vary for statics and dynamics and therefore has to be passed. params : dict The parameter dictionary, which is required to obtain the output folder. """ results = {} fh = h5py.File(filename, "r") # check code version, not fatal, just a warning # pylint: disable-next=no-member file_version = fh["tea-version"][0].decode("utf-8") if file_version != str(__version__): warnings.warn( f"version mismatch between file and code {file_version} vs {__version__}" ) energy = fh["energy"][()] results["energy"] = float(energy) norm = fh["norm"][()] results["norm"] = float(norm) time = fh["time"][()] results["time"] = time # can be float or complex for key, value in self.read_cpu_time_from_log(filename, params): results[key] = value for elem in self.obs_list: for key, value in self.obs_list[elem].read(fh, params=params): results[key] = value return results
[docs] def read(self, filename: str, folder: str, params: dict[str, Any]) -> dict: """ Read all the results and store them in a dictionary. **Arguments** filename : str Filename of the output file (not including the path). This filename can vary for statics and dynamics and therefore has to be passed. folder : str Folder of the output file. params : dict The parameter dictionary, which is required to obtain the output folder. """ num_trajectories = self.get_num_trajectories(params=params) if num_trajectories == 1: # no quantum trajectories folder_name_output = self.eval_str_param(folder, params) # get results # pylint: disable-next=no-member full_file_path = os.path.join(folder_name_output, filename) results = self.read_file(full_file_path, params) return results # read results for quantum trajectories results_qt = {} for ii in range(num_trajectories): # add trajectory id into params tmp = deepcopy(params) tmp["trajectory_id"] = self.get_id_trajectories(ii) tmp["seed"] = self.get_seed_trajectories(tmp) folder_name_output = self.get_folder_trajectories(folder, tmp) # get results for each quantum trajectory # pylint: disable-next=no-member full_file_path = os.path.join(folder_name_output, filename) name_ii = ("trajectory", tmp["trajectory_id"]) results_qt[name_ii] = self.read_file(full_file_path, params) results_qt[name_ii]["seed"] = tmp["seed"] # add the observables results_qt = self.add_trajectories(results_qt, results_qt[name_ii]) for elem in self.obs_list: results_qt = self.obs_list[elem].add_trajectories( results_qt, results_qt[name_ii] ) # compute the average of the observables results_qt = self.avg_trajectories(results_qt, num_trajectories) for elem in self.obs_list: results_qt = self.obs_list[elem].avg_trajectories( results_qt, num_trajectories ) return results_qt
[docs] @staticmethod def get_id_trajectories(val: int) -> int: """ Get the id for the given quantum trajectory. **Arguments** val : int value for id of the quantum trajectory. """ return val
[docs] @staticmethod def get_seed_trajectories( params: dict[str, Any], ) -> list[int]: """ Get the seed for the given quantum trajectory. **Arguments** params : dict The parameter dictionary, which is required to obtain the output folder. **Returns** seed : list The seed for the simulation has 4 entries. The following rules applies: - entries must be smaller than 4096 - entries must be bigger than 0 - last entry must be odd """ if "seed" in params: raise QTeaLeavesError("The seed cannot be specified by the user.") seed = [params["trajectory_id"] + 1, 13, 17, 19] return seed
[docs] def get_folder_trajectories(self, folder_name: str, params: dict[str, Any]) -> str: """ Evaluate the folder name and add the trajectory id if running trajectories. **Arguments** folder_name : str Name of the input/output folder for the simulation. params : dict The parameter dictionary, which is required to obtain the output folder. **Returns** folder_name : str Modified folder name with the trajectory id. """ num_trajectories = self.get_num_trajectories(params=params) folder_name = self.eval_str_param(folder_name, params) if num_trajectories != 1: folder_name = folder_name + "/qt_" + str(params["trajectory_id"]) return folder_name