Source code for qtealeaves.abstracttns.abstract_tn

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
The module contains an abstract tensor network, from which other tensor
networks can be derived.
"""

# pylint: disable=too-many-lines, too-many-statements, too-many-branches, too-many-locals, too-many-arguments

import abc
import inspect
import json
import logging
import os
import pickle
from collections.abc import Iterable, Iterator
from copy import deepcopy
from functools import partial
from time import time as tictoc
from typing import Any, Callable, Self, Sequence, TypeAlias, TypeVar

import mpmath as mp  # type: ignore[import-untyped]
import numpy as np

from qtealeaves.convergence_parameters import TNConvergenceParameters
from qtealeaves.emulator.unitariesprojmeas import UnitarySetupProjMeas
from qtealeaves.mpos.abstracteffop import (
    _AbstractEffectiveMpo,
    _AbstractEffectiveOperators,
    _AbstractEffectiveProjector,
)
from qtealeaves.solvers import KrylovInfo
from qtealeaves.tensors import TensorBackend
from qtealeaves.tensors.abstracttensor import (
    _AbstractDataMover,
    _AbstractQteaBaseTensor,
    _AbstractQteaTensor,
)
from qtealeaves.tooling import QteaJsonEncoder, QTeaLeavesError, QTeaOPESError
from qtealeaves.tooling.mpisupport import MPI, TN_MPI_TYPES
from qtealeaves.tooling.type_alias import TnPos

__all__ = [
    "postprocess_statedict",
    "MPI",
    "TN_MPI_TYPES",
]

logger = logging.getLogger(__name__)

# Cannot import them here, but improves clarity a bit
MPS: TypeAlias = "_AbstractTN"  # pylint: disable=invalid-name

T = TypeVar("T")


# def default_optional(value: Optional[T], default: T) -> T:
def default_optional(value: T | None, default: T) -> T:
    """Helper to collapse an optional argument to its actual type."""
    if value is None:
        return default
    return value


def resolve_scalar_list(value: T | Sequence[T]) -> Sequence[T]:
    """Helper to resolve a scalar and a list for typing."""
    if np.isscalar(value):
        return [value]  # type: ignore[list-item]

    return value  # type: ignore[return-value]


# pylint: disable-next=dangerous-default-value
def logger_warning(*args: Any, storage: list = []) -> None:
    """Workaround to display warnings only once in logger."""
    if args in storage:
        return

    storage.append(args)
    logger.warning(*args)


# pylint: disable-next=too-many-public-methods, too-many-instance-attributes
[docs] class _AbstractTN(abc.ABC): """ Abstract tensor network class with methods applicable to any tensor network. Parameters ---------- num_sites: int Number of sites convergence_parameters: :py:class:`TNConvergenceParameters` Class for handling convergence parameters. In particular, in the python TN simulator, we are interested in: - the *maximum bond dimension* :math:`\\chi`; - the *cut ratio* :math:`\\epsilon` after which the singular values are neglected, i.e. if :math:`\\lamda_1` is the bigger singular values then after an SVD we neglect all the singular values such that :math:`\\frac{\\lambda_i}{\\lambda_1}\\leq\\epsilon` local_dim: int, optional Local dimension of the degrees of freedom. Default to 2. requires_singvals : boolean, optional Allows to enforce SVD to have singular values on each link available which might be useful for measurements, e.g., bond entropy (the alternative is traversing the whole TN again to get the bond entropy on each link with an SVD). tensor_backend : `None` or instance of :class:`TensorBackend` Default for `None` is :class:`QteaTensor` with np.complex128 on CPU. """ extension = "tn" has_de = False skip_tto = False def __init__( self, num_sites: int, convergence_parameters: TNConvergenceParameters, local_dim: int = 2, requires_singvals: bool = False, tensor_backend: TensorBackend | None = None, ) -> None: # Class attributes by arguments self._num_sites = num_sites self._local_dim = ( [local_dim] * num_sites if np.isscalar(local_dim) else local_dim ) self._convergence_parameters = convergence_parameters self._tensor_backend = default_optional(tensor_backend, TensorBackend()) # Other class attributes self._iso_center = None self.eff_op: _AbstractEffectiveOperators = None # type: ignore[assignment] self.eff_proj: list[_AbstractEffectiveProjector] = [] # internal storage for last energy measurement for algorithms which # need an estimate where it is sufficient to rely that this number # is not too outdated self._prev_energy: float = None # type: ignore[assignment] # Selection of QR vs SVD self._requires_singvals = requires_singvals # store solver to be used self._solver: type = None # type: ignore[assignment] # Attributes for MPI self.comm = None # ML wants to use iso_towards without QR/SVD (which is one super-special # use case of a binary label (or rounding approach) + single-tensor update. self._decomposition_free_iso_towards = False # Run checks on input # ------------------- if not isinstance(convergence_parameters, TNConvergenceParameters): raise TypeError( "Convergence parameters must be TNConvergenceParameters class." ) if not isinstance(self._tensor_backend, TensorBackend): raise TypeError( f"Passed wrong type {type(self._tensor_backend)} to backend." ) if self._convergence_parameters.max_bond_dimension < 1: raise ValueError("The minimum bond dimension for a product state is 1.") if self._convergence_parameters.cut_ratio < 0: raise ValueError("The cut_ratio value must be positive.") # For definition in init here, it must be a list if len(self.local_dim) != num_sites: # type: ignore[arg-type] raise ValueError( f"Length of local_dim {len(local_dim)} " # type: ignore[arg-type] f"differs from num_sites {num_sites}." ) if np.min(self.local_dim) < 2: raise ValueError("Local dimension cannot be smaller than 2.") # internal variable for flex-TDVP self._timestep_mode_5_counter = 0 # cache for local density matrices self._cache_rho: dict[int, _AbstractQteaTensor] = {} # MPI initialization self._initialize_mpi() # -------------------------------------------------------------------------- # Properties # -------------------------------------------------------------------------- @property def convergence_parameters(self) -> TNConvergenceParameters: """Get the convergence settings from the TN.""" return self._convergence_parameters @convergence_parameters.setter def convergence_parameters(self, value: TNConvergenceParameters) -> None: """ Set the convergence settings from the TN. (no immediate effect, only in next steps). """ self._convergence_parameters = value @property def data_mover(self) -> _AbstractDataMover: """Get the data mover od the tensor.""" return self._tensor_backend.datamover @property @abc.abstractmethod def default_iso_pos(self) -> TnPos: """ Returns default isometry center position, e.g., for initialization of effective operators. """ @property def device(self) -> str: """Device where the tensor is stored.""" return self._tensor_backend.device @property def memory_device(self) -> str: """Return the memory device stored in the tensor backend.""" return self.tensor_backend.memory_device @property def computational_device(self) -> str: """Return the computational device stored in the tensor backend.""" return self.tensor_backend.computational_device @property def dtype(self) -> Any: """Data type of the underlying arrays.""" for tensor in self._iter_tensors(): return tensor.dtype return None
[docs] def dtype_to_char(self) -> str: """ Translate current data type of the tensor back to char C, D, S, Z, H, or I. """ for tensor in self._iter_tensors(): return tensor.dtype_to_char() raise QTeaLeavesError("Empty tensor network.")
@property def dtype_eps(self) -> float: """Data type's machine precision of the underlying arrays.""" for tensor in self._iter_tensors(): return tensor.dtype_eps raise QTeaLeavesError("Empty tensor network.") @property def iso_center(self) -> TnPos | None: """Isometry center of the tensor network (int|tuple|None)""" return self._iso_center @iso_center.setter def iso_center(self, value: TnPos | None) -> None: """Change the value of the iso center""" if isinstance(value, (int, tuple)): self._iso_center = value # type: ignore[assignment] elif value is None: self._iso_center = None else: self._iso_center = tuple(value) @property def iso_pos(self) -> TnPos: """Returns the position of the iso center, but throws error if None.""" if self.iso_center is None: raise QTeaLeavesError("TN ansatz is not isometrized.") return self.iso_center @property def has_symmetry(self) -> bool: """Check if TN is built out of symmetric tensors.""" for tensor in self._iter_tensors(): return tensor.has_symmetry raise QTeaLeavesError("Empty tensor network.") @property def num_sites(self) -> int: """Number of sites property.""" return self._num_sites @property def local_dim(self) -> Sequence[int]: """Local dimension property""" if isinstance(self._local_dim, int): # Constant Hilbert space return [self._local_dim] * self.num_sites if isinstance(self._local_dim, np.ndarray): # Potentially different Hilbert spaces via numpy array return self._local_dim if isinstance(self._local_dim[0], (int, np.int64, np.int32)): # Potentially different Hilbert spaces via list, detect # cases without symmetry by checking first entry for int return self._local_dim # type: ignore[return-value] # Case for symmetries return [elem.shape for elem in self._local_dim] # type: ignore[misc,union-attr] @property def local_links(self) -> Any: """Return information on local link (for symmetries more than integer).""" return self._local_dim @property def solver(self) -> type | None: """Return current solver for the TN.""" return self._solver @solver.setter def solver(self, value: type) -> None: """Set the solver, e.g., currently used for exp(-i H dt) |psi>.""" self._solver = value @property def tensor_backend(self) -> TensorBackend: """Return tensor backend stored for this TN-ansatz.""" return self._tensor_backend
[docs] @staticmethod def tn_mpi_types() -> dict[str, Any]: """Provide convenient access to the `TN_MPI_TYPES` for TN ansaetze.""" return TN_MPI_TYPES
# -------------------------------------------------------------------------- # Overwritten operators # -------------------------------------------------------------------------- def __repr__(self) -> str: """ Return the class name as representation. """ return self.__class__.__name__ @abc.abstractmethod def __getitem__(self, key: TnPos) -> _AbstractQteaTensor: """Overwriting the getitem, you can access tensors in the TN by their position. Parameters ---------- key : TnPos Position in the ansatz of which the tensor should be returned. """ def __len__(self) -> int: """ Provide number of sites in the TN. """ return self.num_sites @abc.abstractmethod def __setitem__(self, key: TnPos, value: _AbstractQteaTensor) -> None: """Set a tensor in the TN by its position. Parameters ---------- key : TnPos Position in the ansatz of which the tensor should be set. value : _AbstractQteaTensor New tensor to be set at the given position. """ # -------------------------------------------------------------------------- # classmethod, classmethod like # --------------------------------------------------------------------------
[docs] @classmethod @abc.abstractmethod def from_statevector( cls, statevector: np.ndarray, local_dim: int = 2, conv_params: TNConvergenceParameters | None = None, tensor_backend: TensorBackend | None = None, ) -> Self: """Decompose statevector to tensor network of chosen type."""
[docs] @classmethod @abc.abstractmethod def from_density_matrix( cls, rho: np.ndarray, n_sites: int, dim: int, conv_params: TNConvergenceParameters, tensor_backend: TensorBackend | None = None, prob: bool = False, ) -> Self: """Decompose density matrix to tensor network of chosen type."""
[docs] @classmethod @abc.abstractmethod def from_lptn( cls, lptn: Self, conv_params: TNConvergenceParameters | None = None, **kwargs: Any, ) -> Self: """Converts LPTN to tensor network of chosen type."""
[docs] @classmethod @abc.abstractmethod def from_mps( cls, mps: Self, conv_params: TNConvergenceParameters | None = None, **kwargs: Any, ) -> Self: """Converts MPS to tensor network of chosen type."""
[docs] @classmethod @abc.abstractmethod def from_ttn( cls, ttn: Self, conv_params: TNConvergenceParameters | None = None, **kwargs: Any, ) -> Self: """Converts TTN to tensor network of chosen type."""
[docs] @classmethod @abc.abstractmethod def from_tto( cls, tto: Self, conv_params: TNConvergenceParameters | None = None, **kwargs: Any, ) -> Self: """Converts TTO to tensor network of chosen type."""
# pylint: disable=inconsistent-return-statements
[docs] @classmethod def from_x( cls, obj: Self, conv_params: TNConvergenceParameters | None = None, **kwargs: Any, ) -> Self: """ Converts tensor network of type `x` to chosen type. Parameters ---------- obj: :py:class:`_AbstractTN` object to convert to chosen type. conv_params: :py:class:`TNConvergenceParameters`, optional Input for handling convergence parameters Default is None. Return ------ new_obj: chosen type decomposition of obj. If truncations are part of the underlying result, they are skipped. """ if not isinstance(obj, _AbstractTN): raise TypeError( f"The input is not an abstract TN, but got type {type(obj)}." ) from_x_dict: dict[str, Callable[[Any], Self | tuple]] = { "mps": cls.from_mps, "lptn": cls.from_lptn, "ttn": cls.from_ttn, "tto": cls.from_tto, } if obj.extension not in from_x_dict: raise TypeError( f"Input is of {type(obj)} and cannot be converted to {cls}." ) result = from_x_dict[obj.extension](obj, conv_params, **kwargs) # type: ignore[call-arg] if isinstance(result, cls): return result # This catches probably some TTO cases returning a tuple # of the ansatz and the truncation for elem in result: # type: ignore[union-attr] if isinstance(elem, cls): return elem raise QTeaLeavesError("Method did not encounter _AbstractTN to return.")
# pylint: enable=inconsistent-return-statements
[docs] @classmethod @abc.abstractmethod def product_state_from_local_states( cls, mat: np.ndarray | list[np.ndarray], padding: Sequence | None = None, convergence_parameters: TNConvergenceParameters | None = None, tensor_backend: TensorBackend | None = None, ) -> Self: """Construct a product (separable) state in a suitable tensor network form, given the local states of each of the sites."""
[docs] @classmethod def read_pickle( cls, filename: str, tensor_backend: TensorBackend | None = None, convergence_parameters: TNConvergenceParameters | None = None, ) -> Self: """ Read via pickle-module. filename : str, path-like Filename including path where to find the tensor network. tensor_backend : :class:`TensorBackend` | None, optional If not `None`, attribute `_tensor_backend` is set and data type and device are converted. Default to `None` (no conversion) convergence_parameters: :py:class:`TNConvergenceParameters` Class for handling convergence parameters. Default to `None` (use default convergence parameters) """ ext = "pkl" + cls.extension if not filename.endswith(ext): raise QTeaLeavesError( f"Filename {filename} not valid, extension should be {ext}." ) with open(filename, "rb") as fh: obj = pickle.load(fh) if not isinstance(obj, cls): raise TypeError( f"Loading wrong tensor network ansatz: {type(obj)} vs {cls}." ) if tensor_backend is not None: obj.convert(dtype=tensor_backend.dtype, device=tensor_backend.memory_device) # pylint: disable-next=protected-access obj._tensor_backend = tensor_backend if convergence_parameters is not None: obj.convergence_parameters = convergence_parameters else: msg = ( "Reading pickled tensor network without specifying convergence parameters.", "Convergence parameters are not saved in the pickle file.", "This will set convergence parameters to deafult.", ) logger.warning(msg) return obj
[docs] @classmethod @abc.abstractmethod def mpi_bcast( cls, state: Self, comm: Any, tensor_backend: TensorBackend, root: int = 0 ) -> Self: """ Broadcast a whole tensor network. """
[docs] def copy( self, dtype: Any = None, device: str | None = None, *, copy_convergence_parameters: bool = True, ) -> Self: """ Make a copy of a TN. **Arguments** dtype : target dtype of the new TN. Optional. Default to `None`. device : target device of the new TN. Optional. Default to `None`. copy_convergence_parameters : copy the convergence parameters from the source state. Optional, by keyword-only. Default to `True`. **Details** The following attributes have a special treatment and are not present in the copied object. * convergence_parameters (see boolean flag) * log file (filehandle) * MPI communicator """ # Store attributes which cannot be pickled, so also potential problems # with deepcopy storage = self._store_attr_for_pickle() obj = deepcopy(self) self._restore_attr_for_pickle(storage) if copy_convergence_parameters: obj.convergence_parameters = deepcopy(self.convergence_parameters) obj.convert(dtype, device) return obj
[docs] @abc.abstractmethod def to_dense(self, true_copy: bool = False) -> Self: """Convert into a TN with dense tensors (without symmetries)."""
[docs] @classmethod @abc.abstractmethod def read( cls, filename: str, tensor_backend: TensorBackend, cmplx: bool = True, order: str = "F", ) -> Self: """Read a TN from a formatted file."""
# -------------------------------------------------------------------------- # Checks and asserts # --------------------------------------------------------------------------
[docs] def is_dtype_complex(self) -> bool: """Check if data type is complex based on one example tensor..""" for tensor in self._iter_tensors(): return tensor.is_dtype_complex() raise QTeaLeavesError("Empty tensor network.")
[docs] @staticmethod def assert_extension(obj: "_AbstractTN", extension: str) -> None: """ Asserts that `obj.extension` is equal to `extension`. Parameters ---------- obj : any Will raise error if obj is not an instance of :class:`_AbstractTN` extension : str Will check if the extension of the TN ansatz matches the extionsion of the argument. If not, a type error is raised. """ if not isinstance(obj, _AbstractTN): raise TypeError(f"Expected _AbstractTN, but got type {type(obj)}.") if obj.extension != extension: raise TypeError(f"Expected {extension}, but got type {type(obj)}.")
[docs] def sanity_check(self) -> None: """ By default, we provide an empty sanity check method which can be overwritten by each class to check properties of the network. """
# by default, no checks and no need to overwrite method in class # -------------------------------------------------------------------------- # Abstract methods to be implemented # --------------------------------------------------------------------------
[docs] @abc.abstractmethod def apply_projective_operator( self, site: int, selected_output: int | None = None, remove: bool = False ) -> tuple[int, float]: """ Apply a projective operator to the site **site**, and give the measurement as output. You can also decide to select a given output for the measurement, if the probability is non-zero. Finally, you have the possibility of removing the site after the measurement. .. warning:: Applying projective measurements/removing sites is ALWAYS dangerous. The information of the projective measurement should be in principle carried over the entire mps, by iteratively applying SVDs across all sites. However, this procedure is highly suboptimal, since it is not always necessary and will be processed by the following two-sites operators. Thus, the procedure IS NOT applied here. Take care that entanglement measures through :class:`TNObsBondEntropy` may give incorrect results right after a projective operator application. Furthermore, if working with parallel approaches, projective operators should be treated with even more caution, since they CANNOT be applied in parallel. Parameters ---------- site: int Index of the site you want to measure selected_output: int, optional If provided, the selected state is measured. Throw an error if the probability of the state is 0 remove: bool, optional If True, the measured index is traced away after the measurement. Default to False. Returns ------- meas_state: int Measured state state_prob : float Probability of measuring the output state """
[docs] @abc.abstractmethod def build_effective_operators(self, measurement_mode: bool = False) -> None: """ Build the complete effective operator on each of the links. Now assumes `self.eff_op` is set. """
@abc.abstractmethod def _convert_singvals(self, dtype: Any, device: str | None) -> None: """Convert the singular values of the tensor network to dtype/device.""" @abc.abstractmethod def _iter_physical_links(self) -> Iterator[tuple[int, ...]]: """ Gives an iterator through the physical links. In the MPS, the physical links are connected to nothing, i.e. we assign the tensor index -2 Return ------ Tuple[int] The identifier from_tensor, to_tensor """
[docs] @abc.abstractmethod def get_rho_i(self, idx: int) -> _AbstractQteaTensor: """ Get the reduced density matrix of the site at index idx Parameters ---------- idx : int Index of the site """
[docs] @abc.abstractmethod def get_tensor_of_site(self, idx: int) -> _AbstractQteaTensor: """ Generic function to retrieve the tensor for a specific site. Compatible across different tensor network geometries. Parameters ---------- idx : int Return tensor containing the link of the local Hilbert space of the idx-th site. """
[docs] @abc.abstractmethod def iso_towards( self, new_iso: TnPos, keep_singvals: bool = False, trunc: bool = False, conv_params: TNConvergenceParameters | None = None, move_to_memory_device: bool = True, ) -> list[np.ndarray]: """ Shift the isometry center to the tensor at the corresponding position, i.e., move the isometry to a specific tensor, that might not be a physical. Parameters ---------- new_iso : Position in the TN of the tensor which should be isometrized. keep_singvals : bool, optional If True, keep the singular values even if shifting the iso with a QR decomposition. Default to False. trunc : Boolean, optional If `True`, the shifting is done via truncated SVD. If `False`, the shifting is done via QR. Default to `False`. conv_params : :py:class:`TNConvergenceParameters`, optional Convergence parameters to use for the SVD. If `None`, convergence parameters are taken from the TTN. Default to `None`. move_to_memory_device : bool, optional If True, when a mixed device is used, move the tensors that are not the isometry center back to the memory device. Default to True. Returns: truncated_s_vals (list[np.ndarray): or numpy-like entries in list. Singular values cut in the process of shifting the isometry center. Details ------- The tensors used in the computation will always be moved on the computational device. For example, the isometry movement keeps the isometry center end the effective operators around the center (if present) always on the computational device. If move_to_memory_device is False, then all the tensors (effective operators) on the path from the old iso to the new iso will be kept in the computational device. This is very useful when you iterate some protocol between two tensors, or in general when two tensors are involved. """
[docs] @abc.abstractmethod def isometrize_all(self) -> None: """ Isometrize towards the default isometry position with no previous isometry center, e.g., works as well on random states. Returns ------- None """
@abc.abstractmethod def _iter_tensors(self) -> Iterator[_AbstractQteaTensor]: """Iterate over all tensors forming the tensor network (for convert etc)."""
[docs] @abc.abstractmethod def norm(self) -> float: """ Calculate the norm of the state. """
# We opt for typing with float only (no complex) as this works on # all tensor networks independent of the underlying tensor data type
[docs] @abc.abstractmethod def scale(self, factor: float) -> None: """ Multiply the tensor network state by a scalar factor (in-place). Parameters ---------- factor : float Factor for multiplication of current tensor network state. """
# We opt for typing with float only (no complex) as this works on # all tensor networks independent of the underlying tensor data type
[docs] @abc.abstractmethod def scale_inverse(self, factor: float) -> None: """ Multiply the tensor network state by the inverse of a scalar factor. Parameters ---------- factor : float Factor for multiplication of current tensor network state. """
# self.scale(1.0 / factor) # Division will be excuted on device
[docs] @abc.abstractmethod def site_canonize(self, idx: int, keep_singvals: bool = False) -> None: """ Shift the isometry center to the tensor containing the corresponding site, i.e., move the isometry to a specific Hilbert space. This method can be implemented independent of the tensor network structure. Parameters ---------- idx : int Index of the physical site which should be isometrized. keep_singvals : bool, optional If True, keep the singular values even if shifting the iso with a QR decomposition. Default to False. """
@abc.abstractmethod def _update_eff_ops(self, id_step: Sequence[int]) -> None: """ Update the effective operators after the iso shift Parameters ---------- id_step : List[int] List with information of the iso moving path. The encoding is specific to the ansatz. Returns ------- None Updates the effective operators in place """ @abc.abstractmethod def _partial_iso_towards_for_timestep( self, pos: TnPos, next_pos: TnPos, no_rtens: bool = False ) -> tuple[_AbstractQteaTensor | int, TnPos, int, Sequence[int]]: """ Move by hand the iso for the evolution backwards in time Parameters ---------- pos : Tuple[int] | int Position of the tensor evolved next_pos : Tuple[int] | int Position of the next tensor to evolve Returns ------- _AbstractQteaTensor | int The R tensor of the iso movement. integer if no_rtens = True Tuple[int] The position of the partner (the parent in TTNs) int The link of the partner pointing towards pos List[int] The update path to pass to _update_eff_ops """ # requires next_pos to be not `None` and actual TnPos
[docs] @abc.abstractmethod def default_sweep_order(self, skip_exact_rgtensors: bool = False) -> list[TnPos]: """ Default sweep order to be used in the ground state search/time evolution. Arguments --------- skip_exact_rgtensors : bool, optional Allows to exclude tensors from the sweep which are at full bond dimension and represent just a unitary transformation. Usually set via the convergence parameters and then passed here. Default to `False`. Returns ------- List[int] | List[Tuple[int]] The generator that you can sweep through """ raise NotImplementedError("This method is ansatz-specific")
[docs] def default_sweep_order_back( self, skip_exact_rgtensors: bool = False ) -> list[TnPos]: """Default sweep order backwards, e.g., for second-order methods.""" return self.default_sweep_order(skip_exact_rgtensors=skip_exact_rgtensors)[::-1]
[docs] def filter_sweep_order( self, sweep_order: Sequence[TnPos], skip_exact_rgtensors: bool ) -> Sequence[TnPos]: """Filter a sweep order with respect to exact rg tensors if flag active.""" if not skip_exact_rgtensors: return sweep_order default_order = self.default_sweep_order(skip_exact_rgtensors=True) filtered_sweep_order = [] for elem in sweep_order: if elem in default_order: filtered_sweep_order.append(elem) return filtered_sweep_order
[docs] def get_projector_function( self, pos: TnPos, pos_links: list[TnPos | None] ) -> Callable: """ Generates a function which locally projects out the effective projectors. Used in the excited state search. **Arguments** pos : tuple[int] The position of the tensor from which to project. pos_links : list[tuple] Position of neighboring tensors where the links in `tensor` lead to. **Returns** Callable : the function. """ # partial should return a pointer to the function with arguments applied, so it # should not create a whole new function object return partial( self._project_out_projectors, projectors=self.eff_proj, pos=pos, pos_links=pos_links, )
@staticmethod def _project_out_projectors( tensor: _AbstractQteaTensor, projectors: Sequence, pos: TnPos, pos_links: Sequence[TnPos | None], ) -> _AbstractQteaTensor: """ Takes the tensor and projects out all eff_projectors at the given position. Used for the excited state search. Does not renormalize the tensor to the initial norm! This seems to work better for the excited state search (it produces lower overlaps). **Arguments** tensor : :class:`_AbstractQteaTensor` The tensor from which to subtract the projection. projectors : list A list of projectors as effective operators. position : tuple | int The position of the tensor. pos_links : list[tuple] Position of neighboring tensors where the links in `tensor` lead to. **Returns** :class:`_AbstractQteaTensor` """ tabs: Callable = tensor.get_attr("abs") # type: ignore[assignment] # calculate all overlaps first, sort them, and first orthogonalize # the ones with the biggest overlap overlaps = [] abs_overlaps = [] local_projectors = [] for projector in projectors: local_projector = projector.contract_to_projector( tensor=None, pos=pos, pos_links=pos_links ) # move to the same device local_projector.convert(device=tensor.device) overlap = local_projector.dot(tensor) overlaps.append(overlap) abs_overlaps.append(tabs(overlap)) local_projectors.append(local_projector) # and now orthogonalize starting with the largest overlap sorted_ndx = sorted(enumerate(abs_overlaps), key=lambda x: x[1], reverse=True) for ii, _ in sorted_ndx: overlap = overlaps[ii] local_projector = local_projectors[ii] tensor.add_update(other=local_projector, factor_other=-1 * overlap) # WARNING: I know you want to renormalize here, but it is wrong! # You will get wrong results. Luka Feb 2024 return tensor
[docs] @abc.abstractmethod def to_statevector( self, qiskit_order: bool = False, max_qubit_equivalent: int = 20 ) -> _AbstractQteaTensor: """ Decompose a given TN into statevector form if pure. Parameters ---------- qiskit_order : bool, optional If true, the order is right-to-left. Otherwise left-to-right (which is the usual order in physics). Default to False. max_qubit_equivalent : int, optional Maximum number of qubits for which the statevector is computed. i.e. for a maximum hilbert space of 2**max_qubit_equivalent. Default to 20. Returns ------- psi : instance of :class:`_AbstractQteaTensor` The statevector of the system Raises ------ Mixed state: if mixed-state representations are not pure, an error will be raised. """
[docs] @abc.abstractmethod def write(self, filename: str, cmplx: bool = True) -> None: """Write the TN in python format into a FORTRAN compatible format."""
[docs] @abc.abstractmethod def apply_local_kraus_channel( self, kraus_ops: dict[int, _AbstractQteaTensor] ) -> float: """ Apply local Kraus channels to tensor network ------- Parameters ------- kraus_ops : dict of :py:class:`_AbstractQTeaTensor` Dictionary, keys are site indices and elements the corresponding 3-leg Kraus tensors Returns ------- singvals_cut: float Sum of singular values discarded due to truncation. """
[docs] @abc.abstractmethod def get_substate( self, first_site: int, last_site: int, truncate: bool = False ) -> Self: """ Returns the smaller TN built of tensors from `first_site` to `last_site`, where sites refer to physical sites. Parameters ---------- first_site : int First (physical) site defining a range of tensors which will compose the new TN. Python indexing assumed, i.e. counting starts from 0. last_site : int Last (physical) site of a range of tensors which will compose the new TN. Python indexing assumed, i.e. counting starts from 0. truncate : Bool If False, tensors are returned as is, possibly with non-dummy links on edge tensors. If True, the edges of will be truncated to dummy links. Default to False. """
[docs] @abc.abstractmethod def kron(self, other: Self) -> Self: """ Concatenate two TN, taking the kronecker/outer product of the two states. The bond dimension assumed is the maximum between the two bond dimensions. Parameters ---------- other : :py:class:`_AbstractTN` TN to concatenate with self. Returns ------- :py:class:`_AbstractTN` Concatenation of the first TTN with the second in order """
# -------------------------------------------------------------------------- # Methods that can be inherited # -------------------------------------------------------------------------- def _apply_projective_operator_common( self, site: int, selected_output: int ) -> tuple[_AbstractQteaTensor, int, float]: """ Execute common steps across different ansätze. Returns ------- rho_i : _AbstractQteaTensor meas_state: integer old_norm : norm before calculating rho_i and renormalizing """ if selected_output is not None and selected_output > self.local_dim[site] - 1: raise ValueError("The seleted output must be at most local_dim-1") # Set the orthogonality center self.site_canonize(site, keep_singvals=True) # Normalize old_norm = self.norm() self.scale_inverse(old_norm) rho_i = self.get_rho_i(site) probabilities = rho_i.diag(real_part_only=True, do_get=True) # type: ignore[attr-defined] cumul_probs = np.cumsum(probabilities) random_u = np.random.rand() meas_state = None for ii, cprob_ii in enumerate(cumul_probs): if selected_output is not None and ii != selected_output: continue if cprob_ii >= random_u or selected_output == ii: meas_state = ii # state_prob = probabilities[ii] break if meas_state is None: raise QTeaLeavesError("Did not run into measurement.") return rho_i, meas_state, old_norm
[docs] def checkpoint_copy_simulation_attr(self, src: Any) -> None: """Copy attributes linked to the simulation, like convergence parameters.""" self.convergence_parameters = src.convergence_parameters self.solver = src.solver if src.comm is not None: raise ValueError("Checkpoints and MPI are not yet enabled.")
[docs] def checkpoint_store( self, folder_name_output: str, dyn_checkpoint_file: str | None, int_str: str, checkpoint_indicator_file: str, is_dyn: bool = False, jdic: Any = None, ) -> str: """ Store the tensor network as checkpoint. **Arguments** folder_name_output : str Name of the output folder, where we store checkpoints. dyn_checkpoint_file : str or `None` Name of the previous checkpoint file, which can be deleted after creating the new checkpoint. int_str : str Identifier containing integers as string to identify the checkpoint when loading in a potential future run. checkpoint_indicator_file: str Path to file which indicates whether checkpoints exists. is_dyn : bool, optional Flag to indicate if checkpoint is for statics (False) or dynamics (True). Default to `False`. jdic : json-compatible structure or `None`, optional Store additional information as json. Default to `None` (store nothing). """ prev_checkpoint_file = dyn_checkpoint_file dyn_stat_switch = "dyn" if is_dyn else "stat" dyn_checkpoint_file = os.path.join( folder_name_output, f"TTN_{dyn_stat_switch}_{int_str}" ) self.save_pickle(dyn_checkpoint_file) if jdic is not None: with open(dyn_checkpoint_file + ".json", "w+") as fh: json.dump(jdic, fh, cls=QteaJsonEncoder) # Delete previous checkpoint if prev_checkpoint_file is not None: ext = ".pkl" + self.extension os.remove(prev_checkpoint_file + ext) if os.path.isfile(prev_checkpoint_file + ".json"): os.remove(prev_checkpoint_file + ".json") if not os.path.isfile(checkpoint_indicator_file): with open(checkpoint_indicator_file, "w+") as fh: pass return dyn_checkpoint_file
[docs] def clear_cache_rho(self) -> None: """Clear cache of reduced density matrices.""" self._cache_rho = {}
[docs] def convert(self, dtype: Any, device: str | None) -> None: """Convert the data type and device of a tensor network inplace.""" if isinstance(dtype, str): dtype = self.dtype_from_char(dtype) # Converting singular values relies on tensor backend of tensors, # thus do it before converting the tensors self._convert_singvals(dtype, device) # convert all tensors in a tensor network if (self.dtype != dtype) or (self.device != device): for tensor in self._iter_tensors(): tensor.convert(dtype, device) # convert all effective operators if self.eff_op is not None: self.eff_op.convert(dtype, device) # convert all effective projectors for proj in self.eff_proj: proj.convert(dtype, device) # type: ignore[arg-type] # convert all reduced density matrices for _, rho in self._cache_rho.items(): # Convert is an in-place update rho.convert(dtype, device)
[docs] def move_pos(self, pos: TnPos, device: Any = None, stream: bool = False) -> None: """ Move just the tensor in position `pos` with the effective operators insisting on links of `pos` on another device. Other objects like effective projectors will be moved as well. Acts in place. Note: the implementation of the tensor backend can fallback to synchronous moves only depending on its implementation. Parameters ---------- pos : int | Tuple[int] Integers identifying a tensor in a tensor network. device : str, optional Device where you want to send the QteaTensor. If None, no conversion. Default to None. stream : bool | stream | None, optional If True, use a new stream for memory communication given by the data mover. If False, use synchronous communication with GPU. If not a boolean (and not None), we assume it is a stream object compatible with the backend. None is deprecated and equals to False. Default to False (Use null stream). """ if device is None: # Quick return as pointed out in the docstring. return if stream is None: logger_warning("stream=None is deprecated, use False instead.") stream = False # Boolean logic for stream and sync are opposite sync = not stream if isinstance(stream, bool) else stream _ = stream # Convert the tensor self.data_mover.move(self[pos], device=device, sync=sync) pos_links = self.get_pos_links(pos) # Convert the effective operators if present if self.eff_op is not None: for pos_link in pos_links: if (pos_link, pos) in self.eff_op: eff_op = self.eff_op[(pos_link, pos)] # Note: effective operators on one link can be a list of tensors, # so we need to check if it is an iterable # https://docs.python.org/3/library/collections.abc.html#id19 if isinstance(eff_op, Iterable): for tensor in eff_op: self.data_mover.move(tensor, device=device, sync=sync) else: self.data_mover.move(eff_op, device=device, sync=sync) # Convert the effective projectors if present. for proj in self.eff_proj: for pos_link in pos_links: # Also move the tensor of psi0 (tensor at pos must exist) # Here, we just need the tensor and use move of the data_mover # instead of move_pos self.data_mover.move( proj.psi0[pos], # type: ignore[index,arg-type] device=device, sync=sync, ) # pylint: disable-next=protected-access if (pos_link, pos) in proj: self.data_mover.move( proj[(pos_link, pos)], # type: ignore[index] device=device, sync=sync, ) self._tensor_backend.tensor_cls.free_device_memory(device=device)
[docs] def dtype_from_char(self, dtype: str) -> Any: """Resolve data type from chars C, D, S, Z and optionally H.""" for tensor in self._iter_tensors(): return tensor.dtype_from_char(dtype) raise QTeaLeavesError("Querying on empty tensor network.")
[docs] def normalize(self) -> None: """ Normalize the state depending on its current norm. """ self.scale_inverse(self.norm())
[docs] def pre_timeevo_checks(self, raise_error: bool = False) -> bool: """Check if a TN ansatz is ready for time-evolution.""" is_check_okay = True if not self.is_dtype_complex() and raise_error: raise QTeaLeavesError("Trying time evolution with real state.") if not self.is_dtype_complex(): is_check_okay = False logger.warning("Trying to run time evolution with real state.") return is_check_okay
[docs] @staticmethod def projector_attr() -> str | None: """Name as string of the projector class to be used with the ansatz. Returns: Name usable as `getattr(qtealeaves.mpos, return_value)` to get the actual effective projector suitable for this class. If no effective projector class is avaiable, `None` is returned. """ return None
[docs] def set_cache_rho(self) -> None: """Cache the reduced density matrices for faster access.""" for ii in range(self.num_sites): self._cache_rho[ii] = self.get_rho_i(ii)
def _store_attr_for_pickle(self) -> dict[str, Any]: """Return dictionary with attributes that cannot be pickled and unset them.""" storage = { "conv_params": self._convergence_parameters, "comm": self.comm, "solver": self._solver, "cache_rho": self._cache_rho, } self._convergence_parameters = None # type: ignore[assignment] self.comm = None self._solver = None # type: ignore[assignment] self.clear_cache_rho() return storage def _restore_attr_for_pickle(self, storage: dict[str, Any]) -> None: """Restore attributed removed for pickle from dictionary.""" # Reset temporary removed attributes self._convergence_parameters = storage["conv_params"] self.comm = storage["comm"] self._solver = storage["solver"] self._cache_rho = storage["cache_rho"]
[docs] def save_pickle(self, filename: str) -> None: """ Save class via pickle-module. A state is always saved on a host CPU with tensor_backend.device set to "cpu". After saving, the state and the tensor_backend are converted back to the original device. **Details** The following attributes have a special treatment and are not present in the copied object. * convergence_parameters * log file (filehandle) * MPI communicator """ # otherwise (TTN) run into exception # "Need to isometrize to [0, 0], but at {self.iso_center}." # in ttn_simulator.build_effective_operators self.iso_towards(self.default_iso_pos) # Temporary remove objects which cannot be pickled which # included convergence parameters for lambda function and # parameterized variables, the log file as file handle and # the MPI communicator storage = self._store_attr_for_pickle() memory_device = self.tensor_backend.memory_device computational_device = self.tensor_backend.computational_device # Assume pickle needs to be on host. # Internally an empty call if everything is already on cpu. self.convert(None, "cpu") ext = "pkl" + self.extension if not filename.endswith(ext): filename += "." + ext with open(filename, "wb") as fh: pickle.dump(self, fh) if self.iso_center is None: # Move everything, although this is not a standard case self.convert(dtype=None, device=memory_device) elif computational_device != memory_device: # Mixed device, move back only the iso_center self.move_pos(pos=self.iso_center, device=computational_device) else: # otherwise everything is moved, could be completely on the GPU self.convert(dtype=None, device=memory_device) self._restore_attr_for_pickle(storage)
# pylint: disable-next=unused-argument
[docs] def permute_spo_for_two_tensors( self, spo_list: list, theta: _AbstractQteaTensor, link_partner: int, # pylint: disable=unused-argument ) -> tuple[list, _AbstractQteaTensor, Sequence[int] | None]: """Returns permuted SPO list, permuted theta, and the inverse permutation.""" return spo_list, theta, None
[docs] def randomize( self, noise: float | None = None, mpo_to_recalculate_eff_ops: Any = None ) -> None: """ Randomize all tensors in the tensor network. Removes eff_ops and eff_proj. If an mpo is passed as mpo_to_recalculate_eff_ops, the effective operators are recalculated with it. Parameters ---------- noise : float | None The amount of noise added. None randomizes completely. reset_eff_ops_mpo : mpo Used to recalculate effective operator. """ for tens in self._iter_tensors(): old_norm = tens.norm_sqrt() tens.randomize(noise=noise) new_norm = tens.norm_sqrt() tens *= old_norm / new_norm # Also reset the effective operators and projectors. self.eff_op = None # type: ignore[assignment] self.eff_proj = [] # reset the iso position self.iso_center = None self.isometrize_all() self.normalize() # recalculate the effective operators if mpo_to_recalculate_eff_ops is not None: mpo_to_recalculate_eff_ops.setup_as_eff_ops(self)
# -------------------------------------------------------------------------- # Unsorted # -------------------------------------------------------------------------- def _postprocess_singvals_cut( self, singvals_cut: Any, conv_params: TNConvergenceParameters | None = None ) -> Any: """ Postprocess the singular values cut after the application of a tSVD based on the convergence parameters. Either take the sum of the squared singvals (if `conv_params.trunc_tracking_mode` is `"C"`) or the maximum (if `conv_params.trunc_tracking_mode` is `"M"`). Parameters ---------- singvals_cut : np.ndarray Singular values cut in a tSVD conv_params : TNConvergenceParameters, optional Convergence parameters. If None, the convergence parameters of the tensor network class is used, by default None Returns ------- float The processed singvals """ if conv_params is None: conv_params = self._convergence_parameters # If no singvals was cut append a 0 to avoid problems if len(singvals_cut) == 0: return 0 if conv_params.trunc_tracking_mode == "M": singvals_cut = singvals_cut.max() elif conv_params.trunc_tracking_mode == "C": if hasattr(singvals_cut, "sum"): # Works for numpy, cupy, pytorch ... singvals_cut = (singvals_cut**2).sum() else: # Tensorflow handling (example tensor to get attribute) for tensor in self._iter_tensors(): break else: raise QTeaLeavesError( "Empty tensor network cannot process singvals." ) func_sum = tensor.get_attr("sum") singvals_cut = func_sum(singvals_cut**2) # type: ignore[operator] else: raise QTeaLeavesError( f"Unknown trunc_tracking_mode {conv_params.trunc_method}" ) return singvals_cut ######################################### ########## MEASUREMENT METHODS ########## #########################################
[docs] def meas_local( self, op_list: _AbstractQteaTensor | Sequence[_AbstractQteaTensor] ) -> np.ndarray: """ Measure a local observable along all sites of the MPS Parameters ---------- op_list : list of :class:`_AbstractQteaTensor` local operator to measure on each site Return ------ measures : ndarray, shape (num_sites) Measures of the local operator along each site """ if isinstance(op_list, _AbstractQteaTensor): if len(set(self.local_dim)) != 1: raise QTeaLeavesError( "Trying to use single operator for non-unique Hilbert spaces." ) op_list = [op_list for _ in range(self.num_sites)] # Move all operators to the computational device. # Assume self.get_rho(ii) is always on the computational device. device_list = [] for ii, op in enumerate(op_list): # The if check because the user could pass a numpy array as the op_list. # In that case, we assume everything is on the same device. if isinstance(op, _AbstractQteaTensor): device_list.append(op.device) op.convert(device=self.tensor_backend.computational_device) else: device_list.append(None) # Always store on host measures = np.zeros(self.num_sites) # This subroutine can be parallelized if the singvals are stored using # joblib for ii in range(self.num_sites): rho_i = self.get_rho_i(ii) op = op_list[ii] # Ensure that numpy/cupy operators are on the computational device. # tensordot() should already check this, but it prints a warning # "Converting tensor on the fly" that can be avoided by converting # explicitly the array here. # Note: op in the op_list is not overwritten if not isinstance(op, _AbstractQteaTensor): # NOTE: maybe there is a better place to make this conversion op = self._tensor_backend.from_elem_array( op, dtype=rho_i.dtype, device=self.tensor_backend.computational_device, ) if op.ndim != 2: # Need copy, otherwise input tensor is modified ("passed-by-pointer") # This checks needs to be the 2nd one as `trace_one_dim_pair` is # only a method of _AbstractQteaTensor op = op.copy() op.trace_one_dim_pair([0, 3]) expectation = rho_i.tensordot(op, ([0, 1], [1, 0])) measures[ii] = np.real(expectation.get_entry()) # Move the operators back. for ii, op in enumerate(op_list): if isinstance(op, _AbstractQteaTensor): op.convert(device=device_list[ii]) return measures
[docs] def meas_magic( self, renyi_idx: int = 2, num_samples: int | Sequence[int] = 1000, return_probabilities: bool = False, precision: int = 14, ) -> Any: """ Measure the magic of the state as defined in https://arxiv.org/pdf/2303.05536.pdf, with a given number of samples. To see how the procedure works see meas_unbiased_probabilities. Parameters ---------- renyi_idx : int, optional Index of the Rényi entropy you want to measure. If 1, measure the Von Neumann entropy. Default to 2. num_samples : int | List[int], optional Number of random number sampled for the unbiased probability measurement. If a List is passed, then the algorithm is run over several superiterations and each entry on num_samples is the number of samples of a superiteration. Default to 1000. return_probabilities : bool, optional If True, return the probability dict. Default to False. precision: int, optional Precision for the probability interval computation. Default to 14. For precision>15 mpmath is used, so a slow-down is expected. Returns ------- float The magic of the state (potentially also returning probabilities based on the input arguments). """ _num_samples: Sequence[int] if np.isscalar(num_samples): _num_samples = [num_samples] # type: ignore[assignment,list-item] else: _num_samples = num_samples # type: ignore[assignment] # Sample the state probabilities opes_bound_probs: dict[str, tuple[float, float]] = {} opes_probs = np.array([]) for num_samp in _num_samples: # Sample the numbers samples = np.random.rand(int(num_samp)) # Do not perform the computation for the already sampled numbers probs, new_samples = _check_samples_in_bound_probs( samples, opes_bound_probs ) opes_probs = np.hstack((opes_probs, probs)) # Perform the sampling for the unseen samples bound_probs = self.meas_unbiased_probabilities( new_samples, mode="magic", precision=precision # type: ignore[arg-type] ) opes_bound_probs.update(bound_probs) # Add the sampled probability to the numpy array probs, _ = _check_samples_in_bound_probs(new_samples, bound_probs) opes_probs = np.hstack((opes_probs, probs)) # Compute the magic with the samples magic = -self.num_sites * np.log(2) # Pass from probability intervals to probability values if renyi_idx > 1: magic += np.log((opes_probs ** (renyi_idx - 1)).mean()) / (1 - renyi_idx) else: magic += -(np.log(opes_probs)).mean() if return_probabilities: return magic, opes_bound_probs return magic
[docs] def meas_projective( self, nmeas: int = 1024, qiskit_convention: bool = False, seed: int | None = None, unitary_setup: UnitarySetupProjMeas | None = None, do_return_probabilities: bool = False, ) -> Any: """ Perform projective measurements along the computational basis state Parameters ---------- nmeas : int, optional Number of projective measurements. Default to 1024. qiskit_convention : bool, optional If the sites during the measure are represented such that |201> has site 0 with value one (True, mimicks bits ordering) or with value 2 (False usually used in theoretical computations). Default to False. seed : int, optional If provided it sets the numpy seed for the random number generation. Default to None unitary_setup : `None` or :class:`UnitarySetupProjMeas`, optional If `None`, no local unitaries are applied during the projective measurements. Otherwise, the unitary_setup provides local unitaries to be applied before the projective measurement on each site. Default to `None`. do_return_probabilities : bool, optional If `False`, only the measurements are returned. If `True`, two arguments are returned where the first are the measurements and the second are their probabilities. Default to `False` Return ------ measures : dict[str, int] Dictionary where the keys are the states while the values the number of occurrences. The keys are separated by a comma if local_dim > 9. (Potentially also 2nd return value with probabilities based on input arguments.) """ if nmeas == 0: return {} if seed is not None and isinstance(seed, int): np.random.seed(seed) # Put in canonic form self.site_canonize(0) measures: Any = [] probabilities: Any = [] # Loop over number of measurements for _ in range(nmeas): state = np.zeros(self.num_sites, dtype=int) temp_tens = deepcopy(self.get_tensor_of_site(0)) # Loop over tensors cumulative_prob = 1.0 for ii in range(self.num_sites): target_prob = np.random.rand() measured_idx, temp_tens, prob_ii = self._get_child_prob( temp_tens, ii, target_prob, unitary_setup, state, qiskit_convention ) cumulative_prob *= prob_ii # Save the measured state either with qiskit or # theoretical convention if qiskit_convention: state[self.num_sites - 1 - ii] = measured_idx else: state[ii] = measured_idx max_local_dim = np.max(self.local_dim) if max_local_dim < 10: measure_ii = np.array2string( state, separator="", max_line_width=2 * self.num_sites )[1:-1] else: measure_ii = np.array2string( state, separator=",", max_line_width=2 * self.num_sites )[1:-1] probabilities.append(cumulative_prob) # Come back to CPU if on GPU for list in measures (not needed since it is string) measures.append(measure_ii) measures = np.array(measures) states, counts = np.unique(measures, return_counts=True) probabilities = dict(zip(measures, probabilities)) measures = dict(zip(states, counts)) if do_return_probabilities: return measures, probabilities return measures
[docs] def meas_unbiased_probabilities( self, num_samples: int | np.ndarray, qiskit_convention: bool = False, bound_probabilities: dict | None = None, do_return_samples: bool = False, precision: int = 15, mode: str = "projection_z", ) -> Any: """ Compute the probabilities of measuring a given state if its probability falls into the explored in num_samples values. The functions divide the probability space in small rectangles, draw num_samples random numbers and then follow the path until the end. The number of states in output is between 1 and num_samples. For a different way of computing the probability tree see the function :py:func:`meas_even_probabilities` or :py:func:`meas_greedy_probabilities` Parameters ---------- num_samples : int | np.ndarray Maximum number of states that could be measured. (Alternatively, it can be a numpy-array passing directly the samples.) qiskit_convention : bool, optional If the sites during the measure are represented such that |201> has site 0 with value one (True, mimics bits ordering) or with value 2 (False usually used in theoretical computations). Default to False. probability_bounds : dict, optional Bounds on the probability computed previously with this function, i.e. if a uniform random number has value `left_bound< value< right_bound` then you measure the state. The dict structure is `{'state' : (left_bound, right_bound)}`. If provided, it speed up the computations since the function will skip values in the intervals already known. By default None. do_return_samples : bool, optional Enables, if `True`, to return the random number used for sampling in addition to the `bound_probabilities`. If `False`, only the `bound_probabilities` are returned. Default to `False` precision : int, optional Decimal place precision for the mpmath package. It is only used inside the function, and setted back to the original after the computations. Default to 15. If it is 15 or smaller, it just uses numpy. mode : str, optional Mode of the unbiased sampling. Default is "projection_z", equivalent to sampling the basis states on the Z direction. Possibilities: - "projection_z" - "magic" Return ------ bound_probabilities : dict[str, tuple[float, float]] Dictionary analogous to the `probability_bounds` parameter. The keys are separated by a comma if local_dim > 9. samples : np.ndarray Random numbers from sampling, only returned if activated by optional argument. """ # Use mpmath if precision is larger than 15 has_mp = precision > 15 # Handle internal cache; keep if possible: if bound probabilities # are passed, it must be the same state and we can keep the # cache. do_clear_cache = bound_probabilities is None # Always set gauge to site=0; even if cache is not cleared, # the actual isometry center did not move self.site_canonize(0) # Normalize for quantum trajectories old_norm = self.norm() self.normalize() get_children_prob: Callable if mode == "projection_z": local_dim = self.local_dim get_children_prob = self._get_children_prob initial_tensor = self.get_tensor_of_site(0) elif mode == "magic": local_dim = [ 4, ] * self.num_sites get_children_prob = self._get_children_magic tmp_t = self.get_tensor_of_site(0) initial_tensor = tmp_t.eye_like(tmp_t.links[0]) else: raise ValueError(f"mode {mode} not available for unbiased sampling") # ==== Initialize variables ==== # all_probs is a structure to keep track of already-visited nodes in # the probability tree. The i-th dictionary of the list correspond to # a state measured up to the i-th site. Each dictionary has the states # as keys and as value the list [state_prob, state_tens] all_probs: list[dict[str, tuple[float, _AbstractQteaTensor]]] = [ {} for _ in range(self.num_sites) ] # Initialize precision old_precision = mp.mp.dps mp.mp.dps = precision # This precision is pretty much independent of the numpy-datatype as # it comes from multiplication. However, it is important when we sum # for the intervals mpf_wrapper, almost_equal = _mp_precision_check(precision) # Sample uniformly in 0,1 the samples, taking into account the already # sampled regions given by bound_probabilities if np.isscalar(num_samples): samples, bound_probabilities = _resample_for_unbiased_prob( num_samples, bound_probabilities # type: ignore[arg-type] ) else: samples = num_samples # type: ignore[assignment] bound_probabilities = ( {} if bound_probabilities is None else bound_probabilities ) # ==== Routine ==== cum_prob = None cum_probs = None zero_probs_counter = 0 for idx, sample in enumerate(samples): # If the sample is in an already sampled area continue if idx > 0: if cum_prob is None: logger.warning( "Error information QTeaOPESError at idx=%d with samples %s.", idx, str(samples[: idx + 1]), ) raise QTeaOPESError("cum_prob not defined.") if left_prob_bound < sample < left_prob_bound + cum_prob: continue # Set the current state to no state curr_state = "" # Set the current tensor to be measured to the first one tensor = deepcopy(initial_tensor) # Initialize the probability to 1 curr_prob = 1.0 # Initialize left bound of the probability interval. Arbitrary precision left_prob_bound = mpf_wrapper(0.0) # Loop over the sites for site_idx in range(0, self.num_sites): # Initialize new possible states, adding the digits of the local basis to # the state measured up to now if site_idx > 0: states = [ curr_state + f",{ii}" for ii in range(local_dim[site_idx]) ] else: states = [curr_state + f"{ii}" for ii in range(local_dim[site_idx])] # Compute the children if we didn't already follow the branch if not np.all([ss in all_probs[site_idx] for ss in states]): # Remove useless information after the first cycle. This operation is # reasonable since the samples are ascending, i.e. if we switch path # we will never follow again the old paths. if idx > 0: all_probs[site_idx:] = [ {} for _ in range(len(all_probs[site_idx:])) ] # Compute new probabilities probs, tensor_list = get_children_prob( tensor, site_idx, curr_state, do_clear_cache ) # Clear cache only upon first iteration do_clear_cache = False # get probs to arbitrary precision # if precision > 15: # probs = mp.matrix(probs) # Multiply by the probability of being in the parent state # Multiplication is safe from the precision point of view probs = curr_prob * probs # type: ignore[operator] # Update probability tracker for next branch, avoiding # useless additional computations for ss, prob, tens in zip(states, probs, tensor_list): all_probs[site_idx][ss] = [prob, tens] # type: ignore[assignment] # Retrieve values if already went down the path else: probs = [] tensor_list = [] for prob, tens in all_probs[site_idx].values(): probs.append(prob) tensor_list.append(tens) # Select the next branch if we didn't reach the leaves # according to the random number sampled if site_idx < self.num_sites - 1: cum_probs = np.cumsum(probs) # Compute cumulative # Select first index where the sample is lower than the cumulative try: meas_idx = [ sample < float(cum_prob) for cum_prob in cum_probs ].index(True) except ValueError as exc: # If the sample is not smaller than any of the cum_probs, # it must be the last sample (probably within machine precision) eps = abs(sample - cum_probs[-1]) if eps > 10 * tensor_list[0].dtype_eps: info_str = " with eps, dtype_eps, sample, cum_probs: " info_values = ", ".join( [ str(eps), str(tensor_list[0].dtype_eps), str(sample), str(cum_probs), ] ) # pylint: disable-next=logging-not-lazy logger.warning( "Error information QTeaOPESError" + info_str + info_values ) raise QTeaOPESError( "IndexError in OPES sampling not within machine precision." ) from exc if eps > 0: # Testing against exact zero on purpose, to warn if we are close # to machine precision here logger_warning( "Sampling with OPES close to machine precision." ) meas_idx = len(cum_probs) - 1 # Update run-time values based on measured index tensor = deepcopy(tensor_list[meas_idx]) curr_state = states[meas_idx] curr_prob = probs[meas_idx] # Update value of the sample based on the followed path cum_probs_idx = 0.0 if meas_idx == 0 else cum_probs[meas_idx - 1] sample -= float(cum_probs_idx) # Update left-boundary value with probability remaining on the left # of the measured index if meas_idx > 0: if has_mp: left_prob_bound += float(cum_probs[meas_idx - 1]) else: left_prob_bound += cum_probs[meas_idx - 1] # Save values if we reached the leaves else: cum_prob = mpf_wrapper(0.0) for ss, prob in zip(states, probs): if prob < 0: msg = ( "Probably you need an higher precision." + "Measured negative probability of %s, set to 0." ) logger.warning(msg, prob) prob = 0 tmp = float(prob) if has_mp else prob if not almost_equal((tmp, 0)): bound_probabilities[ss] = ( left_prob_bound + cum_prob, left_prob_bound + cum_prob + tmp, ) else: zero_probs_counter += 1 cum_prob += tmp # For TTN with caching strategy (empty interface implemented # also for any abstract tensor network) all_probs = self.clear_cache(all_probs=all_probs, current_key=curr_state) if zero_probs_counter > 0: msg = ( f"The probability of {zero_probs_counter} samples was " + "smaller than the precision. No bounds will be associated." ) logger.warning(msg) # Rewrite with qiskit convention and remove commas if needed bound_probabilities = postprocess_statedict( bound_probabilities, local_dim=self.local_dim[0], # Danger: assuming equal local dimension qiskit_convention=qiskit_convention, ) self.scale(old_norm) mp.mp.dps = old_precision if do_return_samples: return bound_probabilities, samples return bound_probabilities
[docs] def sample_n_unique_states( self, num_unique: int, exit_coverage: float = 0.9999, ignore_opes_errors: bool = False, **kwargs: Any, ) -> dict[str, tuple[float, float]]: """ Sample a given number of unique target states. This is the target number of states, the actual number of states can differ. **Arguments** num_unique : int Number of unique states to be sampled. This is a target number; the actual number of sampled states might differ in the end. exit_coverage : float, optional Coverage at which sampling can stop even without reaching the target number of unique states. Default to 0.9999 ignore_opes_errors : bool, optional Allows to ignore `QTeaOpesError` to return at least the samples that have been sampled so far. Default to False kwargs : keyword arguments Passed through to unbiased sampling, e.g., `qiskit_convention`, `precision`, and `mode`. `bound_probabilities` is accepted if called from MPI sampling (identified by left-right keys). **Details** The target number of unique states will not be reached if the probability of the sampled states reaches the `exit_coverage`. The target number of unique states will be overfulfilled in most other cases as the last superiteration might generate slightly more states than needed. """ sampling_result = None for key in kwargs: if key not in [ "qiskit_convention", "precision", "mode", "bound_probabilities", ]: raise ValueError(f"Keyword argument `{key}` not allowed.") # We want to reuse this function for sampling from MPI, but not necessarily # other calls should be able to pass kwargs bound_probabilities. For MPI # calls, we know either left or right must be set as key. if key == "bound_probabilities": sampling_result = kwargs["bound_probabilities"] if ("left" not in sampling_result) and ("right" not in sampling_result): raise QTeaLeavesError( "Only MPI sampling allowed for bound_probailities." ) if sampling_result is not None: kwargs = deepcopy(kwargs) del kwargs["bound_probabilities"] # initial data set try: sampling_result = self.meas_unbiased_probabilities( num_samples=num_unique, bound_probabilities=sampling_result, **kwargs ) except QTeaOPESError: if not ignore_opes_errors: # Trigger exception anyway raise # User requests to ignore them, print to logger, and return empty dict logger.warning("Initial OPES sampling failed.") return {} covered_probability = sum( interval[1] - interval[0] for interval in sampling_result.values() ) while (len(sampling_result) < num_unique) and ( covered_probability < exit_coverage ): delta = num_unique - len(sampling_result) num_samples = max(10, 2 * delta) try: sampling_result = self.meas_unbiased_probabilities( num_samples=num_samples, bound_probabilities=sampling_result, **kwargs, ) except QTeaOPESError: if not ignore_opes_errors: # Trigger exception anyway raise # User requests to ignore them, print to logger, and return what # we have so far logger.warning( "OPES sampling failed in loop; returning most recent results." ) return sampling_result # type: ignore[no-any-return] covered_probability = sum( interval[1] - interval[0] for interval in sampling_result.values() ) return sampling_result # type: ignore[no-any-return]
[docs] @staticmethod def mpi_sample_n_unique_states( state: "_AbstractTN", num_unique: int, comm: Any, tensor_backend: TensorBackend, cache_size: int | None = None, cache_clearing_strategy: str | None = None, filter_func: Callable | None = None, mpi_final_op: str | None = None, root: int = 0, **kwargs: Any, ) -> dict[str, tuple[float, float]]: """ Sample a target number of unique states. This is the target number of states, the actual number of states can differ. **Arguments** state : instance of :class:`_AbstractTN` State to be sampled from; needs to exist only on root and will be broadcasted via MPI to all other threads. num_unique : int Number of unique states to be sampled. This is a target number; the actual number of sampled states might differ in the end. comm : MPI-communicator from mpi4py Communicator of threads to be used for sampling. tensor_backend : :class:`TensorBackend` Tensor backend used for state, which will be needed to build up the state during bcast. cache_size : int, optional Cache size limit for the sampling (bytes) per MPI-thread. Default to 1,000,000,000 (1GB). cache_clearing_strategy : str, optional The strategy to be used to clear the cache within the sampling routine for TTN simulation. The possibilities are "num_qubits" or "state". Default to "num_qubits". filter_func : callable or `None`, optional Takes state string and probability boundaries as the two arguments in this order and returns `True` / `False. Filtering can reduce the workload before MPI-communication of states. Default to `None` (no filtering) mpi_final_op : str or `None` Either `None` or `mpi_gather` (root will contain all states) or `mpi_all_gather` (all threads will contain all states) Default to `None`. root : int, optional Thread-index of the MPI-thread holding the TN ansatz. Default to 0. ansatz : _AbstractTN (inside kwargs) Ansatz is needed to broadcast the TN state to the other processes. kwargs : keyword arguments Passed through to unbiased sampling, e.g., `qiskit_convention`, `precision`, and `mode`. """ for key in kwargs: if key not in ["qiskit_convention", "precision", "mode", "ansatz"]: raise ValueError(f"Keyword argument `{key}` not allowed.") if mpi_final_op not in [None, "mpi_gather", "mpi_all_gather"]: raise ValueError(f"Argument for mpi_final_op {mpi_final_op} not allowed.") if MPI is None: raise ImportError( "Trying to call sampling with MPI, but MPI was not imported." ) # We need a deepcopy of the keywork arguments here as we delete the key # "ansatz". Deleting this key "ansatz" is necessary as # ``sample_n_unique_states`` will check for superfluous keyword arguments. # This choice is a bit peculiar, but allows to hide the keyword argument # "ansatz" filled by the TN implementation (and not provided by the user). kwargs = deepcopy(kwargs) ansatz = kwargs["ansatz"] exit_coverage = kwargs.get("exit_coverage", 0.9999) del kwargs["ansatz"] size = comm.Get_size() rank = comm.Get_rank() psi = ansatz.mpi_bcast(state, comm, tensor_backend, root=root) if cache_size is not None: psi.set_cache_limit_sampling(cache_size) if cache_clearing_strategy is not None: psi.set_cache_clearing_strategy_sampling(strategy=cache_clearing_strategy) ranges = np.linspace(0, 1, size + 1) # We divide the workload of sampling by dividing the interval into size # subintervals evenly distributed. We use the sampling feature of defining # an already sampled interval to block for each MPI-process the intervals # of the other MPI processes. To identify the "special" intervals, they have # keys "left" and "right". sampling_result = {} if rank > 0: sampling_result["left"] = (0.0, ranges[rank]) if rank + 1 < size: sampling_result["right"] = (ranges[rank + 1], 1.0) total_num_unique_states = 0 total_covered_probability = 0 total_num_active = size ii_active = 1 while ( total_num_unique_states < num_unique and total_covered_probability < exit_coverage and total_num_active > 0 ): if ii_active == 1: num_unique_rank = int( np.ceil((num_unique - total_num_unique_states) / total_num_active) ) sampling_result = psi.sample_n_unique_states( num_unique_rank, bound_probabilities=sampling_result, **kwargs ) # Ignore left/right boundary, account for double counting states # at boundary ii_num_unique_states = len(sampling_result) - 3 ii_covered_probability = sum( interval[1] - interval[0] for interval in sampling_result.values() ) if ii_covered_probability >= exit_coverage: ii_active = 0 ii_covered_probability -= sampling_result.get("left", [0.0, 0.0])[1] ii_covered_probability -= 1.0 - sampling_result.get("right", [1.0, 1.0])[0] # pylint: disable=c-extension-no-member # Gather results total_num_unique_states = comm.allreduce(ii_num_unique_states, op=MPI.SUM) total_covered_probability = comm.allreduce( ii_covered_probability, op=MPI.SUM ) total_num_active = comm.allreduce(ii_active, op=MPI.SUM) # pylint: enable=c-extension-no-member if rank > 0: del sampling_result["left"] if rank + 1 < size: del sampling_result["right"] if filter_func is not None: # Apply filter passed by user keys_to_delete = [] for key, value in sampling_result.items(): if filter_func(key, value): keys_to_delete.append(key) # If the garbage collector kicks in during the loop ... sampling_result[key] = None # type: ignore[assignment] for key in keys_to_delete: del sampling_result[key] if mpi_final_op in ["mpi_gather", "mpi_all_gather"]: # Collect everything on root if comm.Get_rank() == root: for ii in range(comm.Get_size()): if ii == root: continue dict_ii = comm.recv(source=ii) sampling_result.update(dict_ii) else: comm.send(sampling_result, root) if mpi_final_op == "mpi_all_gather": sampling_result = comm.bcast(sampling_result, root=root) return sampling_result
def _get_children_prob( self, tensor: _AbstractQteaTensor, site_idx: int, curr_state: str, do_clear_cache: bool, ) -> tuple[list[float], list[_AbstractQteaTensor]]: """ Compute the probability and the relative tensor state of all the children of site `site_idx` in the probability tree Parameters ---------- tensor : _AbstractQteaTensor Parent tensor, with respect to which we compute the children site_idx : int Index of the parent tensor curr_state : str Comma-separated string tracking the current state of all sites already done with their projective measurements. do_clear_cache : bool Flag if the cache should be cleared. Only read for first site when a new meausrement begins. Returns ------- probabilities : list of floats Probabilities of the children tensor_list : list[_AbstractQteaTensor] Child tensors, already contracted with the next site if not last site. """ # Cannot implement it here, it highly depends on the TN # geometry raise NotImplementedError("This function has to be overwritten.") def _get_children_magic( self, transfer_matrix: _AbstractQteaTensor, site_idx: int, curr_state: str, do_clear_cache: bool, ) -> tuple[list[float], list[_AbstractQteaTensor]]: """ Compute the magic probability and the relative tensor state of all the children of site `site_idx` in the probability tree, conditioned on the transfer matrix Parameters ---------- transfer_matrix : np.ndarray Parent transfer matrix, with respect to which we compute the children site_idx : int Index of the parent tensor curr_state : str Comma-separated string tracking the current state of all sites already done with their projective measurements. do_clear_cache : bool Flag if the cache should be cleared. Only read for first site when a new measurement begins. Returns ------- probabilities : list of floats Probabilities of the children tensor_list : list of ndarray Child tensors, already contracted with the next site if not last site. """ # Cannot implement it here, it highly depends on the TN # geometry raise NotImplementedError("This function has to be overwritten.") # pylint: disable=unused-argument
[docs] def clear_cache( self, num_qubits_keep: int | None = None, all_probs: list[dict[str, tuple[float, _AbstractQteaTensor]]] | None = None, current_key: str | None = None, ) -> list[dict[str, tuple[float, _AbstractQteaTensor]]]: """ Clear cache until cache size is below cache limit again. This function is empty and works for any tensor network without cache. If the inheriting tensor network has a cache, it has to be overwritten. **Arguments** all_probs : list of dicts Contains already calculated branches of probability tree. Each TTN has to decide if they need to be cleaned up as well. """ # pylint: enable=unused-argument # To-do: handle linter warning instead # if self is None: # # Never true, but prevent linter warning (needs self when # # cache is actually defined) and unused arguments # print("Args", num_qubits_keep, all_probs, current_key) # return None default: list[dict[str, tuple[float, _AbstractQteaTensor]]] = [] return default_optional(all_probs, default)
def _get_child_prob( self, tensor: _AbstractQteaTensor, site_idx: int, target_prob: float, unitary_setup: UnitarySetupProjMeas | None, curr_state: np.ndarray, qiskit_convention: bool, ) -> tuple[int, _AbstractQteaTensor, float]: """ Compute which child has to be selected for a given target probability and return the index and the tensor of the next site to be measured. Parameters ---------- tensor : _AbstractQteaTensor Tensor representing the site to be measured with a projective measurement. site_idx : int Index of the site to be measured and index of `tensor`. target_prob : scalar Scalar drawn from U(0, 1) and deciding on the which projective measurement outcome will be picked. The decision is based on the site `site_idx` only. unitary_setup : instance of :class:`UnitarySetupProjMeas` or `None` If `None`, no local unitaries are applied. Otherwise, unitary for local transformations are provided and applied to the local sites. curr_state : np.ndarray of rank-1 and type int Record of current projective measurements done so far. qiskit_convention : bool Qiskit convention, i.e., ``True`` stores the projective measurement in reverse order, i.e., the first qubit is stored in ``curr_state[-1]``. Passing ``False`` means indices are equal and not reversed. Returns ------- measured_idx : index of the selected state tensor : projected and normalized tensor prob_jj : probability of the measured state """ # Cannot implement it here, it highly depends on the TN # geometry raise NotImplementedError("This function has to be overwritten.")
[docs] def compute_energy(self, pos: TnPos | None = None) -> float: """ Compute the energy of the TTN through the effective operator at position pos. Parameters ---------- pos : list, optional If a position is provided, the isometry is first shifted to that position and then the energy is computed. If None, the current isometry center is computed, by default None Returns ------- float Energy of the TTN """ if self.eff_op is None: logger.warning( "Tried to compute energy with no effective operators. Returning nan." ) return np.nan # Move the iso center if needed if pos is not None: self.iso_towards(pos) else: pos = self.iso_center _pos = default_optional(pos, self.iso_pos) self.move_pos(_pos, device=self._tensor_backend.computational_device) # Retrieve the tensor at the isometry tens = self[self.iso_pos] # Get the list of operators to contract pos_links = self.get_pos_links(_pos) # Contract the tensor with the effective operators around vec = self.eff_op.contract_tensor_lists(tens, _pos, pos_links) # force a standard python float energy = float(np.real(tens.get_of(tens.dot(vec)))) # Update internal storage self._prev_energy = energy # Force to return standard python float return float(np.real(np.array(tens.get_of(energy))))
######################################################################### ######################### Optimization methods ########################## #########################################################################
[docs] def optimize_single_tensor_for_fit(self, pos: TnPos) -> _AbstractQteaTensor: """ Optimize the tensor at position `pos` based on the effective operators loaded in the TTN Parameters ---------- pos : list of ints or int Position of the tensor in the TN Returns ------- tensor: The contraction being the optimal local tensor given the current environment and bond dimension. """ # Isometrise towards the desired tensor self.iso_towards(pos) pos_links = self.get_pos_links(pos) phi = self.eff_op.phi_for_fit # type: ignore[attr-defined] tensor = self.eff_op.contract_tensor_lists(phi[pos], pos, pos_links) return tensor
[docs] def optimize_single_tensor(self, pos: TnPos) -> float: """ Optimize the tensor at position `pos` based on the effective operators loaded in the TTN Parameters ---------- pos : list of ints or int Position of the tensor in the TN Returns ------- float Computed energy """ tic = tictoc() # Isometrise towards the desired tensor self.iso_towards(pos) pos_links = self.get_pos_links(pos) dim_problem = np.prod(self[pos].shape) if dim_problem == 1: # Nothing to do - ARPACK will fail eigenvalue = self.compute_energy() return eigenvalue # If we have effective projectors, get the function to project them out. # This is passed to the eig_api() as kwargs. # The projectors act directly on the Lanczos vectors. if len(self.eff_proj) > 0: project_out_projectors = self.get_projector_function( pos=pos, pos_links=pos_links ) # orthogonalize the initial tensor and normalize self[pos] = project_out_projectors(self[pos]) self[pos].normalize() else: project_out_projectors = None # If noise is added, randomize the tensor if self.convergence_parameters.noise != 0: self[pos].randomize(noise=self.convergence_parameters.noise) self[pos].normalize() # solve with solver eigenvalues, tensor = self[pos].eig_api( self.eff_op.contract_tensor_lists, self[pos].shape, self._convergence_parameters, args_func=(pos, pos_links), kwargs_func={"pre_return_hook": project_out_projectors}, ) logger.info( "Optimized tensor %-8s max chi: %-4d energy: %-19.14g time: %.1f", pos, max(tensor.shape), eigenvalues[0], tictoc() - tic, ) self[pos] = tensor # Update internal storage self._prev_energy = eigenvalues[0] return np.real(tensor.get_of(eigenvalues[0])) # type: ignore[no-any-return]
[docs] def optimize_two_tensors( self, pos: TnPos, pos_partner: TnPos, link_self: int, link_partner: int ) -> float: """ Local ground-state search on two tensors simultaneously. Parameters ---------- pos : Tuple[int] | int Position in the TN of the tensor to time-evolve pos_partner : int, tuple of ints (depending on TN) position of partner tensor, where link between tensor and partner tensor will be randomly expandend. link_self : int Link connecting to partner tensor (in tensor at `pos`) link_partner : int Link connecting to optimized tensors (in partner tensor). Returns ------- float Computed energy """ # Isometrize towards the desired tensor. # We do this additional step to ensure they are both # on the computational device self.iso_towards(pos_partner) self.iso_towards(pos, move_to_memory_device=False) tens_a = self[pos] tens_b = self[pos_partner] is_a_outgoing = tens_a.are_links_outgoing[link_self] theta = tens_a.tensordot(tens_b, ([link_self], [link_partner])) # Build custom eff ops list custom_ops: list[Any] = [] for ii, elem in enumerate(self.get_pos_links(pos)): if ii == link_self: continue if elem is None: custom_ops.append(None) else: custom_ops.append(self.eff_op[(elem, pos)]) for ii, elem in enumerate(self.get_pos_links(pos_partner)): if ii == link_partner: continue if elem is None: custom_ops.append(None) else: custom_ops.append(self.eff_op[(elem, pos_partner)]) custom_ops, theta, inv_perm = self.permute_spo_for_two_tensors( custom_ops, theta, link_partner ) if len(self.eff_proj) > 0: raise NotImplementedError( """The effective projectors are not implemented for two tensor optimization.""" ) eigenvalues, theta = theta.eig_api( self.eff_op.contract_tensor_lists, theta.shape, self._convergence_parameters, args_func=(None, None), kwargs_func={"custom_ops": custom_ops}, ) if inv_perm is not None: theta.transpose_update(inv_perm) links_a = list(range(tens_a.ndim - 1)) links_b = list(range(tens_a.ndim - 1, theta.ndim)) tens_a, tens_b, svals, svals_cut = theta.split_svd( links_a, links_b, contract_singvals="R", conv_params=self._convergence_parameters, is_link_outgoing_left=is_a_outgoing, ) svals_cut = self._postprocess_singvals_cut( singvals_cut=svals_cut, conv_params=self._convergence_parameters ) svals_cut = theta.get_of(svals_cut) self.set_singvals_on_link(pos, pos_partner, svals) nn = tens_a.ndim perm_a = list(range(link_self)) + [nn - 1] + list(range(link_self, nn - 1)) self[pos] = tens_a.transpose(perm_a) nn = tens_b.ndim perm_b = ( list(range(1, link_partner + 1)) + [0] + list(range(link_partner + 1, nn)) ) self[pos_partner] = tens_b.transpose(perm_b) self.iso_towards(pos_partner, keep_singvals=True) return np.real(tens_a.get_of(eigenvalues[0])) # type: ignore[no-any-return]
######################################################################### ######################## Time evolution methods ######################### #########################################################################
[docs] def timestep_single_tensor( self, pos: TnPos, next_pos: TnPos | None, sc: complex ) -> list[KrylovInfo]: """ Time step for a single-tensor update on a single tensor `exp(sc*Heff*dt)`. Parameters ---------- pos : Tuple[int] | int Position in the TN of the tensor to time-evolve next_pos: Tuple[int] | int | None Position in the TN of the next tensor to time-evolve. If is `None`, we do not back-evolve in time. sc : complex Multiplicative factor in the exponent `exp(sc*Heff*dt)` Return ------ List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. """ logger.info("Time-step at tensor %s-%s", pos, next_pos) logger.debug("Time-step at tensor's norm %f, scalar %s", self.norm(), sc) timestep_info = [] # Isometrize towards the desired tensor self.iso_towards(pos) pos_links = self.get_pos_links(pos) if sc.imag == 0: sc = sc.real krylov_solver = self._solver( self[pos], sc, self.eff_op.contract_tensor_lists, self._convergence_parameters, args_func=(pos, pos_links), ) self[pos], conv = krylov_solver.solve() timestep_info.append(conv) if next_pos is not None: # Have to evolve backwards # ------------------------ # # This is a bit inconvenient, because we have to shift the isometry # center by hand as the r-tensor has to be evolved backwards in time. ( rtens, pos_partner, link_partner, path_elem, ) = self._partial_iso_towards_for_timestep(pos, next_pos) # Retrieve operator from partner to iso center ops_a = self.eff_op[(pos_partner, pos)] # Path elem src layer-tensor-link, dst layer-tensor-link self._update_eff_ops(path_elem) # Needing just one operators, no idxs needed ops_b = self.eff_op[(pos, pos_partner)] # Assumed to be in the order of links ops_list_reverse = [ops_b, ops_a] krylov_solver = self._solver( rtens, -sc, self.eff_op.contract_tensor_lists, self._convergence_parameters, args_func=(None, None), kwargs_func={"custom_ops": ops_list_reverse}, ) rtens, conv = krylov_solver.solve() timestep_info.append(conv) tmp = rtens.tensordot(self[pos_partner], ([1], [link_partner])) if link_partner == 0: self[pos_partner] = tmp else: nn = self[pos_partner].ndim perm = ( list(range(1, link_partner + 1)) + [0] + list(range(link_partner + 1, nn)) ) self[pos_partner] = tmp.transpose(perm) self.iso_center = pos_partner # Move pos to the memory device self.move_pos(pos, device=self._tensor_backend.memory_device, stream=True) return timestep_info
[docs] def timestep_two_tensors( self, pos: TnPos, next_pos: TnPos | None, sc: complex, skip_back: bool ) -> list[KrylovInfo]: """ Time step for a single-tensor update on two tensors `exp(sc*Heff*dt)`. Parameters ---------- pos : Tuple[int] | int Position in the TN of the tensor to time-evolve next_pos: Tuple[int] | int | None Position in the TN of the next tensor to time-evolve sc : complex Multiplicative factor in the exponent `exp(sc*Heff*dt)` skip_back : bool Flag if backwards propagation of partner tensor can be skipped; used for last two tensors, partner tensor must be next position as well. Return ------ List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. """ logger.debug("Time-step at tensor %s", pos) timestep_info = [] if len(self.eff_proj) > 0: # the effective projectors should not be here, as they are ignored. raise QTeaLeavesError( "Doing time evolution with self.eff_proj set." + "Remove eff_proj before starting the dynamics." ) # Isometrize towards the desired tensor self.iso_towards(pos) # pos_partner, link_pos, link_partner = self.get_pos_partner_link_expansion(pos) link_pos: int ( link_pos, pos_partner, link_partner, path_elem, ) = self._partial_iso_towards_for_timestep( # type: ignore[assignment] pos, next_pos, no_rtens=True # type: ignore[arg-type] ) tens_a = self[pos] tens_b = self[pos_partner] is_a_outgoing = tens_a.are_links_outgoing[link_pos] theta = tens_a.tensordot(tens_b, ([link_pos], [link_partner])) # Build custom eff ops list custom_ops: list[Any] = [] for ii, elem in enumerate(self.get_pos_links(pos)): if ii == link_pos: continue if elem is None: custom_ops.append(None) else: custom_ops.append(self.eff_op[(elem, pos)]) for ii, elem in enumerate(self.get_pos_links(pos_partner)): if ii == link_partner: continue if elem is None: custom_ops.append(None) else: custom_ops.append(self.eff_op[(elem, pos_partner)]) custom_ops, theta, inv_perm = self.permute_spo_for_two_tensors( custom_ops, theta, link_partner ) krylov_solver = self._solver( theta, sc, self.eff_op.contract_tensor_lists, self._convergence_parameters, args_func=(None, None), kwargs_func={"custom_ops": custom_ops}, ) theta, conv = krylov_solver.solve() timestep_info.append(conv) if inv_perm is not None: theta.transpose_update(inv_perm) links_a = list(range(tens_a.ndim - 1)) links_b = list(range(tens_a.ndim - 1, theta.ndim)) tens_a, tens_b, svals, svals_cut = theta.split_svd( links_a, links_b, contract_singvals="R", conv_params=self._convergence_parameters, is_link_outgoing_left=is_a_outgoing, ) svals_cut = self._postprocess_singvals_cut( singvals_cut=svals_cut, conv_params=self._convergence_parameters ) svals_cut = theta.get_of(svals_cut) self.set_singvals_on_link(pos, pos_partner, svals) nn = tens_a.ndim perm_a = list(range(link_pos)) + [nn - 1] + list(range(link_pos, nn - 1)) self[pos] = tens_a.transpose(perm_a) nn = tens_b.ndim perm_b = ( list(range(1, link_partner + 1)) + [0] + list(range(link_partner + 1, nn)) ) self[pos_partner] = tens_b.transpose(perm_b) self._update_eff_ops(path_elem) self.iso_center = pos_partner # Move back to memory tensor at pos self.move_pos(pos, device=self._tensor_backend.memory_device, stream=True) if not skip_back: # Have to evolve backwards # ------------------------ pos_links = self.get_pos_links(pos_partner) krylov_solver = self._solver( self[pos_partner], -sc, self.eff_op.contract_tensor_lists, self._convergence_parameters, args_func=(pos_partner, pos_links), ) self[pos_partner], conv = krylov_solver.solve() timestep_info.append(conv) elif pos_partner != next_pos: raise QTeaLeavesError("Sweep order incompatible with two-tensor update.") return timestep_info
# pylint: disable-next=too-many-return-statements
[docs] def timestep( self, dt: float | complex, mode: int, sweep_order: Sequence[TnPos] | None = None, sweep_order_back: Sequence[TnPos] | None = None, ) -> list[KrylovInfo]: """ Evolve the Tensor network for one timestep. Parameters ---------- dt : float | complex Timestep mode : int Currently encoded are single-tensor TDVP first order (1), two-tensor TDVP first order (2), two-tensor TDVP second order (3), and single-tensor TDVP second order (4). A flex-TDVP as (5) is pending. The "_kraus" extension for modes 1-4 includes dissipative evolution. sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` sweep_order_back : List[int] | None Order in which we iterate backwards through the network for the timestep. If None, use the default in `self.default_sweep_order()[::-1]` Returns ------- List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. Details ------- Flex-TDVP in the fortran implementation was using two-tensor updates as long as the maximal bond dimension is not reached and then a ratio of 9 single-tensor updates to 1 two-tensor update step. """ if len(self.eff_proj) > 0: # the effective projectors should not be here, as they are ignored. raise QTeaLeavesError( "Doing time evolution with self.eff_proj set." + "Remove eff_proj before starting the dynamics." ) dt_real: float if mode == 1: return self.timestep_mode_1(dt, sweep_order=sweep_order) if mode == 2: return self.timestep_mode_2(dt, sweep_order=sweep_order) if mode == 3: return self.timestep_mode_3( dt, sweep_order=sweep_order, sweep_order_back=sweep_order_back, ) if mode == 4: return self.timestep_mode_4( dt, sweep_order=sweep_order, sweep_order_back=sweep_order_back, ) if mode == 5: return self.timestep_mode_5(dt, sweep_order=sweep_order) if mode == "1_kraus": dt_real = dt.real if isinstance(dt, complex) else dt return self.timestep_mode_1_kraus(dt_real, sweep_order=sweep_order) if mode == "2_kraus": dt_real = dt.real if isinstance(dt, complex) else dt return self.timestep_mode_2_kraus(dt_real, sweep_order=sweep_order) if mode == "3_kraus": dt_real = dt.real if isinstance(dt, complex) else dt return self.timestep_mode_3_kraus( dt_real, sweep_order=sweep_order, sweep_order_back=sweep_order_back ) if mode == "4_kraus": dt_real = dt.real if isinstance(dt, complex) else dt return self.timestep_mode_4_kraus( dt_real, sweep_order=sweep_order, sweep_order_back=sweep_order_back ) raise ValueError(f"Time evolution mode {mode} not available.")
[docs] def timestep_mode_1( self, dt: float | complex, sweep_order: Sequence[TnPos] | None = None, normalize: bool = False, ) -> list[KrylovInfo]: """ Evolve the Tensor network for one timestep (single-tensor update 1st order). Parameters ---------- dt : float | complex Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` Returns ------- List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. """ convergence_info = [] if sweep_order is None: sweep_order = self.default_sweep_order() for ii, pos in enumerate(sweep_order): # 1st order update next_pos = None if (ii + 1 == len(sweep_order)) else sweep_order[ii + 1] convergence_info.extend( self.timestep_single_tensor(pos, next_pos, -1j * dt) ) if normalize: self.normalize() return convergence_info
[docs] def timestep_mode_2( self, dt: float | complex, sweep_order: Sequence[TnPos] | None = None ) -> list[KrylovInfo]: """ Evolve the Tensor network for one timestep (two-tensor update 1st order). Parameters ---------- dt : float | complex Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` Returns ------- List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. """ timestep_info = [] if sweep_order is None: sweep_order = self.default_sweep_order() for ii, pos in enumerate(sweep_order): # 1st order update next_pos = None if (ii + 1 == len(sweep_order)) else sweep_order[ii + 1] skip_back = ii + 2 == len(sweep_order) timestep_info.extend( self.timestep_two_tensors(pos, next_pos, -1j * dt, skip_back) ) if skip_back: break return timestep_info
[docs] def timestep_mode_3( self, dt: float | complex, sweep_order: Sequence[TnPos] | None = None, sweep_order_back: Sequence[TnPos] | None = None, ) -> list[KrylovInfo]: """ Evolve the Tensor network for one timestep (two-tensor update 2nd order). Parameters ---------- dt : float | complex Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` sweep_order_back : List[int] | None Order in which we iterate backwards through the network for the timestep. If None, use the default in `self.default_sweep_order()[::-1]` Returns ------- List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. """ conv = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order) if sweep_order_back is None: sweep_order_back = self.default_sweep_order_back() conv_back = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order_back) return conv + conv_back
[docs] def timestep_mode_4( self, dt: float | complex, sweep_order: Sequence[TnPos] | None = None, sweep_order_back: Sequence[TnPos] | None = None, normalize: bool = False, ) -> list[KrylovInfo]: """ Evolve the Tensor network for one timestep (single-tensor update 2nd order). Parameters ---------- dt : float | complex Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` sweep_order_back : List[int] | None Order in which we iterate backwards through the network for the timestep. If None, use the default in `self.default_sweep_order()[::-1]` Returns ------- List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. """ conv = self.timestep_mode_1( 0.5 * dt, sweep_order=sweep_order, normalize=normalize ) if sweep_order_back is None: sweep_order_back = self.default_sweep_order_back() conv_back = self.timestep_mode_1( 0.5 * dt, sweep_order=sweep_order_back, normalize=normalize ) return conv + conv_back
[docs] def timestep_mode_5( self, dt: float | complex, sweep_order: Sequence[TnPos] | None = None, stride_two_tensor: int = 10, ) -> list[KrylovInfo]: """ Evolve the Tensor network for one timestep (mixed two-tensor and one-tensor update, first order). Parameters ---------- dt : float Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` stride_two_tensor: int If maximum bond dimension is reached, do a two-tensor update every `stride_two_tensor` steps. Returns ------- List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. """ timestep_info = [] if sweep_order is None: sweep_order = self.default_sweep_order() idx = self._timestep_mode_5_counter self._timestep_mode_5_counter += 1 # For the main loop, we always evolve the R-tensor back in time skip_back = False for ii, pos in enumerate(sweep_order[:-2]): # Everything but the last two next_pos = sweep_order[ii + 1] link_pos: int link_pos, _, _, _ = self._partial_iso_towards_for_timestep( # type: ignore[assignment] pos, next_pos, no_rtens=True ) is_link_full = self[pos].is_link_full(link_pos) enforce_two_tensor = idx % stride_two_tensor == 0 do_two_tensor = (not is_link_full) or enforce_two_tensor if do_two_tensor: timestep_info.extend( self.timestep_two_tensors(pos, next_pos, -1j * dt, skip_back) ) else: timestep_info.extend( self.timestep_single_tensor(pos, next_pos, -1j * dt) ) # Treat the last two tensors (cannot decide individually on update-scheme) pos = sweep_order[-2] next_pos = sweep_order[-1] link_pos, pos_partner, link_partner, _ = ( self._partial_iso_towards_for_timestep( # type: ignore[assignment] pos, next_pos, no_rtens=True ) ) is_link_full_a = self[pos].is_link_full(link_pos) is_link_full_b = self[pos_partner].is_link_full(link_partner) is_link_full = is_link_full_a or is_link_full_b enforce_two_tensor = idx % stride_two_tensor == 0 do_two_tensor = (not is_link_full) or enforce_two_tensor if do_two_tensor: skip_back = True timestep_info.extend( self.timestep_two_tensors(pos, next_pos, -1j * dt, True) ) else: timestep_info.extend(self.timestep_single_tensor(pos, next_pos, -1j * dt)) timestep_info.extend(self.timestep_single_tensor(next_pos, None, -1j * dt)) return timestep_info
[docs] def timestep_mode_1_kraus( self, dt: float, sweep_order: Sequence[TnPos] | None = None, normalize: bool = False, ) -> tuple[list[KrylovInfo], Any]: """ Evolve the Tensor network for one timestep of the Lindblad master equation (single-tensor update1st order) + Local dissipation. Parameters ---------- dt : float Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` sweep_order_back : List[int] | None Order in which we iterate backwards through the network for the timestep. If None, use the default in `self.default_sweep_order()[::-1]` Returns ------- conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. svals_cut : singular values cut in truncation. """ if not isinstance(self.eff_op, _AbstractEffectiveMpo): raise QTeaLeavesError( "Effective operator must be derived from _AbstractEffectiveMpo for Kraus channels." ) kraus_ops = self.eff_op.get_local_kraus_operators(dt) conv1 = self.timestep_mode_1( 0.5 * dt, sweep_order=sweep_order, normalize=normalize ) self.apply_local_kraus_channel(kraus_ops=kraus_ops) conv2 = self.timestep_mode_1( 0.5 * dt, sweep_order=sweep_order, normalize=normalize ) conv = conv1 + conv2 return conv
[docs] def timestep_mode_2_kraus( self, dt: float, sweep_order: Sequence[TnPos] | None = None ) -> tuple[list[KrylovInfo], Any]: """ Evolve the Tensor network for one timestep of the Lindblad master equation (two-tensor update 1st order) + Local dissipation. Parameters ---------- dt : float Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` sweep_order_back : List[int] | None Order in which we iterate backwards through the network for the timestep. If None, use the default in `self.default_sweep_order()[::-1]` Returns ------- conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. svals_cut : singular values cut in truncation. """ if not isinstance(self.eff_op, _AbstractEffectiveMpo): raise QTeaLeavesError( "Effective operator must be derived from _AbstractEffectiveMpo for Kraus updates." ) kraus_ops = self.eff_op.get_local_kraus_operators(dt) conv1 = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order) self.apply_local_kraus_channel(kraus_ops=kraus_ops) conv2 = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order) conv = conv1 + conv2 return conv
[docs] def timestep_mode_3_kraus( self, dt: float, sweep_order: Sequence[TnPos] | None = None, sweep_order_back: Sequence[TnPos] | None = None, ) -> tuple[list[KrylovInfo], Any]: """ Evolve the Tensor network for one timestep of the Lindblad master equation (two-tensor update 2nd order) + Local dissipation. Parameters ---------- dt : float Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` sweep_order_back : List[int] | None Order in which we iterate backwards through the network for the timestep. If None, use the default in `self.default_sweep_order()[::-1]` Returns ------- conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. svals_cut : singular values cut in truncation. """ if not isinstance(self.eff_op, _AbstractEffectiveMpo): raise QTeaLeavesError( "Effective operator must be derived from _AbstractEffectiveMpo for Kraus updates." ) kraus_ops = self.eff_op.get_local_kraus_operators(dt) conv1 = self.timestep_mode_3( 0.5 * dt, sweep_order=sweep_order, sweep_order_back=sweep_order_back, ) self.apply_local_kraus_channel(kraus_ops=kraus_ops) conv2 = self.timestep_mode_3( 0.5 * dt, sweep_order=sweep_order, sweep_order_back=sweep_order_back ) conv = conv1 + conv2 return conv
[docs] def timestep_mode_4_kraus( self, dt: float, sweep_order: Sequence[TnPos] | None = None, sweep_order_back: Sequence[TnPos] | None = None, normalize: bool = False, ) -> tuple[list[KrylovInfo], Any]: """ Evolve the Tensor network for one timestep of the Lindblad master equation (single-tensor update 2nd order) + Local dissipation. Parameters ---------- dt : float Timestep sweep_order : List[int] | None Order in which we iterate through the network for the timestep. If None, use the default in `self.default_sweep_order()` sweep_order_back : List[int] | None Order in which we iterate backwards through the network for the timestep. If None, use the default in `self.default_sweep_order()[::-1]` Returns ------- conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo] Information about the convergence of each Krylov update. svals_cut : singular values cut in truncation. """ if not isinstance(self.eff_op, _AbstractEffectiveMpo): raise QTeaLeavesError( "Effective operator must be derived from _AbstractEffectiveMpo for Kraus updates." ) kraus_ops = self.eff_op.get_local_kraus_operators(dt) conv1 = self.timestep_mode_4( 0.5 * dt, sweep_order=sweep_order, sweep_order_back=sweep_order_back, normalize=normalize, ) self.apply_local_kraus_channel(kraus_ops=kraus_ops) conv2 = self.timestep_mode_4( 0.5 * dt, sweep_order=sweep_order, sweep_order_back=sweep_order_back, normalize=normalize, ) conv = conv1 + conv2 return conv
######################################################################### ########################## Observables methods ########################## #########################################################################
[docs] def check_obs_input( self, ops: _AbstractQteaTensor | list[_AbstractQteaTensor], idxs: list[int] | None = None, ) -> None: """ Check if the observables are in the right format Parameters ---------- ops : list of np.ndarray or np.ndarray Observables to measure idxs: list of ints, optional If has len>0 we expect a list of operators, otherwise just one. Return ------ None """ local_dim = self.local_dim if not np.all(np.array(local_dim) == local_dim[0]): raise RuntimeError("Measurement not defined for non-constant local_dim") if idxs is None: ops = [ops] # type: ignore[list-item] # for op in ops: # if list(op.shape) != [local_dim[0]] * 2: # raise ValueError( # "Input operator should be of shape (local_dim, local_dim)" # ) if idxs is not None: if len(idxs) != len(ops): # type: ignore[arg-type] raise ValueError( "The number of indexes must match the number of operators" )
######################################################################### ############################## MPI methods ############################## ######################################################################### # pylint: disable=c-extension-no-member def _initialize_mpi(self) -> None: if (MPI is not None) and (MPI.COMM_WORLD.Get_size() > 1): self.comm = MPI.COMM_WORLD # type: ignore[assignment] # pylint: enable=c-extension-no-member
[docs] def mpi_send_tensor(self, tensor: _AbstractQteaTensor, to_: int) -> None: """ Send the tensor to the process `to_`. Parameters ---------- tensor : _AbstractQteaTensor Tensor to send to_ : int Index of the process where to send the tensor Returns ------- None """ tensor.mpi_send(to_, self.comm)
[docs] def mpi_receive_tensor(self, from_: int) -> _AbstractQteaTensor: """ Receive the tensor from the process `from_`. Parameters ---------- from_ : int Index of the process that sent the tensor Returns ------- xp.ndarray Received tensor """ return self._tensor_backend.tensor_cls.mpi_recv( from_, self.comm, self._tensor_backend )
[docs] def reinstall_isometry_parallel(self, *args: Any, **kwargs: Any) -> None: """ Reinstall the isometry in a parallel TN parallely """
# Empty on purpose: depends on TN ansatz and # it is used ONLY for MPI-distributed ansatzes
[docs] def reinstall_isometry_serial(self, *args: Any, **kwargs: Any) -> None: """ Reinstall the isometry in a parallel TN serially """
# Empty on purpose: depends on TN ansatz and # it is used ONLY for MPI-distributed ansatzes
[docs] @staticmethod def matrix_to_tensorlist( matrix: np.ndarray | _AbstractQteaBaseTensor, n_sites: int, dim: int, conv_params: TNConvergenceParameters, tensor_backend: TensorBackend | None = None, ) -> list[_AbstractQteaBaseTensor]: """ For a given matrix returns dense MPO form decomposing with SVDs Parameters ---------- matrix : ndarray Matrix to write in LPTN(MPO) format n_sites : int Number of sites dim : int Local Hilbert space dimension conv_params : :py:class:`TNConvergenceParameters` Input for handling convergence parameters. In particular, in the LPTN simulator we are interested in: - the maximum bond dimension (max_bond_dimension) - the cut ratio (cut_ratio) after which the singular values in SVD are neglected, all singular values such that :math:`\\lambda` /:math:`\\lambda_max` <= :math:`\\epsilon` are truncated tensor_backend : instance of :class:`TensorBackend` Default for `None` is :class:`QteaTensor` with np.complex128 on CPU. Return ------ List[_AbstractQteaBaseTensor] List of tensor, the MPO decomposition of the matrix """ if tensor_backend is None: # prevents dangerous default value similar to dict/list tensor_backend = TensorBackend() work: _AbstractQteaBaseTensor if not isinstance(matrix, tensor_backend.base_tensor_cls): work = tensor_backend.tensor_cls.from_elem_array(matrix) # type: ignore[attr-defined] else: work = matrix bond_dim = 1 tensorlist = [] for ii in range(0, n_sites - 1): # dim dim**(n_sites-1) # | || # O --[unfuse]--> O --[fuse upper and lower legs]--> # | || # # ==O== --[SVD, truncating]--> ==O-o-O== # # | | # --[unfuse]--> O-O ---iterate # | | # dim dim**(n_sites-1) work = work.reshape( ( bond_dim, dim, dim ** (n_sites - 1 - ii), dim, dim ** (n_sites - 1 - ii), ), ) tens_left, work, _, _ = work.split_svd( [0, 1, 3], [2, 4], contract_singvals="R", conv_params=conv_params ) tensorlist.append(tens_left) bond_dim = deepcopy(work.shape[0]) work = work.reshape((work.shape[0], dim, dim, 1)) tensorlist.append(work) return tensorlist
[docs] def debug_device_memory(self) -> None: """ Write informations about the memory usage in each device, and how many tensors are stored in each device. This should not be used in performance simulations but only in debugging. """ # First we do this for tensors in the Tensor network tensors_in_device: dict[str, int] = {} memory_in_device: dict[str, int] = {} for tens in self._iter_tensors(): if tens.device in tensors_in_device: tensors_in_device[tens.device] += 1 memory_in_device[tens.device] += tens.getsizeof() else: tensors_in_device[tens.device] = 1 memory_in_device[tens.device] = tens.getsizeof() nt_tot = np.array(list(tensors_in_device.values()), dtype=int).sum() for ( device, ntens, ) in tensors_in_device.items(): mem = memory_in_device[device] logger.debug( "TN tensors in %s are %d/%d for %d bytes", device, ntens, nt_tot, mem, ) # Then we do the same for tensors in the effective operators if self.eff_op is not None: tensors_in_device = {} memory_in_device = {} for eff_ops_link in self.eff_op.values(): for tens in eff_ops_link: if tens.device in tensors_in_device: tensors_in_device[tens.device] += 1 memory_in_device[tens.device] += tens.getsizeof() else: tensors_in_device[tens.device] = 1 memory_in_device[tens.device] = tens.getsizeof() nt_tot = np.array(list(tensors_in_device.values()), dtype=int).sum() for ( device, ntens, ) in tensors_in_device.items(): mem = memory_in_device[device] logger.debug( "Effective operators tensors in %s are %d/%d for %d bytes", device, ntens, nt_tot, mem, ) # Count the memory for effective projectors for proj in self.eff_proj: tensors_in_device = {} memory_in_device = {} for tens in proj.values(): if tens.device in tensors_in_device: tensors_in_device[tens.device] += 1 memory_in_device[tens.device] += tens.getsizeof() else: tensors_in_device[tens.device] = 1 memory_in_device[tens.device] = tens.getsizeof() nt_tot = np.array(list(tensors_in_device.values()), dtype=int).sum() for ( device, ntens, ) in tensors_in_device.items(): mem = memory_in_device[device] logger.debug( "Effective operators tensors in %s are %d/%d for %d bytes", device, ntens, nt_tot, mem, ) logger.debug( "Used bytes in device memory: %d/%d", self.data_mover.device_memory, self.data_mover.mempool.total_bytes(), # type: ignore[attr-defined] )
######################################################################### ############################## Measurement methods ###################### #########################################################################
[docs] def meas_exp_value(self, mpo: Any) -> float: """ Measures the expectation value of a given MPO for a TN state. Arguments --------- mpo : :py:class:`ITPO` or iterator The MPO whose expectation value we want to measure. The measurement has two modes. The first mode is passing the ITPO MPO, in which case the entire mpo is measured at once. The second mode is to pass an iterator which yields ITPO terms of the MPO batch by batch, in which case the output will be the sum of expectation values performed on each batch. The second mode is useful when the MPO to measure contains a large number of ITPO terms, therefore measuring it in batches escapes the memory issues. Return ------ exp_value : float Expectation value of the MPO. """ # save this info for later iso_center = deepcopy(self.iso_center) state_eff_op = self.eff_op # Check if mpo is an iterator, if not then measure entire mpo at once if not inspect.isgeneratorfunction(mpo): # check tensor backend mpo_tensor_backend = mpo.site_terms.construct_tensor_backend() if mpo_tensor_backend.device != self.tensor_backend.device: raise ValueError( "MPO and state devices don't match: " f"{mpo_tensor_backend.device} and {self.tensor_backend.device}." ) if mpo_tensor_backend.tensor_cls != self.tensor_backend.tensor_cls: raise ValueError( "MPO and state tensor classes don't match: " f"{mpo_tensor_backend.tensor_cls} and {self.tensor_backend.tensor_cls}." ) if hasattr(self, "de_layer"): # include the disentanglers into the itpo mpo = self.de_layer.contract_de_layer( mpo, self.tensor_backend, params=None ) # Carry out the measurement of this sub-MPO self.eff_op = None # type: ignore[assignment] # compute eff ops mpo.setup_as_eff_ops(self) # measure exp_value = self.compute_energy() # otherwise measure in batches else: # measuring iteratively batch by batch exp_value = 0.0 for sub_mpo in mpo(): if np.abs(exp_value) < 1e-8: # check tensor backend on the first entry mpo_tensor_backend = sub_mpo.site_terms.construct_tensor_backend() if mpo_tensor_backend.device != self.tensor_backend.device: raise ValueError( "MPO and state devices don't match: " f"{mpo_tensor_backend.device} and {self.tensor_backend.device}." ) if mpo_tensor_backend.tensor_cls != self.tensor_backend.tensor_cls: raise ValueError( "MPO and state tensor classes don't match: " f"{mpo_tensor_backend.tensor_cls} and {self.tensor_backend.tensor_cls}." ) if hasattr(self, "de_layer"): # include the disentanglers into the itpo sub_mpo = self.de_layer.contract_de_layer( sub_mpo, self.tensor_backend, params=None ) # Carry out the measurement of this sub-MPO self.eff_op = None # type: ignore[assignment] # compute eff ops sub_mpo.setup_as_eff_ops(self) # measure exp_value += self.compute_energy() # restore effective operators and iso center to get state into # previous form self.eff_op = state_eff_op _ = self.compute_energy() if iso_center is not None: self.iso_towards(iso_center) return exp_value
[docs] def postprocess_statedict( state_dict: dict[str, Any], local_dim: int = 2, qiskit_convention: bool = False ) -> dict: """ Remove commas from the states defined as keys of statedict and, if `qiskit_convention=True` invert the order of the digits following the qiskit convention Parameters ---------- state_dict : dict State dictionary, which keys should be of the format 'd,d,d,d,d,...,d' with d from 0 to local dimension local_dim : int or array-like of ints, optional Local dimension of the sites. Default to 2 qiskit_convention : bool, optional If True, invert the digit ordering to follow qiskit convention Return ------ dict The postprocessed state dictionary """ max_local_dim = np.max(resolve_scalar_list(local_dim)) postprocecessed_state_dict = {} for key, val in state_dict.items(): # If the maximum of the local_dim is <10 # remove the comma, since the definition # is not confusing if max_local_dim < 10: key = key.replace(",", "") # Invert the values if qiskit_convention == True if qiskit_convention: postprocecessed_state_dict[key[::-1]] = val else: postprocecessed_state_dict[key] = val return postprocecessed_state_dict
def _resample_for_unbiased_prob( num_samples: int, bound_probabilities: dict | None ) -> tuple[np.ndarray, dict]: """ Sample the `num_samples` samples in U(0,1) to use in the function :py:func:`meas_unbiased_probabilities`. If `bound_probabilities` is not None, then the function checks that the number of samples outside the ranges already computed in bound_probabilities are not in total num_samples. The array returned is sorted ascendingly Parameters ---------- num_samples : int Number of samples to be drawn for :py:func:`meas_unbiased_probabilities` bound_probabilities : dict or None See :py:func:`meas_unbiased_probabilities`. Return ------ np.ndarray Sorted samples in (0,1) dict Empty dictionary if bound_probabilities is None, otherwise the bound_probabilities input parameter. """ if (bound_probabilities is None) or (len(bound_probabilities) == 0): # Contains the boundary probability of measuring the state, i.e. if a uniform # random number has value left_bound< value< right_bound then you measure the # state. The dict structure is {'state' : [left_bound, right_bound]} bound_probabilities = {} samples = _random_uniform(0, 1, num_samples) else: # Prepare the functions to be used later on based on precision mpf_wrapper, almost_equal = _mp_precision_check(mp.mp.dps) # Go on and sample until you reach an effective number of num_samples, # withouth taking into account those already sampled in the given # bound_probabilities bounds_array = np.zeros((len(bound_probabilities), 2)) for idx, bound in enumerate(bound_probabilities.values()): bounds_array[idx, :] = bound bounds_array = bounds_array[bounds_array[:, 0].argsort()] if "left" in bound_probabilities and len(bound_probabilities) > 1: bounds_array[0, 1] = min(bounds_array[0, 1], bounds_array[1, 0]) if "right" in bound_probabilities and len(bound_probabilities) > 1: bounds_array[-1, 0] = max(bounds_array[-1, 0], bounds_array[-2, 1]) # Immediatly return if almost all the space has been measured if almost_equal( (np.sum(bounds_array[:, 1] - bounds_array[:, 0]), mpf_wrapper(1.0)) ): return np.random.uniform(0, 1, 1), bound_probabilities if mp.mp.dps > 15: logger_warning( "Resampling is performed at standard precision 16 digits" + "at least in np.random.choice." ) # Sample unsampled areas. First, prepare array for sampling array_for_sampling = [] last_bound = 0 last_idx = 0 while not almost_equal((last_bound, mpf_wrapper(1.0))): # Skip if interval already measured if last_idx < len(bounds_array) and almost_equal( (last_bound, bounds_array[last_idx, 0]) ): last_bound = bounds_array[last_idx, 1] last_idx += 1 # Save interval else: if 0 < last_idx < len(bounds_array): array_for_sampling.append( [bounds_array[last_idx - 1, 1], bounds_array[last_idx, 0]] ) last_bound = bounds_array[last_idx, 0] elif last_idx == len(bounds_array): array_for_sampling.append([bounds_array[last_idx - 1, 1], 1]) last_bound = 1 else: # Initial case array_for_sampling.append([0, bounds_array[last_idx, 0]]) last_bound = bounds_array[last_idx, 0] nparray_for_sampling = np.array(array_for_sampling) # Sample from which intervals you will sample sample_prob = nparray_for_sampling[:, 1] - nparray_for_sampling[:, 0] sample_prob[sample_prob < 0] = 0.0 sample_prob /= np.sum(sample_prob) intervals_idxs = np.random.choice( np.arange(len(array_for_sampling)), size=num_samples, replace=True, p=sample_prob, ) intervals_idxs, num_samples_per_interval = np.unique( intervals_idxs, return_counts=True ) # Finally perform uniform sampling samples = np.zeros(1) for int_idx, num_samples_int in zip(intervals_idxs, num_samples_per_interval): interval = nparray_for_sampling[int_idx, :] samples = np.hstack( ( samples, np.random.uniform( *interval, size=num_samples_int ), # type: ignore[call-overload] ) ) samples = samples[1:] # Sort the array samples = np.sort(samples) return samples, bound_probabilities def _projector( idxs: int | Sequence[int], shape: int | Sequence[int], xp: Any = np ) -> np.ndarray: """ Generate a projector of a given shape on the subspace identified by the indexes idxs Parameters ---------- idxs : int or array-like of ints Indexes where the diagonal of the projector is 1, i.e. identifying the projector subspace shape : int or array-like of ints Dimensions of the projector. If an int, it is assumed a square matrix xp : module handle Module handle for the creation of the projector. Possible are np (cpu) or cp (cpu). Default to np. """ logger_warning("DeprecationWarning: helper function `_projector` deprecated.") _idxs = resolve_scalar_list(idxs) if np.isscalar(shape): shape = (shape, shape) # type: ignore[assignment] inds = np.array(_idxs, dtype=int) projector = xp.zeros(shape) projector[inds, inds] = 1 return projector # type: ignore[no-any-return] def _projector_for_rho_i( idxs: int | Sequence[int], rho_i: _AbstractQteaTensor ) -> _AbstractQteaTensor: """ Generate a projector of a given shape on the subspace identified by the indexes idxs Parameters ---------- idxs : int or array-like of ints Indexes where the diagonal of the projector is 1, i.e. identifying the projector subspace rho_i : _AbstractQteaTensor Reduced density matrix for which a projector is needed. """ inds: Sequence[int] inds = [idxs] if np.isscalar(idxs) else idxs # type: ignore[list-item,assignment] projector = rho_i.zeros_like() for ii in inds: projector.set_diagonal_entry(ii, 1.0) return projector def _mp_precision_check(precision: int) -> tuple[Callable, Callable]: """ Based on the precision selected, gives a wrapper around the initialization of variables and almost equal check. In particolar, if `precision>15`, use mpmath library Parameters ---------- precision : int Precision of the computations Return ------ callable Initializer for variables callable Almost equal check for variables """ if precision > 15: # pylint: disable-next=unnecessary-lambda mpf_wrapper = lambda x: mp.mpf(x) almost_equal = lambda x: mp.almosteq( x[0], x[1], abs_eps=mp.mpf(10 ** (-precision)) ) else: mpf_wrapper = lambda x: x almost_equal = lambda x: np.isclose(x[0], x[1], atol=10 ** (-precision), rtol=0) return mpf_wrapper, almost_equal def _check_samples_in_bound_probs( samples: np.ndarray, bound_probabilities: dict ) -> tuple[np.ndarray, np.ndarray]: """ Check if the samples are falling in the probability intervals defined by the dictionary bound_probabilities, received as output by the OPES/unbiased sampling Parameters ---------- samples : np.ndarray List of samples bound_probabilities : dict Dictionary of bound probabilities, where the key is the measure and the values the intervals of probability Returns ------- np.ndarray(float) The probability sampled by samples, repeated the correct amount of times np.ndarray(float) The subset of the original samples not falling into the already measured intervals """ if len(bound_probabilities) == 0: return np.array([]), samples bound_probs = np.array(list(bound_probabilities.values())) left_bound = bound_probs[:, 0] right_bound = bound_probs[:, 1] probs = bound_probs[:, 1] - bound_probs[:, 0] new_samples: list[float] = [] def get_probs(sample: float, new_samples: list) -> float: # Careful, accessing variable outside of the scope of this function # Okay, because used directly condition = np.logical_and(sample < right_bound, sample > left_bound) if not any(condition): new_samples.append(sample) return -1.0 res = probs[condition] return res[0] # type: ignore[no-any-return] # get_probs = np.vectorize(get_probs) probablity_sampled = np.array([get_probs(ss, new_samples) for ss in samples]) probablity_sampled = probablity_sampled[probablity_sampled > 0].astype(float) return probablity_sampled, np.array(new_samples) def _random_uniform(lower: float, upper: float, num_samples: int) -> np.ndarray: """ Return a numpy array of num_samples samples distributed uniformly over the interval [lower, upper). If the mpmath precision is greater than 15 uses mpmath, otherwise uses numpy.random.uniform. Parameters ---------- lower : float Lower boundary of the output interval. All values generated will be greater than or equal to lower. upper : float Upper boundary of the output interval. All values generated will be less than upper. num_samples : int Number of sampled values. Returns ------- np.ndarray | np.array(mpf) Drawn samples. """ if mp.mp.dps <= 15: return np.random.uniform(lower, upper, num_samples) samples = np.array( [ mp.mpf( "0." + "".join([str(np.random.randint(10)) for _ in range(mp.mp.dps)]) ) for _ in range(num_samples) ] ) samples = lower + samples * (upper - lower) # type: ignore[operator] return samples # type: ignore[no-any-return]