# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""
The module contains an abstract tensor network, from which other tensor
networks can be derived.
"""
# pylint: disable=too-many-lines, too-many-statements, too-many-branches, too-many-locals, too-many-arguments
import abc
import inspect
import json
import logging
import os
import pickle
from collections.abc import Iterable, Iterator
from copy import deepcopy
from functools import partial
from time import time as tictoc
from typing import Any, Callable, Self, Sequence, TypeAlias, TypeVar
import mpmath as mp # type: ignore[import-untyped]
import numpy as np
from qtealeaves.convergence_parameters import TNConvergenceParameters
from qtealeaves.emulator.unitariesprojmeas import UnitarySetupProjMeas
from qtealeaves.mpos.abstracteffop import (
_AbstractEffectiveMpo,
_AbstractEffectiveOperators,
_AbstractEffectiveProjector,
)
from qtealeaves.solvers import KrylovInfo
from qtealeaves.tensors import TensorBackend
from qtealeaves.tensors.abstracttensor import (
_AbstractDataMover,
_AbstractQteaBaseTensor,
_AbstractQteaTensor,
)
from qtealeaves.tooling import QteaJsonEncoder, QTeaLeavesError, QTeaOPESError
from qtealeaves.tooling.mpisupport import MPI, TN_MPI_TYPES
from qtealeaves.tooling.type_alias import TnPos
__all__ = [
"postprocess_statedict",
"MPI",
"TN_MPI_TYPES",
]
logger = logging.getLogger(__name__)
# Cannot import them here, but improves clarity a bit
MPS: TypeAlias = "_AbstractTN" # pylint: disable=invalid-name
T = TypeVar("T")
def default_optional(value: T | None, default: T) -> T:
"""Helper to collapse an optional argument to its actual type."""
if value is None:
return default
return value
def resolve_scalar_list(value: T | Sequence[T]) -> Sequence[T]:
"""Helper to resolve a scalar and a list for typing."""
if np.isscalar(value):
return [value] # type: ignore[list-item]
return value # type: ignore[return-value]
# pylint: disable-next=dangerous-default-value
def logger_warning(*args: Any, storage: list = []) -> None:
"""Workaround to display warnings only once in logger."""
if args in storage:
return
storage.append(args)
logger.warning(*args)
# pylint: disable-next=too-many-public-methods, too-many-instance-attributes
class _AbstractTN(abc.ABC):
"""
Abstract tensor network class with methods applicable to any
tensor network.
Parameters
----------
num_sites: int
Number of sites
convergence_parameters: :py:class:`TNConvergenceParameters`
Class for handling convergence parameters. In particular,
in the python TN simulator, we are interested in:
- the *maximum bond dimension* :math:`\\chi`;
        - the *cut ratio* :math:`\\epsilon` after which the singular
          values are neglected, i.e., if :math:`\\lambda_1` is the
          largest singular value, then after an SVD we neglect all the
          singular values such that
          :math:`\\frac{\\lambda_i}{\\lambda_1}\\leq\\epsilon`
local_dim: int, optional
Local dimension of the degrees of freedom. Default to 2.
requires_singvals : boolean, optional
        Allows enforcing the SVD such that singular values are available on
        each link, which might be useful for measurements, e.g., bond entropy
        (the alternative is traversing the whole TN again to get the bond
        entropy on each link with an SVD).
tensor_backend : `None` or instance of :class:`TensorBackend`
Default for `None` is :class:`QteaTensor` with np.complex128 on CPU.
"""
extension = "tn"
has_de = False
skip_tto = False
def __init__(
self,
num_sites: int,
convergence_parameters: TNConvergenceParameters,
local_dim: int = 2,
requires_singvals: bool = False,
tensor_backend: TensorBackend | None = None,
) -> None:
# Class attributes by arguments
self._num_sites = num_sites
self._local_dim = (
[local_dim] * num_sites if np.isscalar(local_dim) else local_dim
)
self._convergence_parameters = convergence_parameters
self._tensor_backend = default_optional(tensor_backend, TensorBackend())
# Other class attributes
self._iso_center = None
self.eff_op: _AbstractEffectiveOperators = None # type: ignore[assignment]
self.eff_proj: list[_AbstractEffectiveProjector] = []
# internal storage for last energy measurement for algorithms which
# need an estimate where it is sufficient to rely that this number
# is not too outdated
self._prev_energy: float = None # type: ignore[assignment]
# Selection of QR vs SVD
self._requires_singvals = requires_singvals
# store solver to be used
self._solver: type = None # type: ignore[assignment]
# Attributes for MPI
self.comm = None
        # ML wants to use iso_towards without QR/SVD, which is one super-special
        # use case of a binary label (or rounding approach) + single-tensor update.
self._decomposition_free_iso_towards = False
# Run checks on input
# -------------------
if not isinstance(convergence_parameters, TNConvergenceParameters):
raise TypeError(
"Convergence parameters must be TNConvergenceParameters class."
)
if not isinstance(self._tensor_backend, TensorBackend):
raise TypeError(
f"Passed wrong type {type(self._tensor_backend)} to backend."
)
if self._convergence_parameters.max_bond_dimension < 1:
raise ValueError("The minimum bond dimension for a product state is 1.")
if self._convergence_parameters.cut_ratio < 0:
raise ValueError("The cut_ratio value must be positive.")
# For definition in init here, it must be a list
if len(self.local_dim) != num_sites: # type: ignore[arg-type]
raise ValueError(
f"Length of local_dim {len(local_dim)} " # type: ignore[arg-type]
f"differs from num_sites {num_sites}."
)
if np.min(self.local_dim) < 2:
raise ValueError("Local dimension cannot be smaller than 2.")
# internal variable for flex-TDVP
self._timestep_mode_5_counter = 0
# cache for local density matrices
self._cache_rho: dict[int, _AbstractQteaTensor] = {}
# MPI initialization
self._initialize_mpi()
# --------------------------------------------------------------------------
# Properties
# --------------------------------------------------------------------------
@property
def convergence_parameters(self) -> TNConvergenceParameters:
"""Get the convergence settings from the TN."""
return self._convergence_parameters
@convergence_parameters.setter
def convergence_parameters(self, value: TNConvergenceParameters) -> None:
"""
        Set the convergence settings of the TN (no immediate effect, only
        in next steps).
"""
self._convergence_parameters = value
@property
def data_mover(self) -> _AbstractDataMover:
"""Get the data mover od the tensor."""
return self._tensor_backend.datamover
@property
@abc.abstractmethod
def default_iso_pos(self) -> TnPos:
"""
Returns default isometry center position, e.g., for initialization
of effective operators.
"""
@property
def device(self) -> str:
"""Device where the tensor is stored."""
return self._tensor_backend.device
@property
def memory_device(self) -> str:
"""Return the memory device stored in the tensor backend."""
return self.tensor_backend.memory_device
@property
def computational_device(self) -> str:
"""Return the computational device stored in the tensor backend."""
return self.tensor_backend.computational_device
@property
def dtype(self) -> Any:
"""Data type of the underlying arrays."""
for tensor in self._iter_tensors():
return tensor.dtype
return None
def dtype_to_char(self) -> str:
"""
Translate current data type of the tensor back to char C, D, S, Z, H, or I.
"""
for tensor in self._iter_tensors():
return tensor.dtype_to_char()
raise QTeaLeavesError("Empty tensor network.")
@property
def dtype_eps(self) -> float:
"""Data type's machine precision of the underlying arrays."""
for tensor in self._iter_tensors():
return tensor.dtype_eps
raise QTeaLeavesError("Empty tensor network.")
@property
def iso_center(self) -> TnPos | None:
"""Isometry center of the tensor network (int|tuple|None)"""
return self._iso_center
@iso_center.setter
def iso_center(self, value: TnPos | None) -> None:
"""Change the value of the iso center"""
if isinstance(value, (int, tuple)):
self._iso_center = value # type: ignore[assignment]
elif value is None:
self._iso_center = None
else:
self._iso_center = tuple(value)
@property
def iso_pos(self) -> TnPos:
"""Returns the position of the iso center, but throws error if None."""
if self.iso_center is None:
raise QTeaLeavesError("TN ansatz is not isometrized.")
return self.iso_center
@property
def has_symmetry(self) -> bool:
"""Check if TN is built out of symmetric tensors."""
for tensor in self._iter_tensors():
return tensor.has_symmetry
raise QTeaLeavesError("Empty tensor network.")
@property
def num_sites(self) -> int:
"""Number of sites property."""
return self._num_sites
@property
def local_dim(self) -> Sequence[int]:
"""Local dimension property"""
if isinstance(self._local_dim, int):
# Constant Hilbert space
return [self._local_dim] * self.num_sites
if isinstance(self._local_dim, np.ndarray):
# Potentially different Hilbert spaces via numpy array
return self._local_dim
if isinstance(self._local_dim[0], (int, np.int64, np.int32)):
# Potentially different Hilbert spaces via list, detect
# cases without symmetry by checking first entry for int
return self._local_dim # type: ignore[return-value]
# Case for symmetries
return [elem.shape for elem in self._local_dim] # type: ignore[misc,union-attr]
@property
def local_links(self) -> Any:
"""Return information on local link (for symmetries more than integer)."""
return self._local_dim
@property
def solver(self) -> type | None:
"""Return current solver for the TN."""
return self._solver
@solver.setter
def solver(self, value: type) -> None:
"""Set the solver, e.g., currently used for exp(-i H dt) |psi>."""
self._solver = value
@property
def tensor_backend(self) -> TensorBackend:
"""Return tensor backend stored for this TN-ansatz."""
return self._tensor_backend
@staticmethod
def tn_mpi_types() -> dict[str, Any]:
"""Provide convenient access to the `TN_MPI_TYPES` for TN ansaetze."""
return TN_MPI_TYPES
# --------------------------------------------------------------------------
# Overwritten operators
# --------------------------------------------------------------------------
def __repr__(self) -> str:
"""
Return the class name as representation.
"""
return self.__class__.__name__
@abc.abstractmethod
def __getitem__(self, key: TnPos) -> _AbstractQteaTensor:
"""Overwriting the getitem, you can access tensors in the TN by their position.
Parameters
----------
key : TnPos
Position in the ansatz of which the tensor should be returned.
"""
def __len__(self) -> int:
"""
Provide number of sites in the TN.
"""
return self.num_sites
@abc.abstractmethod
def __setitem__(self, key: TnPos, value: _AbstractQteaTensor) -> None:
"""Set a tensor in the TN by its position.
Parameters
----------
key : TnPos
Position in the ansatz of which the tensor should be set.
value : _AbstractQteaTensor
New tensor to be set at the given position.
"""
# --------------------------------------------------------------------------
# classmethod, classmethod like
# --------------------------------------------------------------------------
@classmethod
@abc.abstractmethod
def from_statevector(
cls,
statevector: np.ndarray,
local_dim: int = 2,
conv_params: TNConvergenceParameters | None = None,
tensor_backend: TensorBackend | None = None,
) -> Self:
"""Decompose statevector to tensor network of chosen type."""
@classmethod
@abc.abstractmethod
def from_density_matrix(
cls,
rho: np.ndarray,
n_sites: int,
dim: int,
conv_params: TNConvergenceParameters,
tensor_backend: TensorBackend | None = None,
prob: bool = False,
) -> Self:
"""Decompose density matrix to tensor network of chosen type."""
@classmethod
@abc.abstractmethod
def from_lptn(
cls,
lptn: Self,
conv_params: TNConvergenceParameters | None = None,
**kwargs: Any,
) -> Self:
"""Converts LPTN to tensor network of chosen type."""
@classmethod
@abc.abstractmethod
def from_mps(
cls,
mps: Self,
conv_params: TNConvergenceParameters | None = None,
**kwargs: Any,
) -> Self:
"""Converts MPS to tensor network of chosen type."""
@classmethod
@abc.abstractmethod
def from_ttn(
cls,
ttn: Self,
conv_params: TNConvergenceParameters | None = None,
**kwargs: Any,
) -> Self:
"""Converts TTN to tensor network of chosen type."""
@classmethod
@abc.abstractmethod
def from_tto(
cls,
tto: Self,
conv_params: TNConvergenceParameters | None = None,
**kwargs: Any,
) -> Self:
"""Converts TTO to tensor network of chosen type."""
# pylint: disable=inconsistent-return-statements
@classmethod
def from_x(
cls,
obj: Self,
conv_params: TNConvergenceParameters | None = None,
**kwargs: Any,
) -> Self:
"""
Converts tensor network of type `x` to chosen type.
Parameters
----------
obj: :py:class:`_AbstractTN`
object to convert to chosen type.
conv_params: :py:class:`TNConvergenceParameters`, optional
Input for handling convergence parameters
Default is None.
Return
------
new_obj: chosen type decomposition of obj.
If truncations are part of the underlying result, they
are skipped.
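        Examples
        --------
        A minimal sketch, not run as a doctest; `psi_mps` is assumed to be an
        existing MPS instance and `TTN` a concrete target ansatz class
        (hypothetical names).
        >>> psi_ttn = TTN.from_x(psi_mps)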
"""
if not isinstance(obj, _AbstractTN):
raise TypeError(
f"The input is not an abstract TN, but got type {type(obj)}."
)
from_x_dict: dict[str, Callable[[Any], Self | tuple]] = {
"mps": cls.from_mps,
"lptn": cls.from_lptn,
"ttn": cls.from_ttn,
"tto": cls.from_tto,
}
if obj.extension not in from_x_dict:
raise TypeError(
f"Input is of {type(obj)} and cannot be converted to {cls}."
)
result = from_x_dict[obj.extension](obj, conv_params, **kwargs) # type: ignore[call-arg]
if isinstance(result, cls):
return result
        # This probably catches some TTO cases returning a tuple
        # of the ansatz and the truncation
for elem in result: # type: ignore[union-attr]
if isinstance(elem, cls):
return elem
raise QTeaLeavesError("Method did not encounter _AbstractTN to return.")
# pylint: enable=inconsistent-return-statements
@classmethod
@abc.abstractmethod
def product_state_from_local_states(
cls,
mat: np.ndarray | list[np.ndarray],
padding: Sequence | None = None,
convergence_parameters: TNConvergenceParameters | None = None,
tensor_backend: TensorBackend | None = None,
) -> Self:
"""Construct a product (separable) state in a suitable tensor network form, given the local
states of each of the sites."""
@classmethod
def read_pickle(
cls,
filename: str,
tensor_backend: TensorBackend | None = None,
convergence_parameters: TNConvergenceParameters | None = None,
) -> Self:
"""
        Read via pickle-module.
        Parameters
        ----------
        filename : str, path-like
Filename including path where to find the tensor network.
tensor_backend : :class:`TensorBackend` | None, optional
If not `None`, attribute `_tensor_backend` is set and
data type and device are converted.
Default to `None` (no conversion)
convergence_parameters: :py:class:`TNConvergenceParameters`
Class for handling convergence parameters.
Default to `None` (use default convergence parameters)
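        Examples
        --------
        A minimal sketch, not run as a doctest; assumes a concrete `MPS`
        ansatz, for which the extension resolves to "pklmps" (hypothetical
        filename).
        >>> psi = MPS.read_pickle("state.pklmps")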
"""
ext = "pkl" + cls.extension
if not filename.endswith(ext):
raise QTeaLeavesError(
f"Filename {filename} not valid, extension should be {ext}."
)
with open(filename, "rb") as fh:
obj = pickle.load(fh)
if not isinstance(obj, cls):
raise TypeError(
f"Loading wrong tensor network ansatz: {type(obj)} vs {cls}."
)
if tensor_backend is not None:
obj.convert(dtype=tensor_backend.dtype, device=tensor_backend.memory_device)
# pylint: disable-next=protected-access
obj._tensor_backend = tensor_backend
if convergence_parameters is not None:
obj.convergence_parameters = convergence_parameters
else:
            msg = (
                "Reading pickled tensor network without specifying convergence "
                "parameters. Convergence parameters are not saved in the pickle "
                "file. This will set convergence parameters to default."
            )
            logger.warning(msg)
return obj
@classmethod
@abc.abstractmethod
def mpi_bcast(
cls, state: Self, comm: Any, tensor_backend: TensorBackend, root: int = 0
) -> Self:
"""
Broadcast a whole tensor network.
"""
def copy(
self,
dtype: Any = None,
device: str | None = None,
*,
copy_convergence_parameters: bool = True,
) -> Self:
"""
Make a copy of a TN.
**Arguments**
dtype : target dtype of the new TN. Optional. Default to `None`.
device : target device of the new TN. Optional. Default to `None`.
copy_convergence_parameters : copy the convergence parameters from
the source state. Optional, by keyword-only. Default to `True`.
**Details**
The following attributes have a special treatment and are not present
in the copied object.
* convergence_parameters (see boolean flag)
* log file (filehandle)
* MPI communicator
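        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        concrete ansatz instance.
        >>> phi = psi.copy()                       # same dtype and device
        >>> phi_z = psi.copy(dtype=np.complex128)  # with dtype conversion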
"""
# Store attributes which cannot be pickled, so also potential problems
# with deepcopy
storage = self._store_attr_for_pickle()
obj = deepcopy(self)
self._restore_attr_for_pickle(storage)
if copy_convergence_parameters:
obj.convergence_parameters = deepcopy(self.convergence_parameters)
obj.convert(dtype, device)
return obj
@abc.abstractmethod
def to_dense(self, true_copy: bool = False) -> Self:
"""Convert into a TN with dense tensors (without symmetries)."""
@classmethod
@abc.abstractmethod
def read(
cls,
filename: str,
tensor_backend: TensorBackend,
cmplx: bool = True,
order: str = "F",
) -> Self:
"""Read a TN from a formatted file."""
# --------------------------------------------------------------------------
# Checks and asserts
# --------------------------------------------------------------------------
def is_dtype_complex(self) -> bool:
"""Check if data type is complex based on one example tensor.."""
for tensor in self._iter_tensors():
return tensor.is_dtype_complex()
raise QTeaLeavesError("Empty tensor network.")
@staticmethod
def assert_extension(obj: "_AbstractTN", extension: str) -> None:
"""
Asserts that `obj.extension` is equal to `extension`.
Parameters
----------
obj : any
Will raise error if obj is not an instance of :class:`_AbstractTN`
extension : str
            Will check if the extension of the TN ansatz matches the extension
of the argument. If not, a type error is raised.
"""
if not isinstance(obj, _AbstractTN):
raise TypeError(f"Expected _AbstractTN, but got type {type(obj)}.")
if obj.extension != extension:
raise TypeError(f"Expected {extension}, but got type {type(obj)}.")
def sanity_check(self) -> None:
"""
By default, we provide an empty sanity check method which can be
overwritten by each class to check properties of the network.
"""
# by default, no checks and no need to overwrite method in class
# --------------------------------------------------------------------------
# Abstract methods to be implemented
# --------------------------------------------------------------------------
@abc.abstractmethod
def apply_projective_operator(
self, site: int, selected_output: int | None = None, remove: bool = False
) -> tuple[int, float]:
"""
Apply a projective operator to the site **site**, and give the measurement as output.
You can also decide to select a given output for the measurement, if the probability is
non-zero. Finally, you have the possibility of removing the site after the measurement.
.. warning::
Applying projective measurements/removing sites is ALWAYS dangerous. The information
            of the projective measurement should in principle be carried over the entire MPS,
            by iteratively applying SVDs across all sites. However, this procedure is highly
            suboptimal, since it is not always necessary and will be processed by the
            following two-site operators. Thus, the procedure IS NOT applied here. Take care
that entanglement measures through :class:`TNObsBondEntropy` may give incorrect
results right after a projective operator application. Furthermore, if working
with parallel approaches, projective operators should be treated with even more
caution, since they CANNOT be applied in parallel.
Parameters
----------
site: int
Index of the site you want to measure
selected_output: int, optional
            If provided, the selected state is measured. Throws an error if the
            probability of the state is 0.
remove: bool, optional
If True, the measured index is traced away after the measurement. Default to False.
Returns
-------
meas_state: int
Measured state
state_prob : float
Probability of measuring the output state
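        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        concrete ansatz instance.
        >>> meas_state, state_prob = psi.apply_projective_operator(site=0)
        >>> meas_one, prob_one = psi.apply_projective_operator(0, selected_output=1)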
"""
@abc.abstractmethod
def build_effective_operators(self, measurement_mode: bool = False) -> None:
"""
Build the complete effective operator on each
of the links. Now assumes `self.eff_op` is set.
"""
@abc.abstractmethod
def _convert_singvals(self, dtype: Any, device: str | None) -> None:
"""Convert the singular values of the tensor network to dtype/device."""
@abc.abstractmethod
def _iter_physical_links(self) -> Iterator[tuple[int, ...]]:
"""
Gives an iterator through the physical links.
In the MPS, the physical links are connected to nothing,
i.e. we assign the tensor index -2
Return
------
Tuple[int]
The identifier from_tensor, to_tensor
"""
@abc.abstractmethod
def get_bipartition_link(
self, pos_src: TnPos, pos_dst: TnPos
) -> tuple[Sequence[int], Sequence[int]]:
"""
Returns two sets of sites forming the bipartition of the system for
a loopless tensor network. The link is specified via two positions
in the tensor network.
"""
@abc.abstractmethod
def get_pos_links(self, pos: TnPos) -> list[TnPos | None]:
"""
Return a list of positions where all links are leading to. Number
of entries is equal to number of links. Each entry contains the position
as accessible in the actual tensor network.
"""
@abc.abstractmethod
def get_rho_i(self, idx: int) -> _AbstractQteaTensor:
"""
Get the reduced density matrix of the site at index idx
Parameters
----------
idx : int
Index of the site
"""
@abc.abstractmethod
def get_tensor_of_site(self, idx: int) -> _AbstractQteaTensor:
"""
Generic function to retrieve the tensor for a specific site. Compatible
across different tensor network geometries.
Parameters
----------
idx : int
Return tensor containing the link of the local
Hilbert space of the idx-th site.
"""
@abc.abstractmethod
def iso_towards(
self,
new_iso: TnPos,
keep_singvals: bool = False,
trunc: bool = False,
conv_params: TNConvergenceParameters | None = None,
move_to_memory_device: bool = True,
) -> list[np.ndarray]:
"""
        Shift the isometry center to the tensor at the
        corresponding position, i.e., move the isometry to a
        specific tensor, which might not contain a physical site.
Parameters
----------
new_iso :
Position in the TN of the tensor which should be isometrized.
keep_singvals : bool, optional
If True, keep the singular values even if shifting the iso with a
QR decomposition. Default to False.
trunc : Boolean, optional
If `True`, the shifting is done via truncated SVD.
If `False`, the shifting is done via QR.
Default to `False`.
conv_params : :py:class:`TNConvergenceParameters`, optional
Convergence parameters to use for the SVD. If `None`, convergence
parameters are taken from the TTN.
Default to `None`.
move_to_memory_device : bool, optional
If True, when a mixed device is used, move the tensors that are not the
isometry center back to the memory device. Default to True.
        Returns
        -------
        truncated_s_vals : list[np.ndarray]
            Singular values (numpy-like arrays) cut in the process of shifting
            the isometry center.
Details
-------
        The tensors used in the computation will always be moved to the computational device.
        For example, the isometry movement keeps the isometry center and the effective operators
        around the center (if present) always on the computational device. If move_to_memory_device
        is False, then all the tensors (effective operators) on the path from the old iso to the new
        iso will be kept on the computational device. This is very useful when you iterate some
        protocol between two tensors, or in general when two tensors are involved.
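        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        concrete ansatz instance and positions follow the ansatz-specific
        `TnPos` convention.
        >>> _ = psi.iso_towards(psi.default_iso_pos)  # QR shift, no truncation
        >>> cuts = psi.iso_towards(psi.default_iso_pos, trunc=True)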
"""
@abc.abstractmethod
def isometrize_all(self) -> None:
"""
Isometrize towards the default isometry position with no previous
isometry center, e.g., works as well on random states.
Returns
-------
None
"""
@abc.abstractmethod
def _iter_tensors(self) -> Iterator[_AbstractQteaTensor]:
"""Iterate over all tensors forming the tensor network (for convert etc)."""
@abc.abstractmethod
def norm(self) -> float:
"""
Calculate the norm of the state.
"""
# We opt for typing with float only (no complex) as this works on
# all tensor networks independent of the underlying tensor data type
@abc.abstractmethod
def scale(self, factor: float) -> None:
"""
Multiply the tensor network state by a scalar factor (in-place).
Parameters
----------
factor : float
Factor for multiplication of current tensor network state.
"""
# We opt for typing with float only (no complex) as this works on
# all tensor networks independent of the underlying tensor data type
@abc.abstractmethod
def scale_inverse(self, factor: float) -> None:
"""
Multiply the tensor network state by the inverse of a scalar factor.
Parameters
----------
factor : float
Factor for multiplication of current tensor network state.
"""
# self.scale(1.0 / factor) # Division will be excuted on device
@abc.abstractmethod
def set_singvals_on_link(self, pos_a: TnPos, pos_b: TnPos, s_vals: Any) -> None:
"""Update or set singvals on link via two positions."""
@abc.abstractmethod
def site_canonize(self, idx: int, keep_singvals: bool = False) -> None:
"""
Shift the isometry center to the tensor containing the
corresponding site, i.e., move the isometry to a specific
        Hilbert space. This method can be implemented independently
of the tensor network structure.
Parameters
----------
idx : int
Index of the physical site which should be isometrized.
keep_singvals : bool, optional
If True, keep the singular values even if shifting the iso with a
QR decomposition. Default to False.
"""
@abc.abstractmethod
def _update_eff_ops(self, id_step: Sequence[int]) -> None:
"""
Update the effective operators after the iso shift
Parameters
----------
id_step : List[int]
List with information of the iso moving path. The encoding
is specific to the ansatz.
Returns
-------
None
Updates the effective operators in place
"""
@abc.abstractmethod
def _partial_iso_towards_for_timestep(
self, pos: TnPos, next_pos: TnPos, no_rtens: bool = False
) -> tuple[_AbstractQteaTensor | int, TnPos, int, Sequence[int]]:
"""
Move by hand the iso for the evolution backwards in time
Parameters
----------
pos : Tuple[int] | int
Position of the tensor evolved
next_pos : Tuple[int] | int
Position of the next tensor to evolve
Returns
-------
_AbstractQteaTensor | int
            The R tensor of the iso movement; an integer if no_rtens = True.
Tuple[int]
The position of the partner (the parent in TTNs)
int
The link of the partner pointing towards pos
List[int]
The update path to pass to _update_eff_ops
"""
# requires next_pos to be not `None` and actual TnPos
@abc.abstractmethod
def default_sweep_order(self, skip_exact_rgtensors: bool = False) -> list[TnPos]:
"""
Default sweep order to be used in the ground state search/time evolution.
Arguments
---------
skip_exact_rgtensors : bool, optional
Allows to exclude tensors from the sweep which are at
full bond dimension and represent just a unitary
transformation. Usually set via the convergence
parameters and then passed here.
Default to `False`.
Returns
-------
List[int] | List[Tuple[int]]
The generator that you can sweep through
"""
raise NotImplementedError("This method is ansatz-specific")
def default_sweep_order_back(
self, skip_exact_rgtensors: bool = False
) -> list[TnPos]:
"""Default sweep order backwards, e.g., for second-order methods."""
return self.default_sweep_order(skip_exact_rgtensors=skip_exact_rgtensors)[::-1]
def filter_sweep_order(
self, sweep_order: Sequence[TnPos], skip_exact_rgtensors: bool
) -> Sequence[TnPos]:
"""Filter a sweep order with respect to exact rg tensors if flag active."""
if not skip_exact_rgtensors:
return sweep_order
default_order = self.default_sweep_order(skip_exact_rgtensors=True)
filtered_sweep_order = []
for elem in sweep_order:
if elem in default_order:
filtered_sweep_order.append(elem)
return filtered_sweep_order
@abc.abstractmethod
def get_pos_partner_link_expansion(self, pos: TnPos) -> tuple[TnPos, int, int]:
"""
Get the position of the partner tensor to use in the link expansion
subroutine
Parameters
----------
pos : int | Tuple[int]
Position w.r.t. which you want to compute the partner
Returns
-------
int | Tuple[int]
Position of the partner
int
Link of pos pointing towards the partner
int
Link of the partner pointing towards pos
"""
def get_projector_function(
self, pos: TnPos, pos_links: list[TnPos | None]
) -> Callable:
"""
Generates a function which locally projects out the effective projectors.
Used in the excited state search.
**Arguments**
pos : tuple[int]
The position of the tensor from which to project.
pos_links : list[tuple]
Position of neighboring tensors where the links in `tensor` lead to.
**Returns**
Callable : the function.
"""
# partial should return a pointer to the function with arguments applied, so it
# should not create a whole new function object
return partial(
self._project_out_projectors,
projectors=self.eff_proj,
pos=pos,
pos_links=pos_links,
)
@staticmethod
def _project_out_projectors(
tensor: _AbstractQteaTensor,
projectors: Sequence,
pos: TnPos,
pos_links: Sequence[TnPos | None],
) -> _AbstractQteaTensor:
"""
Takes the tensor and projects out all eff_projectors at the given position.
Used for the excited state search.
Does not renormalize the tensor to the initial norm! This seems to work better for the
excited state search (it produces lower overlaps).
**Arguments**
tensor : :class:`_AbstractQteaTensor`
The tensor from which to subtract the projection.
projectors : list
A list of projectors as effective operators.
        pos : tuple | int
The position of the tensor.
pos_links : list[tuple]
Position of neighboring tensors where the links in `tensor` lead to.
**Returns**
:class:`_AbstractQteaTensor`
"""
tabs: Callable = tensor.get_attr("abs") # type: ignore[assignment]
# calculate all overlaps first, sort them, and first orthogonalize
# the ones with the biggest overlap
overlaps = []
abs_overlaps = []
local_projectors = []
for projector in projectors:
local_projector = projector.contract_to_projector(
tensor=None, pos=pos, pos_links=pos_links
)
# move to the same device
local_projector.convert(device=tensor.device)
overlap = local_projector.dot(tensor)
overlaps.append(overlap)
abs_overlaps.append(tabs(overlap))
local_projectors.append(local_projector)
# and now orthogonalize starting with the largest overlap
sorted_ndx = sorted(enumerate(abs_overlaps), key=lambda x: x[1], reverse=True)
for ii, _ in sorted_ndx:
overlap = overlaps[ii]
local_projector = local_projectors[ii]
tensor.add_update(other=local_projector, factor_other=-1 * overlap)
# WARNING: I know you want to renormalize here, but it is wrong!
# You will get wrong results. Luka Feb 2024
return tensor
@abc.abstractmethod
def to_statevector(
self, qiskit_order: bool = False, max_qubit_equivalent: int = 20
) -> _AbstractQteaTensor:
"""
Decompose a given TN into statevector form if pure.
Parameters
----------
qiskit_order : bool, optional
If true, the order is right-to-left. Otherwise left-to-right
(which is the usual order in physics). Default to False.
max_qubit_equivalent : int, optional
            Maximum number of qubits for which the statevector is computed,
            i.e., for a maximum Hilbert space dimension of 2**max_qubit_equivalent.
Default to 20.
Returns
-------
psi : instance of :class:`_AbstractQteaTensor`
The statevector of the system
Raises
------
Mixed state: if mixed-state representations are not pure, an
error will be raised.
"""
@abc.abstractmethod
def write(self, filename: str, cmplx: bool = True) -> None:
"""Write the TN in python format into a FORTRAN compatible format."""
@abc.abstractmethod
def apply_local_kraus_channel(
self, kraus_ops: dict[int, _AbstractQteaTensor]
) -> float:
"""
        Apply local Kraus channels to the tensor network.
        Parameters
        ----------
        kraus_ops : dict of :py:class:`_AbstractQteaTensor`
            Dictionary; keys are site indices and values the corresponding 3-leg Kraus tensors.
Returns
-------
singvals_cut: float
Sum of singular values discarded due to truncation.
"""
@abc.abstractmethod
def get_substate(
self, first_site: int, last_site: int, truncate: bool = False
) -> Self:
"""
Returns the smaller TN built of tensors from `first_site` to `last_site`,
where sites refer to physical sites.
Parameters
----------
first_site : int
First (physical) site defining a range of tensors which will compose the new TN.
Python indexing assumed, i.e. counting starts from 0.
last_site : int
Last (physical) site of a range of tensors which will compose the new TN.
Python indexing assumed, i.e. counting starts from 0.
        truncate : bool
            If False, tensors are returned as is, possibly with non-dummy links
            on edge tensors. If True, the edges will be truncated to dummy links.
Default to False.
"""
@abc.abstractmethod
def kron(self, other: Self) -> Self:
"""
Concatenate two TN, taking the kronecker/outer product
of the two states. The bond dimension assumed is the maximum
between the two bond dimensions.
Parameters
----------
other : :py:class:`_AbstractTN`
TN to concatenate with self.
Returns
-------
:py:class:`_AbstractTN`
Concatenation of the first TTN with the second in order
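        Examples
        --------
        A minimal sketch, not run as a doctest; `psi_a` and `psi_b` are assumed
        to be two compatible instances of the same concrete ansatz.
        >>> psi_ab = psi_a.kron(psi_b)  # psi_ab covers the sites of both states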
"""
# --------------------------------------------------------------------------
# Methods that can be inherited
# --------------------------------------------------------------------------
def _apply_projective_operator_common(
self, site: int, selected_output: int
) -> tuple[_AbstractQteaTensor, int, float]:
"""
Execute common steps across different ansätze.
Returns
-------
rho_i : _AbstractQteaTensor
meas_state: integer
old_norm : norm before calculating rho_i and renormalizing
"""
if selected_output is not None and selected_output > self.local_dim[site] - 1:
            raise ValueError("The selected output must be at most local_dim-1.")
# Set the orthogonality center
self.site_canonize(site, keep_singvals=True)
# Normalize
old_norm = self.norm()
self.scale_inverse(old_norm)
rho_i = self.get_rho_i(site)
probabilities = rho_i.diag(real_part_only=True, do_get=True) # type: ignore[attr-defined]
cumul_probs = np.cumsum(probabilities)
random_u = np.random.rand()
meas_state = None
for ii, cprob_ii in enumerate(cumul_probs):
if selected_output is not None and ii != selected_output:
continue
if cprob_ii >= random_u or selected_output == ii:
meas_state = ii
# state_prob = probabilities[ii]
break
if meas_state is None:
raise QTeaLeavesError("Did not run into measurement.")
return rho_i, meas_state, old_norm
def checkpoint_copy_simulation_attr(self, src: Any) -> None:
"""Copy attributes linked to the simulation, like convergence parameters."""
self.convergence_parameters = src.convergence_parameters
self.solver = src.solver
if src.comm is not None:
raise ValueError("Checkpoints and MPI are not yet enabled.")
def checkpoint_store(
self,
folder_name_output: str,
dyn_checkpoint_file: str | None,
int_str: str,
checkpoint_indicator_file: str,
is_dyn: bool = False,
jdic: Any = None,
) -> str:
"""
Store the tensor network as checkpoint.
**Arguments**
folder_name_output : str
Name of the output folder, where we store checkpoints.
dyn_checkpoint_file : str or `None`
Name of the previous checkpoint file, which can be deleted after
creating the new checkpoint.
int_str : str
Identifier containing integers as string to identify the checkpoint
when loading in a potential future run.
checkpoint_indicator_file: str
Path to file which indicates whether checkpoints exists.
is_dyn : bool, optional
Flag to indicate if checkpoint is for statics (False) or
dynamics (True).
Default to `False`.
jdic : json-compatible structure or `None`, optional
Store additional information as json.
Default to `None` (store nothing).
"""
prev_checkpoint_file = dyn_checkpoint_file
dyn_stat_switch = "dyn" if is_dyn else "stat"
dyn_checkpoint_file = os.path.join(
folder_name_output, f"TTN_{dyn_stat_switch}_{int_str}"
)
self.save_pickle(dyn_checkpoint_file)
if jdic is not None:
with open(dyn_checkpoint_file + ".json", "w+") as fh:
json.dump(jdic, fh, cls=QteaJsonEncoder)
# Delete previous checkpoint
if prev_checkpoint_file is not None:
ext = ".pkl" + self.extension
os.remove(prev_checkpoint_file + ext)
if os.path.isfile(prev_checkpoint_file + ".json"):
os.remove(prev_checkpoint_file + ".json")
if not os.path.isfile(checkpoint_indicator_file):
with open(checkpoint_indicator_file, "w+") as fh:
pass
return dyn_checkpoint_file
def clear_cache_rho(self) -> None:
"""Clear cache of reduced density matrices."""
self._cache_rho = {}
def convert(self, dtype: Any, device: str | None) -> None:
"""Convert the data type and device of a tensor network inplace."""
if isinstance(dtype, str):
dtype = self.dtype_from_char(dtype)
# Converting singular values relies on tensor backend of tensors,
# thus do it before converting the tensors
self._convert_singvals(dtype, device)
# convert all tensors in a tensor network
if (self.dtype != dtype) or (self.device != device):
for tensor in self._iter_tensors():
tensor.convert(dtype, device)
# convert all effective operators
if self.eff_op is not None:
self.eff_op.convert(dtype, device)
# convert all effective projectors
for proj in self.eff_proj:
proj.convert(dtype, device) # type: ignore[arg-type]
# convert all reduced density matrices
for _, rho in self._cache_rho.items():
# Convert is an in-place update
rho.convert(dtype, device)
def move_pos(self, pos: TnPos, device: Any = None, stream: bool = False) -> None:
"""
        Move just the tensor at position `pos`, together with the effective
        operators attached to the links of `pos`, to another device.
Other objects like effective projectors will be moved as
well. Acts in place.
        Note: the tensor backend may fall back to synchronous moves,
        depending on its implementation.
Parameters
----------
pos : int | Tuple[int]
Integers identifying a tensor in a tensor network.
device : str, optional
Device where you want to send the QteaTensor. If None, no
conversion. Default to None.
stream : bool | stream | None, optional
If True, use a new stream for memory communication given
by the data mover. If False, use synchronous communication
with GPU. If not a boolean (and not None), we assume it
is a stream object compatible with the backend.
            None is deprecated and is equivalent to False.
Default to False (Use null stream).
"""
if device is None:
# Quick return as pointed out in the docstring.
return
if stream is None:
logger_warning("stream=None is deprecated, use False instead.")
stream = False
# Boolean logic for stream and sync are opposite
sync = not stream if isinstance(stream, bool) else stream
_ = stream
# Convert the tensor
self.data_mover.move(self[pos], device=device, sync=sync)
pos_links = self.get_pos_links(pos)
# Convert the effective operators if present
if self.eff_op is not None:
for pos_link in pos_links:
if (pos_link, pos) in self.eff_op:
eff_op = self.eff_op[(pos_link, pos)]
# Note: effective operators on one link can be a list of tensors,
# so we need to check if it is an iterable
# https://docs.python.org/3/library/collections.abc.html#id19
if isinstance(eff_op, Iterable):
for tensor in eff_op:
self.data_mover.move(tensor, device=device, sync=sync)
else:
self.data_mover.move(eff_op, device=device, sync=sync)
# Convert the effective projectors if present.
for proj in self.eff_proj:
for pos_link in pos_links:
# Also move the tensor of psi0 (tensor at pos must exist)
# Here, we just need the tensor and use move of the data_mover
# instead of move_pos
self.data_mover.move(
proj.psi0[pos], # type: ignore[index,arg-type]
device=device,
sync=sync,
)
# pylint: disable-next=protected-access
if (pos_link, pos) in proj:
self.data_mover.move(
proj[(pos_link, pos)], # type: ignore[index]
device=device,
sync=sync,
)
self._tensor_backend.tensor_cls.free_device_memory(device=device)
def dtype_from_char(self, dtype: str) -> Any:
"""Resolve data type from chars C, D, S, Z and optionally H."""
for tensor in self._iter_tensors():
return tensor.dtype_from_char(dtype)
raise QTeaLeavesError("Querying on empty tensor network.")
def normalize(self) -> None:
"""
Normalize the state depending on its current norm.
"""
self.scale_inverse(self.norm())
def pre_timeevo_checks(self, raise_error: bool = False) -> bool:
"""Check if a TN ansatz is ready for time-evolution."""
is_check_okay = True
if not self.is_dtype_complex() and raise_error:
raise QTeaLeavesError("Trying time evolution with real state.")
if not self.is_dtype_complex():
is_check_okay = False
logger.warning("Trying to run time evolution with real state.")
return is_check_okay
@staticmethod
def projector_attr() -> str | None:
"""Name as string of the projector class to be used with the ansatz.
Returns:
Name usable as `getattr(qtealeaves.mpos, return_value)` to
get the actual effective projector suitable for this class.
            If no effective projector class is available, `None` is returned.
"""
return None
def set_cache_rho(self) -> None:
"""Cache the reduced density matrices for faster access."""
for ii in range(self.num_sites):
self._cache_rho[ii] = self.get_rho_i(ii)
def _store_attr_for_pickle(self) -> dict[str, Any]:
"""Return dictionary with attributes that cannot be pickled and unset them."""
storage = {
"conv_params": self._convergence_parameters,
"comm": self.comm,
"solver": self._solver,
"cache_rho": self._cache_rho,
}
self._convergence_parameters = None # type: ignore[assignment]
self.comm = None
self._solver = None # type: ignore[assignment]
self.clear_cache_rho()
return storage
def _restore_attr_for_pickle(self, storage: dict[str, Any]) -> None:
"""Restore attributed removed for pickle from dictionary."""
# Reset temporary removed attributes
self._convergence_parameters = storage["conv_params"]
self.comm = storage["comm"]
self._solver = storage["solver"]
self._cache_rho = storage["cache_rho"]
def save_pickle(self, filename: str) -> None:
"""
Save class via pickle-module. A state is always saved on a host CPU
with tensor_backend.device set to "cpu". After saving, the state and the
tensor_backend are converted back to the original device.
**Details**
The following attributes have a special treatment and are not present
in the copied object.
* convergence_parameters
* log file (filehandle)
* MPI communicator
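        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        concrete ansatz instance. The extension is appended automatically if
        missing, e.g., ".pklmps" for an MPS.
        >>> psi.save_pickle("state")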
"""
# otherwise (TTN) run into exception
# "Need to isometrize to [0, 0], but at {self.iso_center}."
# in ttn_simulator.build_effective_operators
self.iso_towards(self.default_iso_pos)
        # Temporarily remove objects which cannot be pickled; this
        # includes the convergence parameters (due to lambda functions and
        # parameterized variables), the log file as a file handle, and
        # the MPI communicator
storage = self._store_attr_for_pickle()
memory_device = self.tensor_backend.memory_device
computational_device = self.tensor_backend.computational_device
# Assume pickle needs to be on host.
# Internally an empty call if everything is already on cpu.
self.convert(None, "cpu")
ext = "pkl" + self.extension
if not filename.endswith(ext):
filename += "." + ext
with open(filename, "wb") as fh:
pickle.dump(self, fh)
if self.iso_center is None:
# Move everything, although this is not a standard case
self.convert(dtype=None, device=memory_device)
elif computational_device != memory_device:
# Mixed device, move back only the iso_center
self.move_pos(pos=self.iso_center, device=computational_device)
else:
# otherwise everything is moved, could be completely on the GPU
self.convert(dtype=None, device=memory_device)
self._restore_attr_for_pickle(storage)
# pylint: disable-next=unused-argument
def permute_spo_for_two_tensors(
self,
spo_list: list,
theta: _AbstractQteaTensor,
link_partner: int, # pylint: disable=unused-argument
) -> tuple[list, _AbstractQteaTensor, Sequence[int] | None]:
"""Returns permuted SPO list, permuted theta, and the inverse permutation."""
return spo_list, theta, None
def randomize(
self, noise: float | None = None, mpo_to_recalculate_eff_ops: Any = None
) -> None:
"""
Randomize all tensors in the tensor network.
Removes eff_ops and eff_proj.
If an mpo is passed as mpo_to_recalculate_eff_ops,
the effective operators are recalculated with it.
Parameters
----------
noise : float | None
The amount of noise added. None randomizes completely.
        mpo_to_recalculate_eff_ops : mpo
            Used to recalculate the effective operators.
"""
for tens in self._iter_tensors():
old_norm = tens.norm_sqrt()
tens.randomize(noise=noise)
new_norm = tens.norm_sqrt()
tens *= old_norm / new_norm
# Also reset the effective operators and projectors.
self.eff_op = None # type: ignore[assignment]
self.eff_proj = []
# reset the iso position
self.iso_center = None
self.isometrize_all()
self.normalize()
# recalculate the effective operators
if mpo_to_recalculate_eff_ops is not None:
mpo_to_recalculate_eff_ops.setup_as_eff_ops(self)
# --------------------------------------------------------------------------
# Unsorted
# --------------------------------------------------------------------------
def _postprocess_singvals_cut(
self, singvals_cut: Any, conv_params: TNConvergenceParameters | None = None
) -> Any:
"""
Postprocess the singular values cut after the application of a
tSVD based on the convergence parameters. Either take the sum of
the squared singvals (if `conv_params.trunc_tracking_mode` is `"C"`) or the maximum
(if `conv_params.trunc_tracking_mode` is `"M"`).
Parameters
----------
singvals_cut : np.ndarray
Singular values cut in a tSVD
conv_params : TNConvergenceParameters, optional
Convergence parameters. If None, the convergence parameters
of the tensor network class is used, by default None
Returns
-------
float
The processed singvals
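        Examples
        --------
        A minimal sketch of the two tracking modes (values for illustration
        only).
        >>> cut = np.array([0.2, 0.1])
        >>> # mode "M" keeps the maximum: 0.2
        >>> # mode "C" sums the squares: 0.2**2 + 0.1**2 = 0.05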
"""
if conv_params is None:
conv_params = self._convergence_parameters
        # If no singvals were cut, return 0 to avoid problems
if len(singvals_cut) == 0:
return 0
if conv_params.trunc_tracking_mode == "M":
singvals_cut = singvals_cut.max()
elif conv_params.trunc_tracking_mode == "C":
if hasattr(singvals_cut, "sum"):
# Works for numpy, cupy, pytorch ...
singvals_cut = (singvals_cut**2).sum()
else:
# Tensorflow handling (example tensor to get attribute)
for tensor in self._iter_tensors():
break
else:
raise QTeaLeavesError(
"Empty tensor network cannot process singvals."
)
func_sum = tensor.get_attr("sum")
singvals_cut = func_sum(singvals_cut**2) # type: ignore[operator]
else:
raise QTeaLeavesError(
f"Unknown trunc_tracking_mode {conv_params.trunc_method}"
)
return singvals_cut
#########################################
########## MEASUREMENT METHODS ##########
#########################################
def meas_local(
self, op_list: _AbstractQteaTensor | Sequence[_AbstractQteaTensor]
) -> np.ndarray:
"""
        Measure a local observable along all sites of the tensor network
Parameters
----------
op_list : list of :class:`_AbstractQteaTensor`
local operator to measure on each site
Return
------
measures : ndarray, shape (num_sites)
Measures of the local operator along each site
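        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        concrete qubit ansatz and `sz` a 2x2 Pauli-z operator as
        :class:`_AbstractQteaTensor` (hypothetical names).
        >>> measures = psi.meas_local(sz)  # ndarray of length psi.num_sites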
"""
if isinstance(op_list, _AbstractQteaTensor):
if len(set(self.local_dim)) != 1:
raise QTeaLeavesError(
"Trying to use single operator for non-unique Hilbert spaces."
)
op_list = [op_list for _ in range(self.num_sites)]
# Move all operators to the computational device.
        # Assume self.get_rho_i(ii) is always on the computational device.
device_list = []
for ii, op in enumerate(op_list):
# The if check because the user could pass a numpy array as the op_list.
# In that case, we assume everything is on the same device.
if isinstance(op, _AbstractQteaTensor):
device_list.append(op.device)
op.convert(device=self.tensor_backend.computational_device)
else:
device_list.append(None)
# Always store on host
measures = np.zeros(self.num_sites)
# This subroutine can be parallelized if the singvals are stored using
# joblib
for ii in range(self.num_sites):
rho_i = self.get_rho_i(ii)
op = op_list[ii]
# Ensure that numpy/cupy operators are on the computational device.
# tensordot() should already check this, but it prints a warning
# "Converting tensor on the fly" that can be avoided by converting
# explicitly the array here.
# Note: op in the op_list is not overwritten
if not isinstance(op, _AbstractQteaTensor):
# NOTE: maybe there is a better place to make this conversion
op = self._tensor_backend.from_elem_array(
op,
dtype=rho_i.dtype,
device=self.tensor_backend.computational_device,
)
if op.ndim != 2:
# Need copy, otherwise input tensor is modified ("passed-by-pointer")
                # This check needs to be the 2nd one as `trace_one_dim_pair` is
# only a method of _AbstractQteaTensor
op = op.copy()
op.trace_one_dim_pair([0, 3])
expectation = rho_i.tensordot(op, ([0, 1], [1, 0]))
measures[ii] = np.real(expectation.get_entry())
# Move the operators back.
for ii, op in enumerate(op_list):
if isinstance(op, _AbstractQteaTensor):
op.convert(device=device_list[ii])
return measures
def meas_magic(
self,
renyi_idx: int = 2,
num_samples: int | Sequence[int] = 1000,
return_probabilities: bool = False,
precision: int = 14,
) -> Any:
"""
Measure the magic of the state as defined
in https://arxiv.org/pdf/2303.05536.pdf, with a given number of samples.
To see how the procedure works see meas_unbiased_probabilities.
Parameters
----------
renyi_idx : int, optional
Index of the Rényi entropy you want to measure.
If 1, measure the Von Neumann entropy. Default to 2.
num_samples : int | List[int], optional
Number of random number sampled for the unbiased probability measurement.
If a List is passed, then the algorithm is run over several superiterations
and each entry on num_samples is the number of samples of a superiteration.
Default to 1000.
return_probabilities : bool, optional
If True, return the probability dict. Default to False.
precision: int, optional
Precision for the probability interval computation. Default to 14.
For precision>15 mpmath is used, so a slow-down is expected.
Returns
-------
float
The magic of the state (potentially also returning probabilities
based on the input arguments).
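        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        concrete ansatz instance.
        >>> magic = psi.meas_magic(renyi_idx=2, num_samples=1000)
        >>> magic, probs = psi.meas_magic(return_probabilities=True)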
"""
_num_samples: Sequence[int]
if np.isscalar(num_samples):
_num_samples = [num_samples] # type: ignore[assignment,list-item]
else:
_num_samples = num_samples # type: ignore[assignment]
# Sample the state probabilities
opes_bound_probs: dict[str, tuple[float, float]] = {}
opes_probs = np.array([])
for num_samp in _num_samples:
# Sample the numbers
samples = np.random.rand(int(num_samp))
# Do not perform the computation for the already sampled numbers
probs, new_samples = _check_samples_in_bound_probs(
samples, opes_bound_probs
)
opes_probs = np.hstack((opes_probs, probs))
# Perform the sampling for the unseen samples
bound_probs = self.meas_unbiased_probabilities(
new_samples, mode="magic", precision=precision # type: ignore[arg-type]
)
opes_bound_probs.update(bound_probs)
# Add the sampled probability to the numpy array
probs, _ = _check_samples_in_bound_probs(new_samples, bound_probs)
opes_probs = np.hstack((opes_probs, probs))
# Compute the magic with the samples
magic = -self.num_sites * np.log(2)
# Pass from probability intervals to probability values
if renyi_idx > 1:
magic += np.log((opes_probs ** (renyi_idx - 1)).mean()) / (1 - renyi_idx)
else:
magic += -(np.log(opes_probs)).mean()
if return_probabilities:
return magic, opes_bound_probs
return magic
def meas_projective(
self,
nmeas: int = 1024,
qiskit_convention: bool = False,
seed: int | None = None,
unitary_setup: UnitarySetupProjMeas | None = None,
do_return_probabilities: bool = False,
) -> Any:
"""
Perform projective measurements along the computational basis state
Parameters
----------
nmeas : int, optional
Number of projective measurements. Default to 1024.
qiskit_convention : bool, optional
If the sites during the measure are represented such that
            |201> has site 0 with value one (True, mimics bit ordering) or
            with value 2 (False, usually used in theoretical computations).
Default to False.
seed : int, optional
If provided it sets the numpy seed for the random number generation.
Default to None
unitary_setup : `None` or :class:`UnitarySetupProjMeas`, optional
If `None`, no local unitaries are applied during the projective
measurements. Otherwise, the unitary_setup provides local
unitaries to be applied before the projective measurement on
each site.
Default to `None`.
do_return_probabilities : bool, optional
If `False`, only the measurements are returned. If `True`,
two arguments are returned where the first are the
measurements and the second are their probabilities.
Default to `False`
Return
------
measures : dict[str, int]
Dictionary where the keys are the states while the values the number of
occurrences. The keys are separated by a comma if local_dim > 9.
(Potentially also 2nd return value with probabilities based on
input arguments.)
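        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        three-qubit ansatz in a GHZ state, so counts concentrate on "000"
        and "111" (indicative only).
        >>> counts = psi.meas_projective(nmeas=1024)
        >>> sorted(counts.keys())  # e.g., ['000', '111']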
"""
if nmeas == 0:
return {}
if seed is not None and isinstance(seed, int):
np.random.seed(seed)
# Put in canonic form
self.site_canonize(0)
measures: Any = []
probabilities: Any = []
# Loop over number of measurements
for _ in range(nmeas):
state = np.zeros(self.num_sites, dtype=int)
temp_tens = deepcopy(self.get_tensor_of_site(0))
# Loop over tensors
cumulative_prob = 1.0
for ii in range(self.num_sites):
target_prob = np.random.rand()
measured_idx, temp_tens, prob_ii = self._get_child_prob(
temp_tens, ii, target_prob, unitary_setup, state, qiskit_convention
)
cumulative_prob *= prob_ii
# Save the measured state either with qiskit or
# theoretical convention
if qiskit_convention:
state[self.num_sites - 1 - ii] = measured_idx
else:
state[ii] = measured_idx
max_local_dim = np.max(self.local_dim)
if max_local_dim < 10:
measure_ii = np.array2string(
state, separator="", max_line_width=2 * self.num_sites
)[1:-1]
else:
measure_ii = np.array2string(
state, separator=",", max_line_width=2 * self.num_sites
)[1:-1]
probabilities.append(cumulative_prob)
            # No need to move back to CPU from GPU here since measure_ii is a string
measures.append(measure_ii)
measures = np.array(measures)
states, counts = np.unique(measures, return_counts=True)
probabilities = dict(zip(measures, probabilities))
measures = dict(zip(states, counts))
if do_return_probabilities:
return measures, probabilities
return measures
def meas_unbiased_probabilities(
self,
num_samples: int | np.ndarray,
qiskit_convention: bool = False,
bound_probabilities: dict | None = None,
do_return_samples: bool = False,
precision: int = 15,
mode: str = "projection_z",
) -> Any:
"""
        Compute the probabilities of measuring a given state if its probability
        falls into the region explored by the `num_samples` sampled values.
The functions divide the probability space in small rectangles, draw
num_samples random numbers and then follow the path until the end.
The number of states in output is between 1 and num_samples.
For a different way of computing the probability tree see the
function :py:func:`meas_even_probabilities` or
:py:func:`meas_greedy_probabilities`
Parameters
----------
num_samples : int | np.ndarray
Maximum number of states that could be measured.
(Alternatively, it can be a numpy-array passing directly the samples.)
qiskit_convention : bool, optional
If the sites during the measure are represented such that
            |201> has site 0 with value one (True, mimics bit ordering) or
            with value 2 (False, usually used in theoretical computations).
Default to False.
        bound_probabilities : dict, optional
            Bounds on the probability computed previously with this function,
            i.e., if a uniform random number has value
            `left_bound < value < right_bound` then you measure the state.
            The dict structure is `{'state' : (left_bound, right_bound)}`.
            If provided, it speeds up the computations since the function will
            skip values in the intervals already known. By default None.
do_return_samples : bool, optional
Enables, if `True`, to return the random number used for sampling
in addition to the `bound_probabilities`. If `False`, only the
`bound_probabilities` are returned.
Default to `False`
precision : int, optional
Decimal place precision for the mpmath package. It is only
            used inside the function, and set back to the original value after
the computations. Default to 15.
If it is 15 or smaller, it just uses numpy.
mode : str, optional
Mode of the unbiased sampling. Default is "projection_z", equivalent
to sampling the basis states on the Z direction.
Possibilities:
- "projection_z"
- "magic"
Return
------
bound_probabilities : dict[str, tuple[float, float]]
            Dictionary analogous to the `bound_probabilities` parameter.
The keys are separated by a comma if local_dim > 9.
samples : np.ndarray
Random numbers from sampling, only returned if activated
by optional argument.
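        Examples
        --------
        A minimal sketch, not run as a doctest; `psi` is assumed to be a
        concrete ansatz instance.
        >>> bounds = psi.meas_unbiased_probabilities(100)
        >>> # e.g., {'010': (left_bound, right_bound), ...} for qubits
        >>> bounds, samples = psi.meas_unbiased_probabilities(
        ...     100, bound_probabilities=bounds, do_return_samples=True
        ... )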
"""
# Use mpmath if precision is larger than 15
has_mp = precision > 15
# Handle internal cache; keep if possible: if bound probabilities
# are passed, it must be the same state and we can keep the
# cache.
do_clear_cache = bound_probabilities is None
# Always set gauge to site=0; even if cache is not cleared,
# the actual isometry center did not move
self.site_canonize(0)
# Normalize for quantum trajectories
old_norm = self.norm()
self.normalize()
get_children_prob: Callable
if mode == "projection_z":
local_dim = self.local_dim
get_children_prob = self._get_children_prob
initial_tensor = self.get_tensor_of_site(0)
elif mode == "magic":
local_dim = [
4,
] * self.num_sites
get_children_prob = self._get_children_magic
tmp_t = self.get_tensor_of_site(0)
initial_tensor = tmp_t.eye_like(tmp_t.links[0])
else:
raise ValueError(f"mode {mode} not available for unbiased sampling")
# ==== Initialize variables ====
# all_probs is a structure to keep track of already-visited nodes in
# the probability tree. The i-th dictionary of the list corresponds to
# a state measured up to the i-th site. Each dictionary has the states
# as keys and the list [state_prob, state_tens] as value.
all_probs: list[dict[str, tuple[float, _AbstractQteaTensor]]] = [
{} for _ in range(self.num_sites)
]
# Initialize precision
old_precision = mp.mp.dps
mp.mp.dps = precision
# This precision is pretty much independent of the numpy-datatype as
# it comes from multiplication. However, it is important when we sum
# for the intervals
mpf_wrapper, almost_equal = _mp_precision_check(precision)
# Draw the samples uniformly in [0, 1], taking into account the already
# sampled regions given by bound_probabilities
if np.isscalar(num_samples):
samples, bound_probabilities = _resample_for_unbiased_prob(
num_samples, bound_probabilities # type: ignore[arg-type]
)
else:
samples = num_samples # type: ignore[assignment]
bound_probabilities = (
{} if bound_probabilities is None else bound_probabilities
)
# ==== Routine ====
cum_prob = None
cum_probs = None
zero_probs_counter = 0
for idx, sample in enumerate(samples):
# If the sample is in an already sampled area continue
if idx > 0:
if cum_prob is None:
logger.warning(
"Error information QTeaOPESError at idx=%d with samples %s.",
idx,
str(samples[: idx + 1]),
)
raise QTeaOPESError("cum_prob not defined.")
if left_prob_bound < sample < left_prob_bound + cum_prob:
continue
# Set the current state to no state
curr_state = ""
# Set the current tensor to be measured to the first one
tensor = deepcopy(initial_tensor)
# Initialize the probability to 1
curr_prob = 1.0
# Initialize left bound of the probability interval. Arbitrary precision
left_prob_bound = mpf_wrapper(0.0)
# Loop over the sites
for site_idx in range(0, self.num_sites):
# Initialize new possible states, adding the digits of the local basis to
# the state measured up to now
if site_idx > 0:
states = [
curr_state + f",{ii}" for ii in range(local_dim[site_idx])
]
else:
states = [curr_state + f"{ii}" for ii in range(local_dim[site_idx])]
# Compute the children if we didn't already follow the branch
if not np.all([ss in all_probs[site_idx] for ss in states]):
# Remove useless information after the first cycle. This is safe
# since the samples are ascending, i.e., once we switch path
# we will never follow an old path again.
if idx > 0:
all_probs[site_idx:] = [
{} for _ in range(len(all_probs[site_idx:]))
]
# Compute new probabilities
probs, tensor_list = get_children_prob(
tensor, site_idx, curr_state, do_clear_cache
)
# Clear cache only upon first iteration
do_clear_cache = False
# get probs to arbitrary precision
# if precision > 15:
# probs = mp.matrix(probs)
# Multiply by the probability of being in the parent state
# Multiplication is safe from the precision point of view
probs = curr_prob * probs # type: ignore[operator]
# Update probability tracker for next branch, avoiding
# useless additional computations
for ss, prob, tens in zip(states, probs, tensor_list):
all_probs[site_idx][ss] = [prob, tens] # type: ignore[assignment]
# Retrieve values if already went down the path
else:
probs = []
tensor_list = []
for prob, tens in all_probs[site_idx].values():
probs.append(prob)
tensor_list.append(tens)
# Select the next branch if we didn't reach the leaves
# according to the random number sampled
if site_idx < self.num_sites - 1:
cum_probs = np.cumsum(probs) # Compute cumulative
# Select first index where the sample is lower than the cumulative
try:
meas_idx = [
sample < float(cum_prob) for cum_prob in cum_probs
].index(True)
except ValueError as exc:
# If the sample is not smaller than any of the cum_probs,
# it must be the last sample (probably within machine precision)
eps = abs(sample - cum_probs[-1])
if eps > 10 * tensor_list[0].dtype_eps:
info_str = " with eps, dtype_eps, sample, cum_probs: "
info_values = ", ".join(
[
str(eps),
str(tensor_list[0].dtype_eps),
str(sample),
str(cum_probs),
]
)
# pylint: disable-next=logging-not-lazy
logger.warning(
"Error information QTeaOPESError"
+ info_str
+ info_values
)
raise QTeaOPESError(
"IndexError in OPES sampling not within machine precision."
) from exc
if eps > 0:
# Testing against exact zero on purpose, to warn if we are close
# to machine precision here
logger_warning(
"Sampling with OPES close to machine precision."
)
meas_idx = len(cum_probs) - 1
# Update run-time values based on measured index
tensor = deepcopy(tensor_list[meas_idx])
curr_state = states[meas_idx]
curr_prob = probs[meas_idx]
# Update value of the sample based on the followed path
cum_probs_idx = 0.0 if meas_idx == 0 else cum_probs[meas_idx - 1]
sample -= float(cum_probs_idx)
# Update left-boundary value with probability remaining on the left
# of the measured index
if meas_idx > 0:
if has_mp:
left_prob_bound += float(cum_probs[meas_idx - 1])
else:
left_prob_bound += cum_probs[meas_idx - 1]
# Save values if we reached the leaves
else:
cum_prob = mpf_wrapper(0.0)
for ss, prob in zip(states, probs):
if prob < 0:
msg = (
"Probably you need an higher precision."
+ "Measured negative probability of %s, set to 0."
)
logger.warning(msg, prob)
prob = 0
tmp = float(prob) if has_mp else prob
if not almost_equal((tmp, 0)):
bound_probabilities[ss] = (
left_prob_bound + cum_prob,
left_prob_bound + cum_prob + tmp,
)
else:
zero_probs_counter += 1
cum_prob += tmp
# For TTN with caching strategy (empty interface implemented
# also for any abstract tensor network)
all_probs = self.clear_cache(all_probs=all_probs, current_key=curr_state)
if zero_probs_counter > 0:
msg = (
f"The probability of {zero_probs_counter} samples was "
+ "smaller than the precision. No bounds will be associated."
)
logger.warning(msg)
# Rewrite with qiskit convention and remove commas if needed
bound_probabilities = postprocess_statedict(
bound_probabilities,
local_dim=self.local_dim[0], # Danger: assuming equal local dimension
qiskit_convention=qiskit_convention,
)
self.scale(old_norm)
mp.mp.dps = old_precision
if do_return_samples:
return bound_probabilities, samples
return bound_probabilities
[docs]
def sample_n_unique_states(
self,
num_unique: int,
exit_coverage: float = 0.9999,
ignore_opes_errors: bool = False,
**kwargs: Any,
) -> dict[str, tuple[float, float]]:
"""
Sample a given number of unique target states. This is the target number of
states, the actual number of states can differ.
**Arguments**
num_unique : int
Number of unique states to be sampled. This is a target number;
the actual number of sampled states might differ in the end.
exit_coverage : float, optional
Coverage at which sampling can stop even without reaching the
target number of unique states.
Default to 0.9999
ignore_opes_errors : bool, optional
Allows to ignore `QTeaOpesError` to return at least the samples
that have been sampled so far.
Default to False
kwargs : keyword arguments
Passed through to unbiased sampling, e.g., `qiskit_convention`,
`precision`, and `mode`. `bound_probabilities` is accepted if
called from MPI sampling (identified by left-right keys).
**Details**
The target number of unique states will not be reached if the probability of
the sampled states reaches the `exit_coverage`.
In most other cases, the target number of unique states will be exceeded,
as the last superiteration might generate slightly more states than needed.
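**Example**
A minimal sketch, assuming `psi` is a concrete tensor network state::

    # Target 50 unique states; stop early once 99.99% of the
    # probability mass is covered.
    result = psi.sample_n_unique_states(50, exit_coverage=0.9999)
    coverage = sum(right - left for left, right in result.values())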
"""
sampling_result = None
for key in kwargs:
if key not in [
"qiskit_convention",
"precision",
"mode",
"bound_probabilities",
]:
raise ValueError(f"Keyword argument `{key}` not allowed.")
# We want to reuse this function for sampling from MPI, but not necessarily
# other calls should be able to pass kwargs bound_probabilities. For MPI
# calls, we know either left or right must be set as key.
if key == "bound_probabilities":
sampling_result = kwargs["bound_probabilities"]
if ("left" not in sampling_result) and ("right" not in sampling_result):
raise QTeaLeavesError(
"Only MPI sampling allowed for bound_probailities."
)
if sampling_result is not None:
kwargs = deepcopy(kwargs)
del kwargs["bound_probabilities"]
# initial data set
try:
sampling_result = self.meas_unbiased_probabilities(
num_samples=num_unique, bound_probabilities=sampling_result, **kwargs
)
except QTeaOPESError:
if not ignore_opes_errors:
# Trigger exception anyway
raise
# User requests to ignore them, print to logger, and return empty dict
logger.warning("Initial OPES sampling failed.")
return {}
covered_probability = sum(
interval[1] - interval[0] for interval in sampling_result.values()
)
while (len(sampling_result) < num_unique) and (
covered_probability < exit_coverage
):
delta = num_unique - len(sampling_result)
num_samples = max(10, 2 * delta)
try:
sampling_result = self.meas_unbiased_probabilities(
num_samples=num_samples,
bound_probabilities=sampling_result,
**kwargs,
)
except QTeaOPESError:
if not ignore_opes_errors:
# Trigger exception anyway
raise
# User requests to ignore them, print to logger, and return what
# we have so far
logger.warning(
"OPES sampling failed in loop; returning most recent results."
)
return sampling_result # type: ignore[no-any-return]
covered_probability = sum(
interval[1] - interval[0] for interval in sampling_result.values()
)
return sampling_result # type: ignore[no-any-return]
[docs]
@staticmethod
def mpi_sample_n_unique_states(
state: "_AbstractTN",
num_unique: int,
comm: Any,
tensor_backend: TensorBackend,
cache_size: int | None = None,
cache_clearing_strategy: str | None = None,
filter_func: Callable | None = None,
mpi_final_op: str | None = None,
root: int = 0,
**kwargs: Any,
) -> dict[str, tuple[float, float]]:
"""
Sample a target number of unique states. This is the target number of
states, the actual number of states can differ.
**Arguments**
state : instance of :class:`_AbstractTN`
State to be sampled from; needs to exist only on root and will
be broadcast via MPI to all other threads.
num_unique : int
Number of unique states to be sampled. This is a target number;
the actual number of sampled states might differ in the end.
comm : MPI-communicator from mpi4py
Communicator of threads to be used for sampling.
tensor_backend : :class:`TensorBackend`
Tensor backend used for state, which will be needed to build
up the state during bcast.
cache_size : int, optional
Cache size limit for the sampling (bytes) per MPI-thread.
Default to 1,000,000,000 (1GB).
cache_clearing_strategy : str, optional
The strategy to be used to clear the cache
within the sampling routine for TTN simulation.
The possibilities are "num_qubits" or "state".
Default to "num_qubits".
filter_func : callable or `None`, optional
Takes state string and probability boundaries as the two
arguments in this order and returns `True` / `False`; states
returning `True` are deleted. Filtering can reduce the workload
before the MPI-communication of states.
Default to `None` (no filtering).
mpi_final_op : str or `None`
Either `None` or `mpi_gather` (root will contain all states)
or `mpi_all_gather` (all threads will contain all states)
Default to `None`.
root : int, optional
Thread-index of the MPI-thread holding the TN ansatz.
Default to 0.
ansatz : _AbstractTN (inside kwargs)
Ansatz is needed to broadcast the TN state to the other processes.
kwargs : keyword arguments
Passed through to unbiased sampling, e.g., `qiskit_convention`,
`precision`, and `mode`; `exit_coverage` is consumed here
(default 0.9999).
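**Example**
A hedged sketch of a `filter_func` that drops low-probability states
before the MPI communication; the threshold, `psi`, and `comm` are
purely illustrative, and `ansatz` is assumed to be the ansatz class::

    def filter_low_prob(state, bounds):
        # Returning True marks the state for deletion.
        return (bounds[1] - bounds[0]) < 1e-8

    result = _AbstractTN.mpi_sample_n_unique_states(
        psi, 100, comm, tensor_backend,
        filter_func=filter_low_prob, mpi_final_op="mpi_gather",
        ansatz=type(psi),
    )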
"""
for key in kwargs:
if key not in ["qiskit_convention", "precision", "mode", "ansatz", "exit_coverage"]:
raise ValueError(f"Keyword argument `{key}` not allowed.")
if mpi_final_op not in [None, "mpi_gather", "mpi_all_gather"]:
raise ValueError(f"Argument for mpi_final_op {mpi_final_op} not allowed.")
if MPI is None:
raise ImportError(
"Trying to call sampling with MPI, but MPI was not imported."
)
# We need a deepcopy of the keyword arguments here as we delete the key
# "ansatz". Deleting this key is necessary as
# ``sample_n_unique_states`` will check for superfluous keyword arguments.
# This choice is a bit peculiar, but allows hiding the keyword argument
# "ansatz" filled by the TN implementation (and not provided by the user).
kwargs = deepcopy(kwargs)
ansatz = kwargs.pop("ansatz")
# Pop exit_coverage so it is not passed through to the sampling calls
exit_coverage = kwargs.pop("exit_coverage", 0.9999)
size = comm.Get_size()
rank = comm.Get_rank()
psi = ansatz.mpi_bcast(state, comm, tensor_backend, root=root)
if cache_size is not None:
psi.set_cache_limit_sampling(cache_size)
if cache_clearing_strategy is not None:
psi.set_cache_clearing_strategy_sampling(strategy=cache_clearing_strategy)
ranges = np.linspace(0, 1, size + 1)
# We divide the sampling workload by splitting the interval [0, 1] into
# `size` evenly distributed subintervals. We use the sampling feature of
# defining an already-sampled interval to block, for each MPI process,
# the intervals of the other MPI processes. The "special" intervals are
# identified by the keys "left" and "right".
sampling_result = {}
if rank > 0:
sampling_result["left"] = (0.0, ranges[rank])
if rank + 1 < size:
sampling_result["right"] = (ranges[rank + 1], 1.0)
total_num_unique_states = 0
total_covered_probability = 0
total_num_active = size
ii_active = 1
while (
total_num_unique_states < num_unique
and total_covered_probability < exit_coverage
and total_num_active > 0
):
if ii_active == 1:
num_unique_rank = int(
np.ceil((num_unique - total_num_unique_states) / total_num_active)
)
sampling_result = psi.sample_n_unique_states(
num_unique_rank, bound_probabilities=sampling_result, **kwargs
)
# Ignore left/right boundary, account for double counting states
# at boundary
ii_num_unique_states = len(sampling_result) - 3
ii_covered_probability = sum(
interval[1] - interval[0] for interval in sampling_result.values()
)
if ii_covered_probability >= exit_coverage:
ii_active = 0
ii_covered_probability -= sampling_result.get("left", [0.0, 0.0])[1]
ii_covered_probability -= 1.0 - sampling_result.get("right", [1.0, 1.0])[0]
# pylint: disable=c-extension-no-member
# Gather results
total_num_unique_states = comm.allreduce(ii_num_unique_states, op=MPI.SUM)
total_covered_probability = comm.allreduce(
ii_covered_probability, op=MPI.SUM
)
total_num_active = comm.allreduce(ii_active, op=MPI.SUM)
# pylint: enable=c-extension-no-member
if rank > 0:
del sampling_result["left"]
if rank + 1 < size:
del sampling_result["right"]
if filter_func is not None:
# Apply filter passed by user
keys_to_delete = []
for key, value in sampling_result.items():
if filter_func(key, value):
keys_to_delete.append(key)
# If the garbage collector kicks in during the loop ...
sampling_result[key] = None # type: ignore[assignment]
for key in keys_to_delete:
del sampling_result[key]
if mpi_final_op in ["mpi_gather", "mpi_all_gather"]:
# Collect everything on root
if comm.Get_rank() == root:
for ii in range(comm.Get_size()):
if ii == root:
continue
dict_ii = comm.recv(source=ii)
sampling_result.update(dict_ii)
else:
comm.send(sampling_result, root)
if mpi_final_op == "mpi_all_gather":
sampling_result = comm.bcast(sampling_result, root=root)
return sampling_result
def _get_children_prob(
self,
tensor: _AbstractQteaTensor,
site_idx: int,
curr_state: str,
do_clear_cache: bool,
) -> tuple[list[float], list[_AbstractQteaTensor]]:
"""
Compute the probability and the relative tensor state of all the
children of site `site_idx` in the probability tree
Parameters
----------
tensor : _AbstractQteaTensor
Parent tensor, with respect to which we compute the children
site_idx : int
Index of the parent tensor
curr_state : str
Comma-separated string tracking the current state of all
sites already done with their projective measurements.
do_clear_cache : bool
Flag if the cache should be cleared. Only read for the first
site when a new measurement begins.
Returns
-------
probabilities : list of floats
Probabilities of the children
tensor_list : list[_AbstractQteaTensor]
Child tensors, already contracted with the next site
if not last site.
"""
# Cannot implement it here, it highly depends on the TN
# geometry
raise NotImplementedError("This function has to be overwritten.")
def _get_children_magic(
self,
transfer_matrix: _AbstractQteaTensor,
site_idx: int,
curr_state: str,
do_clear_cache: bool,
) -> tuple[list[float], list[_AbstractQteaTensor]]:
"""
Compute the magic probability and the relative tensor state of all the
children of site `site_idx` in the probability tree, conditioned on
the transfer matrix
Parameters
----------
transfer_matrix : np.ndarray
Parent transfer matrix, with respect to which we compute the children
site_idx : int
Index of the parent tensor
curr_state : str
Comma-separated string tracking the current state of all
sites already done with their projective measurements.
do_clear_cache : bool
Flag if the cache should be cleared. Only read for first
site when a new measurement begins.
Returns
-------
probabilities : list of floats
Probabilities of the children
tensor_list : list of ndarray
Child tensors, already contracted with the next site
if not last site.
"""
# Cannot implement it here, it highly depends on the TN
# geometry
raise NotImplementedError("This function has to be overwritten.")
# pylint: disable=unused-argument
[docs]
def clear_cache(
self,
num_qubits_keep: int | None = None,
all_probs: list[dict[str, tuple[float, _AbstractQteaTensor]]] | None = None,
current_key: str | None = None,
) -> list[dict[str, tuple[float, _AbstractQteaTensor]]]:
"""
Clear the cache until the cache size is below the cache limit again. This
default implementation is a no-op and works for any tensor network without
a cache. Inheriting tensor networks with a cache have to override it.
**Arguments**
all_probs : list of dicts
Contains already calculated branches of probability tree. Each
TTN has to decide if they need to be cleaned up as well.
"""
# pylint: enable=unused-argument
# To-do: handle linter warning instead
# if self is None:
# # Never true, but prevent linter warning (needs self when
# # cache is actually defined) and unused arguments
# print("Args", num_qubits_keep, all_probs, current_key)
# return None
default: list[dict[str, tuple[float, _AbstractQteaTensor]]] = []
return default_optional(all_probs, default)
def _get_child_prob(
self,
tensor: _AbstractQteaTensor,
site_idx: int,
target_prob: float,
unitary_setup: UnitarySetupProjMeas | None,
curr_state: np.ndarray,
qiskit_convention: bool,
) -> tuple[int, _AbstractQteaTensor, float]:
"""
Compute which child has to be selected for a given target probability
and return the index and the tensor of the next site to be measured.
Parameters
----------
tensor : _AbstractQteaTensor
Tensor representing the site to be measured with a projective
measurement.
site_idx : int
Index of the site to be measured and index of `tensor`.
target_prob : scalar
Scalar drawn from U(0, 1) deciding which projective
measurement outcome is picked. The decision is based on
the site `site_idx` only.
unitary_setup : instance of :class:`UnitarySetupProjMeas` or `None`
If `None`, no local unitaries are applied. Otherwise,
unitaries for local transformations are provided and applied
to the local sites.
curr_state : np.ndarray of rank-1 and type int
Record of current projective measurements done so far.
qiskit_convention : bool
Qiskit convention, i.e., ``True`` stores the projective
measurement in reverse order, i.e., the first qubit is stored
in ``curr_state[-1]``. Passing ``False`` means indices are
equal and not reversed.
Returns
-------
measured_idx : index of the selected state
tensor : projected and normalized tensor
prob_jj : probability of the measured state
"""
# Cannot implement it here, it highly depends on the TN
# geometry
raise NotImplementedError("This function has to be overwritten.")
[docs]
def compute_energy(self, pos: TnPos | None = None) -> float:
"""
Compute the energy of the tensor network through the effective
operators at position `pos`.
Parameters
----------
pos : list, optional
If a position is provided, the isometry is first shifted to
that position and then the energy is computed. If None,
the energy is computed at the current isometry center. Default to None.
Returns
-------
float
Energy of the tensor network
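Example
-------
A minimal sketch, assuming `psi` carries effective operators and that
the position format of the ansatz accepts an integer site index::

    energy = psi.compute_energy()       # at the current isometry center
    energy = psi.compute_energy(pos=0)  # shift the isometry center first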
"""
if self.eff_op is None:
logger.warning(
"Tried to compute energy with no effective operators. Returning nan."
)
return np.nan
# Move the iso center if needed
if pos is not None:
self.iso_towards(pos)
else:
pos = self.iso_center
_pos = default_optional(pos, self.iso_pos)
self.move_pos(_pos, device=self._tensor_backend.computational_device)
# Retrieve the tensor at the isometry
tens = self[self.iso_pos]
# Get the list of operators to contract
pos_links = self.get_pos_links(_pos)
# Contract the tensor with the effective operators around
vec = self.eff_op.contract_tensor_lists(tens, _pos, pos_links)
# force a standard python float
energy = float(np.real(tens.get_of(tens.dot(vec))))
# Update internal storage
self._prev_energy = energy
# Already a standard python float
return energy
#########################################################################
######################### Optimization methods ##########################
#########################################################################
[docs]
def optimize_single_tensor_for_fit(self, pos: TnPos) -> _AbstractQteaTensor:
"""
Optimize the tensor at position `pos` based on the
effective operators loaded in the tensor network
Parameters
----------
pos : list of ints or int
Position of the tensor in the TN
Returns
-------
tensor:
The contraction yielding the optimal local tensor
given the current environment and bond dimension.
"""
# Isometrise towards the desired tensor
self.iso_towards(pos)
pos_links = self.get_pos_links(pos)
phi = self.eff_op.phi_for_fit # type: ignore[attr-defined]
tensor = self.eff_op.contract_tensor_lists(phi[pos], pos, pos_links)
return tensor
[docs]
def optimize_single_tensor(self, pos: TnPos) -> float:
"""
Optimize the tensor at position `pos` based on the
effective operators loaded in the tensor network
Parameters
----------
pos : list of ints or int
Position of the tensor in the TN
Returns
-------
float
Computed energy
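Example
-------
A minimal sketch of a ground-state sweep built from single-tensor
updates, assuming `psi` has its effective operators set up::

    for pos in psi.default_sweep_order():
        energy = psi.optimize_single_tensor(pos)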
"""
tic = tictoc()
# Isometrise towards the desired tensor
self.iso_towards(pos)
pos_links = self.get_pos_links(pos)
dim_problem = np.prod(self[pos].shape)
if dim_problem == 1:
# Nothing to do - ARPACK will fail
eigenvalue = self.compute_energy()
return eigenvalue
# If we have effective projectors, get the function to project them out.
# This is passed to the eig_api() as kwargs.
# The projectors act directly on the Lanczos vectors.
if len(self.eff_proj) > 0:
project_out_projectors = self.get_projector_function(
pos=pos, pos_links=pos_links
)
# orthogonalize the initial tensor and normalize
self[pos] = project_out_projectors(self[pos])
self[pos].normalize()
else:
project_out_projectors = None
# If noise is added, randomize the tensor
if self.convergence_parameters.noise != 0:
self[pos].randomize(noise=self.convergence_parameters.noise)
self[pos].normalize()
# solve with solver
eigenvalues, tensor = self[pos].eig_api(
self.eff_op.contract_tensor_lists,
self[pos].shape,
self._convergence_parameters,
args_func=(pos, pos_links),
kwargs_func={"pre_return_hook": project_out_projectors},
)
logger.info(
"Optimized tensor %-8s max chi: %-4d energy: %-19.14g time: %.1f",
pos,
max(tensor.shape),
eigenvalues[0],
tictoc() - tic,
)
self[pos] = tensor
# Update internal storage
self._prev_energy = eigenvalues[0]
return np.real(tensor.get_of(eigenvalues[0])) # type: ignore[no-any-return]
[docs]
def optimize_link_expansion(
self,
pos: TnPos,
pos_partner: TnPos,
link_self: int,
link_partner: int,
) -> float:
"""
Optimize a tensor pair via a space-link expansion.
**Arguments**
pos : int, tuple of ints (depending on TN)
position of tensor to be optimized
pos_partner : int, tuple of ints (depending on TN)
position of partner tensor, where link between
tensor and partner tensor will be randomly
expanded.
link_self : int
Link connecting to partner tensor (in tensor at `pos`)
link_partner : int
Link connecting to the optimized tensor (in the partner tensor).
Returns
-------
float
Computed energy
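**Example**
A hedged sketch; the expansion is steered by the convergence-parameter
keys `min_expansion`, `expansion_cycles`, and `expansion_drop` used in
the implementation below, and `psi` is an illustrative state::

    pos_partner, link_pos, link_partner = psi.get_pos_partner_link_expansion(pos)
    energy = psi.optimize_link_expansion(pos, pos_partner, link_pos, link_partner)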
"""
if isinstance(pos, list):
raise QTeaLeavesError("Passing list as position")
if isinstance(pos_partner, list):
raise QTeaLeavesError("Passing list as partner position")
# Here it would be beneficial to implement the skip_exact_rgtensors, but
# we would need to add a data structure to flag which tensors are converged.
# After that, when moving the iso with svd we can easily understand if there
# is truncation
# _ = self._convergence_parameters.sim_params["skip_exact_rgtensors"]
self.iso_towards(pos_partner)
tensor = self[pos].copy()
tensor_partner = self[pos_partner].copy()
# If energy goes up and we want to reinstall original tensor
expansion_drop = self._convergence_parameters.sim_params["expansion_drop"]
if expansion_drop != "f":
if self._prev_energy is None:
self._prev_energy = self.compute_energy()
prev_tensor = tensor.copy()
prev_tensor_partner = tensor_partner.copy()
link_dim = tensor.shape[link_self]
max_dim = link_dim + self._convergence_parameters.sim_params["min_expansion"]
links_copy_self = list(tensor.links).copy()
links_copy_self[link_self] = None
links_copy_self = tensor.set_missing_link(
links_copy_self, max_dim, are_links_outgoing=tensor.are_links_outgoing
)
links_copy_other = list(tensor_partner.links).copy()
links_copy_other[link_partner] = None
links_copy_other = tensor_partner.set_missing_link(
links_copy_other,
max_dim,
are_links_outgoing=tensor_partner.are_links_outgoing,
)
new_dim = min(
int(links_copy_self[link_self]),
int(links_copy_other[link_partner]),
max_dim,
)
if new_dim <= link_dim:
# cannot expand anything here
logger.debug("Saturated expansion, optimizing individually.")
# Have to do isostep and norm as well
self.iso_towards(pos, move_to_memory_device=False)
energy = self.optimize_single_tensor(pos)
return energy
self[pos], self[pos_partner] = tensor.expand_link_tensorpair(
tensor_partner,
link_self,
link_partner,
new_dim,
)
# Ideal implementation would be ...
# First iso_towards to pos_partner, as well as pos_partner
# after decision.
# Update of eff operators (internal iso_towards in space link
# expansion cannot truncate or update singvals)
# By move_to_memory_device=False we also keep the other
# tensor in device memory
self.iso_towards(pos, move_to_memory_device=False)
# The random entries destroyed the normalization; to get a valid
# eigenvalue in these intermediate steps, we need to renormalize.
self.normalize()
# Expansion cycles
# ----------------
# Same here, use QR as otherwise truncation potentially kicks in,
# undoing the expansion. The final iso_towards with QR or SVD follows
# after the expansion cycles.
requires_singvals = self._requires_singvals
self._requires_singvals = False
for _ in range(self._convergence_parameters.sim_params["expansion_cycles"]):
self.iso_towards(pos, move_to_memory_device=False)
energy = self.optimize_single_tensor(pos)
self.iso_towards(pos_partner, move_to_memory_device=False)
energy = self.optimize_single_tensor(pos_partner)
# Reset value
self._requires_singvals = requires_singvals
# Decision on accepting update
# ----------------------------
if expansion_drop in ["f"] or energy <= self._prev_energy:
# We improved in energy or accept higher energies to escape local
# minima in this sweep
self.iso_towards(
pos,
keep_singvals=requires_singvals,
trunc=True,
conv_params=self._convergence_parameters,
)
# should we compute this? yes, the difference can be large ...
self.normalize()
energy = self.compute_energy(pos)
elif expansion_drop in ["o"]:
# Energy did not improve, but optimize locally
# pylint: disable-next=possibly-used-before-assignment
self[pos] = prev_tensor
# pylint: disable-next=possibly-used-before-assignment
self[pos_partner] = prev_tensor_partner
# Iso center before copy was at pos_partner
self.iso_center = pos_partner
self.iso_towards(
pos,
trunc=requires_singvals,
keep_singvals=requires_singvals,
)
energy = self.optimize_single_tensor(pos)
elif expansion_drop in ["d"]:
# Energy did not improve, reinstall previous tensors, discard
# step and do not optimize even locally
# pylint: disable-next=possibly-used-before-assignment
self[pos] = prev_tensor
# pylint: disable-next=possibly-used-before-assignment
self[pos_partner] = prev_tensor_partner
# Iso center before copy was at pos_partner
self.iso_center = pos_partner
self.iso_towards(
pos,
trunc=requires_singvals,
keep_singvals=requires_singvals,
)
else:
# There should be no other case
raise QTeaLeavesError(f"Unknown {expansion_drop=} scenario.")
# iso_towards does not normalize (maybe it does inside the truncate methods ...)
# but no normalization should be necessary if the eigensolver returns
# basis vectors, which should be normalized by default
return energy
[docs]
def optimize_two_tensors(
self, pos: TnPos, pos_partner: TnPos, link_self: int, link_partner: int
) -> float:
"""
Local ground-state search on two tensors simultaneously.
Parameters
----------
pos : Tuple[int] | int
Position in the TN of the tensor to optimize
pos_partner : int, tuple of ints (depending on TN)
position of partner tensor, where link between
tensor and partner tensor will be randomly
expanded.
link_self : int
Link connecting to partner tensor (in tensor at `pos`)
link_partner : int
Link connecting to the optimized tensor (in the partner tensor).
Returns
-------
float
Computed energy
"""
# Isometrize towards the desired tensor.
# We do this additional step to ensure they are both
# on the computational device
self.iso_towards(pos_partner)
self.iso_towards(pos, move_to_memory_device=False)
tens_a = self[pos]
tens_b = self[pos_partner]
is_a_outgoing = tens_a.are_links_outgoing[link_self]
theta = tens_a.tensordot(tens_b, ([link_self], [link_partner]))
# Build custom eff ops list
custom_ops: list[Any] = []
for ii, elem in enumerate(self.get_pos_links(pos)):
if ii == link_self:
continue
if elem is None:
custom_ops.append(None)
else:
custom_ops.append(self.eff_op[(elem, pos)])
for ii, elem in enumerate(self.get_pos_links(pos_partner)):
if ii == link_partner:
continue
if elem is None:
custom_ops.append(None)
else:
custom_ops.append(self.eff_op[(elem, pos_partner)])
custom_ops, theta, inv_perm = self.permute_spo_for_two_tensors(
custom_ops, theta, link_partner
)
if len(self.eff_proj) > 0:
raise NotImplementedError(
"""The effective projectors are not
implemented for two tensor optimization."""
)
eigenvalues, theta = theta.eig_api(
self.eff_op.contract_tensor_lists,
theta.shape,
self._convergence_parameters,
args_func=(None, None),
kwargs_func={"custom_ops": custom_ops},
)
if inv_perm is not None:
theta.transpose_update(inv_perm)
links_a = list(range(tens_a.ndim - 1))
links_b = list(range(tens_a.ndim - 1, theta.ndim))
tens_a, tens_b, svals, svals_cut = theta.split_svd(
links_a,
links_b,
contract_singvals="R",
conv_params=self._convergence_parameters,
is_link_outgoing_left=is_a_outgoing,
)
svals_cut = self._postprocess_singvals_cut(
singvals_cut=svals_cut, conv_params=self._convergence_parameters
)
svals_cut = theta.get_of(svals_cut)
self.set_singvals_on_link(pos, pos_partner, svals)
nn = tens_a.ndim
perm_a = list(range(link_self)) + [nn - 1] + list(range(link_self, nn - 1))
self[pos] = tens_a.transpose(perm_a)
nn = tens_b.ndim
perm_b = (
list(range(1, link_partner + 1)) + [0] + list(range(link_partner + 1, nn))
)
self[pos_partner] = tens_b.transpose(perm_b)
self.iso_towards(pos_partner, keep_singvals=True)
return np.real(tens_a.get_of(eigenvalues[0])) # type: ignore[no-any-return]
#########################################################################
######################## Time evolution methods #########################
#########################################################################
[docs]
def timestep_single_tensor(
self, pos: TnPos, next_pos: TnPos | None, sc: complex
) -> list[KrylovInfo]:
"""
Time step for a single-tensor update on a single tensor `exp(sc*Heff*dt)`.
Parameters
----------
pos : Tuple[int] | int
Position in the TN of the tensor to time-evolve
next_pos: Tuple[int] | int | None
Position in the TN of the next tensor to time-evolve. If is `None`,
we do not back-evolve in time.
sc : complex
Multiplicative factor in the exponent `exp(sc*Heff*dt)`
Return
------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
logger.info("Time-step at tensor %s-%s", pos, next_pos)
logger.debug("Time-step at tensor's norm %f, scalar %s", self.norm(), sc)
timestep_info = []
# Isometrize towards the desired tensor
self.iso_towards(pos)
pos_links = self.get_pos_links(pos)
if sc.imag == 0:
sc = sc.real
krylov_solver = self._solver(
self[pos],
sc,
self.eff_op.contract_tensor_lists,
self._convergence_parameters,
args_func=(pos, pos_links),
)
self[pos], conv = krylov_solver.solve()
timestep_info.append(conv)
if next_pos is not None:
# Have to evolve backwards
# ------------------------
#
# This is a bit inconvenient, because we have to shift the isometry
# center by hand as the r-tensor has to be evolved backwards in time.
(
rtens,
pos_partner,
link_partner,
path_elem,
) = self._partial_iso_towards_for_timestep(pos, next_pos)
# Retrieve operator from partner to iso center
ops_a = self.eff_op[(pos_partner, pos)]
# Path elem src layer-tensor-link, dst layer-tensor-link
self._update_eff_ops(path_elem)
# Needing just one operator, no idxs needed
ops_b = self.eff_op[(pos, pos_partner)]
# Assumed to be in the order of links
ops_list_reverse = [ops_b, ops_a]
krylov_solver = self._solver(
rtens,
-sc,
self.eff_op.contract_tensor_lists,
self._convergence_parameters,
args_func=(None, None),
kwargs_func={"custom_ops": ops_list_reverse},
)
rtens, conv = krylov_solver.solve()
timestep_info.append(conv)
tmp = rtens.tensordot(self[pos_partner], ([1], [link_partner]))
if link_partner == 0:
self[pos_partner] = tmp
else:
nn = self[pos_partner].ndim
perm = (
list(range(1, link_partner + 1))
+ [0]
+ list(range(link_partner + 1, nn))
)
self[pos_partner] = tmp.transpose(perm)
self.iso_center = pos_partner
# Move pos to the memory device
self.move_pos(pos, device=self._tensor_backend.memory_device, stream=True)
return timestep_info
[docs]
def timestep_two_tensors(
self, pos: TnPos, next_pos: TnPos | None, sc: complex, skip_back: bool
) -> list[KrylovInfo]:
"""
Time step for a two-tensor update `exp(sc*Heff*dt)`.
Parameters
----------
pos : Tuple[int] | int
Position in the TN of the tensor to time-evolve
next_pos: Tuple[int] | int | None
Position in the TN of the next tensor to time-evolve
sc : complex
Multiplicative factor in the exponent `exp(sc*Heff*dt)`
skip_back : bool
Flag if backwards propagation of partner tensor can be skipped;
used for last two tensors, partner tensor must be next position
as well.
Return
------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
logger.debug("Time-step at tensor %s", pos)
timestep_info = []
if len(self.eff_proj) > 0:
# the effective projectors should not be here, as they are ignored.
raise QTeaLeavesError(
"Doing time evolution with self.eff_proj set."
+ "Remove eff_proj before starting the dynamics."
)
# Isometrize towards the desired tensor
self.iso_towards(pos)
# pos_partner, link_pos, link_partner = self.get_pos_partner_link_expansion(pos)
link_pos: int
(
link_pos,
pos_partner,
link_partner,
path_elem,
) = self._partial_iso_towards_for_timestep( # type: ignore[assignment]
pos, next_pos, no_rtens=True # type: ignore[arg-type]
)
tens_a = self[pos]
tens_b = self[pos_partner]
is_a_outgoing = tens_a.are_links_outgoing[link_pos]
theta = tens_a.tensordot(tens_b, ([link_pos], [link_partner]))
# Build custom eff ops list
custom_ops: list[Any] = []
for ii, elem in enumerate(self.get_pos_links(pos)):
if ii == link_pos:
continue
if elem is None:
custom_ops.append(None)
else:
custom_ops.append(self.eff_op[(elem, pos)])
for ii, elem in enumerate(self.get_pos_links(pos_partner)):
if ii == link_partner:
continue
if elem is None:
custom_ops.append(None)
else:
custom_ops.append(self.eff_op[(elem, pos_partner)])
custom_ops, theta, inv_perm = self.permute_spo_for_two_tensors(
custom_ops, theta, link_partner
)
krylov_solver = self._solver(
theta,
sc,
self.eff_op.contract_tensor_lists,
self._convergence_parameters,
args_func=(None, None),
kwargs_func={"custom_ops": custom_ops},
)
theta, conv = krylov_solver.solve()
timestep_info.append(conv)
if inv_perm is not None:
theta.transpose_update(inv_perm)
links_a = list(range(tens_a.ndim - 1))
links_b = list(range(tens_a.ndim - 1, theta.ndim))
tens_a, tens_b, svals, svals_cut = theta.split_svd(
links_a,
links_b,
contract_singvals="R",
conv_params=self._convergence_parameters,
is_link_outgoing_left=is_a_outgoing,
)
svals_cut = self._postprocess_singvals_cut(
singvals_cut=svals_cut, conv_params=self._convergence_parameters
)
svals_cut = theta.get_of(svals_cut)
self.set_singvals_on_link(pos, pos_partner, svals)
nn = tens_a.ndim
perm_a = list(range(link_pos)) + [nn - 1] + list(range(link_pos, nn - 1))
self[pos] = tens_a.transpose(perm_a)
nn = tens_b.ndim
perm_b = (
list(range(1, link_partner + 1)) + [0] + list(range(link_partner + 1, nn))
)
self[pos_partner] = tens_b.transpose(perm_b)
self._update_eff_ops(path_elem)
self.iso_center = pos_partner
# Move back to memory tensor at pos
self.move_pos(pos, device=self._tensor_backend.memory_device, stream=True)
if not skip_back:
# Have to evolve backwards
# ------------------------
pos_links = self.get_pos_links(pos_partner)
krylov_solver = self._solver(
self[pos_partner],
-sc,
self.eff_op.contract_tensor_lists,
self._convergence_parameters,
args_func=(pos_partner, pos_links),
)
self[pos_partner], conv = krylov_solver.solve()
timestep_info.append(conv)
elif pos_partner != next_pos:
raise QTeaLeavesError("Sweep order incompatible with two-tensor update.")
return timestep_info
[docs]
def timestep_single_tensor_link_expansion(
self, pos: TnPos, next_pos: TnPos, sc: complex
) -> list[KrylovInfo]:
"""
Time step for a single-tensor update with link expansion `exp(sc*Heff*dt)`.
Parameters
----------
pos : Tuple[int] | int
Position in the TN of the tensor to time-evolve
next_pos: Tuple[int] | int
Position in the TN of the next tensor to time-evolve
sc : complex
Multiplicative factor in the exponent `exp(sc*Heff*dt)`
Return
------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
logger.debug("Time-step with link expansion at tensor %s", pos)
timestep_info = []
requires_singvals = True
if next_pos is None:
return self.timestep_single_tensor(pos, next_pos, sc)
self.iso_towards(
pos,
trunc=requires_singvals,
keep_singvals=requires_singvals,
move_to_memory_device=True,
)
self.iso_towards(
next_pos,
trunc=requires_singvals,
keep_singvals=requires_singvals,
move_to_memory_device=False,
)
link_self: int
(
link_self,
pos_partner,
link_partner,
_,
) = self._partial_iso_towards_for_timestep( # type: ignore[assignment]
pos, next_pos, no_rtens=True
)
tensor = self[pos].copy()
tensor_partner = self[pos_partner].copy()
# Expand the tensors
option_a = (
tensor.shape[link_self]
+ self._convergence_parameters.sim_params["min_expansion"]
)
option_b = 2 * tensor.shape[link_self]
option_c = np.delete(list(tensor.shape), link_self).prod()
new_dim = min(option_a, option_b, option_c)
self[pos], self[pos_partner] = tensor.expand_link_tensorpair(
tensor_partner, link_self, link_partner, new_dim, ctrl="Z"
)
# Update of eff operators (internal iso_towards in space link
# expansion cannot truncate or update singvals)
self.iso_towards(pos, move_to_memory_device=False)
# Expansion cycles
# ----------------
# Same here, use QR as otherwise truncation potentially kicks in,
# undoing the expansion. The final iso_towards with QR or SVD follows
# after the expansion cycles.
exp_cycles = self._convergence_parameters.sim_params["expansion_cycles"]
sc_e = sc / exp_cycles
for ii in range(exp_cycles):
self.iso_towards(pos, move_to_memory_device=False)
conv = self.timestep_single_tensor(pos, pos_partner, sc_e)
timestep_info.extend(conv)
if exp_cycles > 1:
next_pos_partner = pos if ii < exp_cycles - 1 else None
conv = self.timestep_single_tensor(pos_partner, next_pos_partner, sc_e)
timestep_info.extend(conv)
# Evolve back the tensor at pos_partner
if exp_cycles > 1:
conv = self.timestep_single_tensor(pos_partner, None, -sc)
timestep_info.extend(conv)
self.iso_towards(
pos,
trunc=requires_singvals,
keep_singvals=requires_singvals,
move_to_memory_device=False,
)
self.iso_towards(
pos_partner,
trunc=requires_singvals,
keep_singvals=requires_singvals,
move_to_memory_device=True,
)
return timestep_info
# pylint: disable-next=too-many-return-statements
[docs]
def timestep(
self,
dt: float | complex,
mode: int | str,
sweep_order: Sequence[TnPos] | None = None,
sweep_order_back: Sequence[TnPos] | None = None,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep.
Parameters
----------
dt : float | complex
Timestep
mode : int | str
Currently encoded are single-tensor TDVP first order (1), two-tensor
TDVP first order (2), two-tensor TDVP second order (3), single-tensor
TDVP second order (4), and flex-TDVP (5). The "_kraus" extension
for modes 1-4 (e.g., "1_kraus") adds local dissipative evolution.
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
sweep_order_back : List[int] | None
Order in which we iterate backwards through the network for the timestep.
If None, use the default in `self.default_sweep_order()[::-1]`
Returns
-------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
Details
-------
Flex-TDVP in the fortran implementation used two-tensor updates
as long as the maximal bond dimension was not reached, and then a ratio
of 9 single-tensor updates to 1 two-tensor update.
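Example
-------
A minimal sketch of a real-time evolution loop, assuming `psi` carries
the effective operators of the Hamiltonian::

    dt = 0.05
    for _ in range(20):
        info = psi.timestep(dt, mode=4)  # single-tensor TDVP, 2nd order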
"""
if len(self.eff_proj) > 0:
# the effective projectors should not be here, as they are ignored.
raise QTeaLeavesError(
"Doing time evolution with self.eff_proj set."
+ "Remove eff_proj before starting the dynamics."
)
dt_real: float
if mode == 1:
return self.timestep_mode_1(dt, sweep_order=sweep_order)
if mode == 2:
return self.timestep_mode_2(dt, sweep_order=sweep_order)
if mode == 3:
return self.timestep_mode_3(
dt,
sweep_order=sweep_order,
sweep_order_back=sweep_order_back,
)
if mode == 4:
return self.timestep_mode_4(
dt,
sweep_order=sweep_order,
sweep_order_back=sweep_order_back,
)
if mode == 5:
return self.timestep_mode_5(dt, sweep_order=sweep_order)
if mode == "1_kraus":
dt_real = dt.real if isinstance(dt, complex) else dt
return self.timestep_mode_1_kraus(dt_real, sweep_order=sweep_order)
if mode == "2_kraus":
dt_real = dt.real if isinstance(dt, complex) else dt
return self.timestep_mode_2_kraus(dt_real, sweep_order=sweep_order)
if mode == "3_kraus":
dt_real = dt.real if isinstance(dt, complex) else dt
return self.timestep_mode_3_kraus(
dt_real, sweep_order=sweep_order, sweep_order_back=sweep_order_back
)
if mode == "4_kraus":
dt_real = dt.real if isinstance(dt, complex) else dt
return self.timestep_mode_4_kraus(
dt_real, sweep_order=sweep_order, sweep_order_back=sweep_order_back
)
raise ValueError(f"Time evolution mode {mode} not available.")
[docs]
def timestep_mode_1(
self,
dt: float | complex,
sweep_order: Sequence[TnPos] | None = None,
normalize: bool = False,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep (single-tensor update
1st order).
Parameters
----------
dt : float | complex
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
Returns
-------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
convergence_info = []
if sweep_order is None:
sweep_order = self.default_sweep_order()
for ii, pos in enumerate(sweep_order):
# 1st order update
next_pos = None if (ii + 1 == len(sweep_order)) else sweep_order[ii + 1]
convergence_info.extend(
self.timestep_single_tensor(pos, next_pos, -1j * dt)
)
if normalize:
self.normalize()
return convergence_info
[docs]
def timestep_mode_2(
self, dt: float | complex, sweep_order: Sequence[TnPos] | None = None
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep (two-tensor update
1st order).
Parameters
----------
dt : float | complex
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
Returns
-------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
timestep_info = []
if sweep_order is None:
sweep_order = self.default_sweep_order()
for ii, pos in enumerate(sweep_order):
# 1st order update
next_pos = None if (ii + 1 == len(sweep_order)) else sweep_order[ii + 1]
skip_back = ii + 2 == len(sweep_order)
timestep_info.extend(
self.timestep_two_tensors(pos, next_pos, -1j * dt, skip_back)
)
if skip_back:
break
return timestep_info
[docs]
def timestep_mode_3(
self,
dt: float | complex,
sweep_order: Sequence[TnPos] | None = None,
sweep_order_back: Sequence[TnPos] | None = None,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep (two-tensor update
2nd order).
Parameters
----------
dt : float | complex
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
sweep_order_back : List[int] | None
Order in which we iterate backwards through the network for the timestep.
If None, use the default in `self.default_sweep_order()[::-1]`
Returns
-------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
conv = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order)
if sweep_order_back is None:
sweep_order_back = self.default_sweep_order_back()
conv_back = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order_back)
return conv + conv_back
[docs]
def timestep_mode_4(
self,
dt: float | complex,
sweep_order: Sequence[TnPos] | None = None,
sweep_order_back: Sequence[TnPos] | None = None,
normalize: bool = False,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep (single-tensor update
2nd order).
Parameters
----------
dt : float | complex
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
sweep_order_back : List[int] | None
Order in which we iterate backwards through the network for the timestep.
If None, use the default in `self.default_sweep_order()[::-1]`
Returns
-------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
conv = self.timestep_mode_1(
0.5 * dt, sweep_order=sweep_order, normalize=normalize
)
if sweep_order_back is None:
sweep_order_back = self.default_sweep_order_back()
conv_back = self.timestep_mode_1(
0.5 * dt, sweep_order=sweep_order_back, normalize=normalize
)
return conv + conv_back
[docs]
def timestep_mode_5(
self,
dt: float | complex,
sweep_order: Sequence[TnPos] | None = None,
stride_two_tensor: int = 10,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep (mixed two-tensor and
one-tensor update, first order).
Parameters
----------
dt : float
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
stride_two_tensor: int
If maximum bond dimension is reached, do a two-tensor update
every `stride_two_tensor` steps.
Returns
-------
List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
timestep_info = []
if sweep_order is None:
sweep_order = self.default_sweep_order()
idx = self._timestep_mode_5_counter
self._timestep_mode_5_counter += 1
# For the main loop, we always evolve the R-tensor back in time
skip_back = False
for ii, pos in enumerate(sweep_order[:-2]):
# Everything but the last two
next_pos = sweep_order[ii + 1]
link_pos: int
link_pos, _, _, _ = self._partial_iso_towards_for_timestep( # type: ignore[assignment]
pos, next_pos, no_rtens=True
)
is_link_full = self[pos].is_link_full(link_pos)
enforce_two_tensor = idx % stride_two_tensor == 0
do_two_tensor = (not is_link_full) or enforce_two_tensor
if do_two_tensor:
timestep_info.extend(
self.timestep_two_tensors(pos, next_pos, -1j * dt, skip_back)
)
else:
timestep_info.extend(
self.timestep_single_tensor(pos, next_pos, -1j * dt)
)
# Treat the last two tensors (cannot decide individually on update-scheme)
pos = sweep_order[-2]
next_pos = sweep_order[-1]
link_pos, pos_partner, link_partner, _ = (
self._partial_iso_towards_for_timestep( # type: ignore[assignment]
pos, next_pos, no_rtens=True
)
)
is_link_full_a = self[pos].is_link_full(link_pos)
is_link_full_b = self[pos_partner].is_link_full(link_partner)
is_link_full = is_link_full_a or is_link_full_b
enforce_two_tensor = idx % stride_two_tensor == 0
do_two_tensor = (not is_link_full) or enforce_two_tensor
if do_two_tensor:
skip_back = True
timestep_info.extend(
self.timestep_two_tensors(pos, next_pos, -1j * dt, True)
)
else:
timestep_info.extend(self.timestep_single_tensor(pos, next_pos, -1j * dt))
timestep_info.extend(self.timestep_single_tensor(next_pos, None, -1j * dt))
return timestep_info
[docs]
def timestep_mode_1_kraus(
self,
dt: float,
sweep_order: Sequence[TnPos] | None = None,
normalize: bool = False,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep of the Lindblad master equation
(single-tensor update, 1st order) + local dissipation.
Parameters
----------
dt : float
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
Returns
-------
conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
if not isinstance(self.eff_op, _AbstractEffectiveMpo):
raise QTeaLeavesError(
"Effective operator must be derived from _AbstractEffectiveMpo for Kraus channels."
)
kraus_ops = self.eff_op.get_local_kraus_operators(dt)
conv1 = self.timestep_mode_1(
0.5 * dt, sweep_order=sweep_order, normalize=normalize
)
self.apply_local_kraus_channel(kraus_ops=kraus_ops)
conv2 = self.timestep_mode_1(
0.5 * dt, sweep_order=sweep_order, normalize=normalize
)
conv = conv1 + conv2
return conv
[docs]
def timestep_mode_2_kraus(
self, dt: float, sweep_order: Sequence[TnPos] | None = None
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep of the Lindblad master equation
(two-tensor update, 1st order) + local dissipation.
Parameters
----------
dt : float
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
Returns
-------
conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
if not isinstance(self.eff_op, _AbstractEffectiveMpo):
raise QTeaLeavesError(
"Effective operator must be derived from _AbstractEffectiveMpo for Kraus updates."
)
kraus_ops = self.eff_op.get_local_kraus_operators(dt)
conv1 = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order)
self.apply_local_kraus_channel(kraus_ops=kraus_ops)
conv2 = self.timestep_mode_2(0.5 * dt, sweep_order=sweep_order)
conv = conv1 + conv2
return conv
[docs]
def timestep_mode_3_kraus(
self,
dt: float,
sweep_order: Sequence[TnPos] | None = None,
sweep_order_back: Sequence[TnPos] | None = None,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep of the Lindblad master equation
(two-tensor update, 2nd order) + local dissipation.
Parameters
----------
dt : float
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
sweep_order_back : List[int] | None
Order in which we iterate backwards through the network for the timestep.
If None, use the default in `self.default_sweep_order()[::-1]`
Returns
-------
conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
if not isinstance(self.eff_op, _AbstractEffectiveMpo):
raise QTeaLeavesError(
"Effective operator must be derived from _AbstractEffectiveMpo for Kraus updates."
)
kraus_ops = self.eff_op.get_local_kraus_operators(dt)
conv1 = self.timestep_mode_3(
0.5 * dt,
sweep_order=sweep_order,
sweep_order_back=sweep_order_back,
)
self.apply_local_kraus_channel(kraus_ops=kraus_ops)
conv2 = self.timestep_mode_3(
0.5 * dt, sweep_order=sweep_order, sweep_order_back=sweep_order_back
)
conv = conv1 + conv2
return conv
[docs]
def timestep_mode_4_kraus(
self,
dt: float,
sweep_order: Sequence[TnPos] | None = None,
sweep_order_back: Sequence[TnPos] | None = None,
normalize: bool = False,
) -> list[KrylovInfo]:
"""
Evolve the Tensor network for one timestep of the Lindblad master equation
(single-tensor update, 2nd order) + local dissipation.
Parameters
----------
dt : float
Timestep
sweep_order : List[int] | None
Order in which we iterate through the network for the timestep.
If None, use the default in `self.default_sweep_order()`
sweep_order_back : List[int] | None
Order in which we iterate backwards through the network for the timestep.
If None, use the default in `self.default_sweep_order()[::-1]`
Returns
-------
conv : List[qtealeaves.solvers.krylovexp_solver.KrylovInfo]
Information about the convergence of each Krylov update.
"""
if not isinstance(self.eff_op, _AbstractEffectiveMpo):
raise QTeaLeavesError(
"Effective operator must be derived from _AbstractEffectiveMpo for Kraus updates."
)
kraus_ops = self.eff_op.get_local_kraus_operators(dt)
conv1 = self.timestep_mode_4(
0.5 * dt,
sweep_order=sweep_order,
sweep_order_back=sweep_order_back,
normalize=normalize,
)
self.apply_local_kraus_channel(kraus_ops=kraus_ops)
conv2 = self.timestep_mode_4(
0.5 * dt,
sweep_order=sweep_order,
sweep_order_back=sweep_order_back,
normalize=normalize,
)
conv = conv1 + conv2
return conv
#########################################################################
########################## Observables methods ##########################
#########################################################################
#########################################################################
############################## MPI methods ##############################
#########################################################################
# pylint: disable=c-extension-no-member
def _initialize_mpi(self) -> None:
if (MPI is not None) and (MPI.COMM_WORLD.Get_size() > 1):
self.comm = MPI.COMM_WORLD # type: ignore[assignment]
# pylint: enable=c-extension-no-member
[docs]
def mpi_send_tensor(self, tensor: _AbstractQteaTensor, to_: int) -> None:
"""
Send the tensor to the process `to_`.
Parameters
----------
tensor : _AbstractQteaTensor
Tensor to send
to_ : int
Index of the process where to send the tensor
Returns
-------
None
"""
tensor.mpi_send(to_, self.comm)
[docs]
def mpi_receive_tensor(self, from_: int) -> _AbstractQteaTensor:
"""
Receive the tensor from the process `from_`.
Parameters
----------
from_ : int
Index of the process that sent the tensor
Returns
-------
_AbstractQteaTensor
Received tensor
"""
return self._tensor_backend.tensor_cls.mpi_recv(
from_, self.comm, self._tensor_backend
)
def reinstall_isometry_parallel(self, *args: Any, **kwargs: Any) -> None:
"""
Reinstall the isometry of a parallel TN in parallel.
"""
# Empty on purpose: depends on TN ansatz and
# it is used ONLY for MPI-distributed ansatzes
def reinstall_isometry_serial(self, *args: Any, **kwargs: Any) -> None:
"""
Reinstall the isometry of a parallel TN serially.
"""
# Empty on purpose: depends on TN ansatz and
# it is used ONLY for MPI-distributed ansatzes
@staticmethod
def matrix_to_tensorlist(
matrix: np.ndarray | _AbstractQteaBaseTensor,
n_sites: int,
dim: int,
conv_params: TNConvergenceParameters,
tensor_backend: TensorBackend | None = None,
) -> list[_AbstractQteaBaseTensor]:
"""
For a given matrix, return the dense MPO form obtained by decomposing it with SVDs.
Parameters
----------
matrix : ndarray
Matrix to write in LPTN(MPO) format
n_sites : int
Number of sites
dim : int
Local Hilbert space dimension
conv_params : :py:class:`TNConvergenceParameters`
Input for handling convergence parameters.
In particular, in the LPTN simulator we
are interested in:
- the maximum bond dimension (max_bond_dimension)
- the cut ratio (cut_ratio) after which the
singular values in SVD are neglected, all
singular values such that
:math:`\\lambda_i / \\lambda_{\\max} \\leq \\epsilon` are truncated
tensor_backend : instance of :class:`TensorBackend`
Default for `None` is :class:`QteaTensor` with np.complex128 on CPU.
Return
------
List[_AbstractQteaBaseTensor]
List of tensors, the MPO decomposition of the matrix
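Example
-------
A sketch, assuming the default numpy-based backend: decompose an
8x8 matrix into a 3-site MPO with local dimension 2.
>>> import numpy as np
>>> from qtealeaves.convergence_parameters import TNConvergenceParameters
>>> mat = np.random.rand(8, 8) + 1j * np.random.rand(8, 8)
>>> conv = TNConvergenceParameters(max_bond_dimension=16)
>>> tensors = _AbstractTN.matrix_to_tensorlist(mat, 3, 2, conv)  # doctest: +SKIP
>>> [len(tt.shape) for tt in tensors]  # legs (left link, row, column, right link)  # doctest: +SKIP
[4, 4, 4]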
"""
if tensor_backend is None:
# prevents dangerous default value similar to dict/list
tensor_backend = TensorBackend()
work: _AbstractQteaBaseTensor
if not isinstance(matrix, tensor_backend.base_tensor_cls):
work = tensor_backend.tensor_cls.from_elem_array(matrix) # type: ignore[attr-defined]
else:
work = matrix
bond_dim = 1
tensorlist = []
for ii in range(0, n_sites - 1):
# dim dim**(n_sites-1)
# | ||
# O --[unfuse]--> O --[fuse upper and lower legs]-->
# | ||
#
# ==O== --[SVD, truncating]--> ==O-o-O==
#
# | |
# --[unfuse]--> O-O ---iterate
# | |
# dim dim**(n_sites-1)
work = work.reshape(
(
bond_dim,
dim,
dim ** (n_sites - 1 - ii),
dim,
dim ** (n_sites - 1 - ii),
),
)
tens_left, work, _, _ = work.split_svd(
[0, 1, 3], [2, 4], contract_singvals="R", conv_params=conv_params
)
tensorlist.append(tens_left)
bond_dim = work.shape[0]
work = work.reshape((work.shape[0], dim, dim, 1))
tensorlist.append(work)
return tensorlist
def debug_device_memory(self) -> None:
"""
Write information about the memory usage of each device
and how many tensors are stored on each device.
This should not be used in performance simulations, but only for debugging.
"""
# First we do this for tensors in the Tensor network
tensors_in_device: dict[str, int] = {}
memory_in_device: dict[str, int] = {}
for tens in self._iter_tensors():
if tens.device in tensors_in_device:
tensors_in_device[tens.device] += 1
memory_in_device[tens.device] += tens.getsizeof()
else:
tensors_in_device[tens.device] = 1
memory_in_device[tens.device] = tens.getsizeof()
nt_tot = np.array(list(tensors_in_device.values()), dtype=int).sum()
for (
device,
ntens,
) in tensors_in_device.items():
mem = memory_in_device[device]
logger.debug(
"TN tensors in %s are %d/%d for %d bytes",
device,
ntens,
nt_tot,
mem,
)
# Then we do the same for tensors in the effective operators
if self.eff_op is not None:
tensors_in_device = {}
memory_in_device = {}
for eff_ops_link in self.eff_op.values():
for tens in eff_ops_link:
if tens.device in tensors_in_device:
tensors_in_device[tens.device] += 1
memory_in_device[tens.device] += tens.getsizeof()
else:
tensors_in_device[tens.device] = 1
memory_in_device[tens.device] = tens.getsizeof()
nt_tot = np.array(list(tensors_in_device.values()), dtype=int).sum()
for (
device,
ntens,
) in tensors_in_device.items():
mem = memory_in_device[device]
logger.debug(
"Effective operators tensors in %s are %d/%d for %d bytes",
device,
ntens,
nt_tot,
mem,
)
# Count the memory for effective projectors
for proj in self.eff_proj:
tensors_in_device = {}
memory_in_device = {}
for tens in proj.values():
if tens.device in tensors_in_device:
tensors_in_device[tens.device] += 1
memory_in_device[tens.device] += tens.getsizeof()
else:
tensors_in_device[tens.device] = 1
memory_in_device[tens.device] = tens.getsizeof()
nt_tot = np.array(list(tensors_in_device.values()), dtype=int).sum()
for (
device,
ntens,
) in tensors_in_device.items():
mem = memory_in_device[device]
logger.debug(
"Effective operators tensors in %s are %d/%d for %d bytes",
device,
ntens,
nt_tot,
mem,
)
logger.debug(
"Used bytes in device memory: %d/%d",
self.data_mover.device_memory,
self.data_mover.mempool.total_bytes(), # type: ignore[attr-defined]
)
#########################################################################
############################## Measurement methods ######################
#########################################################################
def meas_exp_value(self, mpo: Any) -> float:
"""
Measures the expectation value of a given MPO for a
TN state.
Parameters
----------
mpo : :py:class:`ITPO` or generator function
The MPO whose expectation value we want to measure.
The measurement has two modes. The first is passing the ITPO itself,
in which case the entire MPO is measured at once. The second is passing
a generator function which yields ITPO terms of the MPO batch by batch,
in which case the output is the sum of the expectation values of the
batches. The second mode is useful when the MPO contains a large number
of ITPO terms, since measuring it in batches avoids memory issues.
Return
------
exp_value : float
Expectation value of the MPO.
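Example
-------
A sketch of both modes, not run as a doctest; ``mps``, ``my_itpo``,
and ``sub_itpos`` are placeholders. Note that the generator function
itself is passed, not the generator returned by calling it.
>>> exp_value = mps.meas_exp_value(my_itpo)  # doctest: +SKIP
>>> def batches():  # doctest: +SKIP
...     yield from sub_itpos
>>> exp_value = mps.meas_exp_value(batches)  # doctest: +SKIP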
"""
# save this info for later
iso_center = deepcopy(self.iso_center)
state_eff_op = self.eff_op
# Check if mpo is a generator function; if not, measure the entire mpo at once
if not inspect.isgeneratorfunction(mpo):
# check tensor backend
mpo_tensor_backend = mpo.site_terms.construct_tensor_backend()
if mpo_tensor_backend.device != self.tensor_backend.device:
raise ValueError(
"MPO and state devices don't match: "
f"{mpo_tensor_backend.device} and {self.tensor_backend.device}."
)
if mpo_tensor_backend.tensor_cls != self.tensor_backend.tensor_cls:
raise ValueError(
"MPO and state tensor classes don't match: "
f"{mpo_tensor_backend.tensor_cls} and {self.tensor_backend.tensor_cls}."
)
if hasattr(self, "de_layer"):
# include the disentanglers into the itpo
mpo = self.de_layer.contract_de_layer(
mpo, self.tensor_backend, params=None
)
# Carry out the measurement of this sub-MPO
self.eff_op = None # type: ignore[assignment]
# compute eff ops
mpo.setup_as_eff_ops(self)
# measure
exp_value = self.compute_energy()
# otherwise measure in batches
else:
# measuring iteratively batch by batch
exp_value = 0.0
for idx, sub_mpo in enumerate(mpo()):
if idx == 0:
# check the tensor backend on the first batch only
mpo_tensor_backend = sub_mpo.site_terms.construct_tensor_backend()
if mpo_tensor_backend.device != self.tensor_backend.device:
raise ValueError(
"MPO and state devices don't match: "
f"{mpo_tensor_backend.device} and {self.tensor_backend.device}."
)
if mpo_tensor_backend.tensor_cls != self.tensor_backend.tensor_cls:
raise ValueError(
"MPO and state tensor classes don't match: "
f"{mpo_tensor_backend.tensor_cls} and {self.tensor_backend.tensor_cls}."
)
if hasattr(self, "de_layer"):
# include the disentanglers into the itpo
sub_mpo = self.de_layer.contract_de_layer(
sub_mpo, self.tensor_backend, params=None
)
# Carry out the measurement of this sub-MPO
self.eff_op = None # type: ignore[assignment]
# compute eff ops
sub_mpo.setup_as_eff_ops(self)
# measure
exp_value += self.compute_energy()
# restore effective operators and iso center to get state into
# previous form
self.eff_op = state_eff_op
_ = self.compute_energy()
if iso_center is not None:
self.iso_towards(iso_center)
return exp_value
def postprocess_statedict(
state_dict: dict[str, Any], local_dim: int = 2, qiskit_convention: bool = False
) -> dict:
"""
Remove commas from the states defined as keys of statedict
and, if `qiskit_convention=True` invert the order of the
digits following the qiskit convention
Parameters
----------
state_dict : dict
State dictionary, whose keys should be of the format
'd,d,d,d,d,...,d' with d from 0 to local dimension
local_dim : int or array-like of ints, optional
Local dimension of the sites. Default to 2
qiskit_convention : bool, optional
If True, invert the digit ordering to follow qiskit
convention
Return
------
dict
The postprocessed state dictionary
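Example
-------
>>> postprocess_statedict({"0,1": 0.25, "1,0": 0.75}, qiskit_convention=True)
{'10': 0.25, '01': 0.75}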
"""
max_local_dim = np.max(resolve_scalar_list(local_dim))
postprocessed_state_dict = {}
for key, val in state_dict.items():
# If the maximum of the local_dim is < 10,
# remove the commas, since the state string
# remains unambiguous
if max_local_dim < 10:
key = key.replace(",", "")
# Invert the digit order if qiskit_convention is True
if qiskit_convention:
postprocessed_state_dict[key[::-1]] = val
else:
postprocessed_state_dict[key] = val
return postprocessed_state_dict
def _resample_for_unbiased_prob(
num_samples: int, bound_probabilities: dict | None
) -> tuple[np.ndarray, dict]:
"""
Sample the `num_samples` samples in U(0,1) to use in the function
:py:func:`meas_unbiased_probabilities`. If `bound_probabilities`
is not None, the samples are drawn only from the regions not already
covered by `bound_probabilities`, so that all `num_samples` samples fall
outside the previously computed ranges. The returned array is sorted
in ascending order.
Parameters
----------
num_samples : int
Number of samples to be drawn for :py:func:`meas_unbiased_probabilities`
bound_probabilities : dict or None
See :py:func:`meas_unbiased_probabilities`.
Return
------
np.ndarray
Sorted samples in (0,1)
dict
Empty dictionary if bound_probabilities is None, otherwise the
bound_probabilities input parameter.
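Example
-------
A sketch (the output is random, hence not run as a doctest): with the
interval [0, 0.25) already assigned to the state '00', the new samples
are drawn only from the remaining range (0.25, 1).
>>> bounds = {"00": [0.0, 0.25]}
>>> samples, bounds = _resample_for_unbiased_prob(100, bounds)  # doctest: +SKIP
>>> bool(np.all(samples >= 0.25))  # doctest: +SKIP
True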
"""
if (bound_probabilities is None) or (len(bound_probabilities) == 0):
# Contains the boundary probability of measuring the state, i.e. if a uniform
# random number has value left_bound< value< right_bound then you measure the
# state. The dict structure is {'state' : [left_bound, right_bound]}
bound_probabilities = {}
samples = _random_uniform(0, 1, num_samples)
else:
# Prepare the functions to be used later on based on precision
mpf_wrapper, almost_equal = _mp_precision_check(mp.mp.dps)
# Go on and sample until you reach an effective number of num_samples,
# without taking into account those already sampled in the given
# bound_probabilities
bounds_array = np.zeros((len(bound_probabilities), 2))
for idx, bound in enumerate(bound_probabilities.values()):
bounds_array[idx, :] = bound
bounds_array = bounds_array[bounds_array[:, 0].argsort()]
if "left" in bound_probabilities and len(bound_probabilities) > 1:
bounds_array[0, 1] = min(bounds_array[0, 1], bounds_array[1, 0])
if "right" in bound_probabilities and len(bound_probabilities) > 1:
bounds_array[-1, 0] = max(bounds_array[-1, 0], bounds_array[-2, 1])
# Immediately return if almost all the space has been measured
if almost_equal(
(np.sum(bounds_array[:, 1] - bounds_array[:, 0]), mpf_wrapper(1.0))
):
return np.random.uniform(0, 1, 1), bound_probabilities
if mp.mp.dps > 15:
logger_warning(
"Resampling is performed at standard precision 16 digits"
+ "at least in np.random.choice."
)
# Sample unsampled areas. First, prepare array for sampling
array_for_sampling = []
last_bound = 0
last_idx = 0
while not almost_equal((last_bound, mpf_wrapper(1.0))):
# Skip if interval already measured
if last_idx < len(bounds_array) and almost_equal(
(last_bound, bounds_array[last_idx, 0])
):
last_bound = bounds_array[last_idx, 1]
last_idx += 1
# Save interval
else:
if 0 < last_idx < len(bounds_array):
array_for_sampling.append(
[bounds_array[last_idx - 1, 1], bounds_array[last_idx, 0]]
)
last_bound = bounds_array[last_idx, 0]
elif last_idx == len(bounds_array):
array_for_sampling.append([bounds_array[last_idx - 1, 1], 1])
last_bound = 1
else: # Initial case
array_for_sampling.append([0, bounds_array[last_idx, 0]])
last_bound = bounds_array[last_idx, 0]
nparray_for_sampling = np.array(array_for_sampling)
# Sample from which intervals you will sample
sample_prob = nparray_for_sampling[:, 1] - nparray_for_sampling[:, 0]
sample_prob[sample_prob < 0] = 0.0
sample_prob /= np.sum(sample_prob)
intervals_idxs = np.random.choice(
np.arange(len(array_for_sampling)),
size=num_samples,
replace=True,
p=sample_prob,
)
intervals_idxs, num_samples_per_interval = np.unique(
intervals_idxs, return_counts=True
)
# Finally perform uniform sampling
samples = np.zeros(1)
for int_idx, num_samples_int in zip(intervals_idxs, num_samples_per_interval):
interval = nparray_for_sampling[int_idx, :]
samples = np.hstack(
(
samples,
np.random.uniform(
*interval, size=num_samples_int
), # type: ignore[call-overload]
)
)
samples = samples[1:]
# Sort the array
samples = np.sort(samples)
return samples, bound_probabilities
def _projector(
idxs: int | Sequence[int], shape: int | Sequence[int], xp: Any = np
) -> np.ndarray:
"""
Generate a projector of a given shape on the
subspace identified by the indexes idxs
Parameters
----------
idxs : int or array-like of ints
Indexes where the diagonal of the projector is 1,
i.e. identifying the projector subspace
shape : int or array-like of ints
Dimensions of the projector. If an int, it is
assumed a square matrix
xp : module handle
Module handle for the creation of the projector.
Possible are np (CPU) or cp (GPU). Default to np.
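Example
-------
>>> _projector([0, 2], 3)
array([[1., 0., 0.],
       [0., 0., 0.],
       [0., 0., 1.]])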
"""
logger_warning("DeprecationWarning: helper function `_projector` deprecated.")
_idxs = resolve_scalar_list(idxs)
if np.isscalar(shape):
shape = (shape, shape) # type: ignore[assignment]
inds = np.array(_idxs, dtype=int)
projector = xp.zeros(shape)
projector[inds, inds] = 1
return projector # type: ignore[no-any-return]
def _projector_for_rho_i(
idxs: int | Sequence[int], rho_i: _AbstractQteaTensor
) -> _AbstractQteaTensor:
"""
Generate a projector of a given shape on the
subspace identified by the indexes idxs
Parameters
----------
idxs : int or array-like of ints
Indexes where the diagonal of the projector is 1,
i.e. identifying the projector subspace
rho_i : _AbstractQteaTensor
Reduced density matrix for which a projector is needed.
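Example
-------
A sketch, not run as a doctest; ``rho_i`` is a placeholder for a
reduced density matrix as :class:`_AbstractQteaTensor`.
>>> proj = _projector_for_rho_i([0, 2], rho_i)  # doctest: +SKIP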
"""
inds: Sequence[int]
inds = [idxs] if np.isscalar(idxs) else idxs # type: ignore[list-item,assignment]
projector = rho_i.zeros_like()
for ii in inds:
projector.set_diagonal_entry(ii, 1.0)
return projector
def _mp_precision_check(precision: int) -> tuple[Callable, Callable]:
"""
Based on the selected precision, return a wrapper
for the initialization of variables and an
almost-equal check.
In particular, if `precision > 15`,
the mpmath library is used.
Parameters
----------
precision : int
Precision of the computations
Return
------
callable
Initializer for variables
callable
Almost equal check for variables
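Example
-------
>>> mpf_wrapper, almost_equal = _mp_precision_check(10)
>>> bool(almost_equal((0.1 + 0.2, 0.3)))
True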
"""
if precision > 15:
# pylint: disable-next=unnecessary-lambda
mpf_wrapper = lambda x: mp.mpf(x)
almost_equal = lambda x: mp.almosteq(
x[0], x[1], abs_eps=mp.mpf(10 ** (-precision))
)
else:
mpf_wrapper = lambda x: x
almost_equal = lambda x: np.isclose(x[0], x[1], atol=10 ** (-precision), rtol=0)
return mpf_wrapper, almost_equal
def _check_samples_in_bound_probs(
samples: np.ndarray, bound_probabilities: dict
) -> tuple[np.ndarray, np.ndarray]:
"""
Check if the samples are falling in the probability intervals
defined by the dictionary bound_probabilities, received as
output by the OPES/unbiased sampling
Parameters
----------
samples : np.ndarray
List of samples
bound_probabilities : dict
Dictionary of bound probabilities, where the key is the
measure and the values the intervals of probability
Returns
-------
np.ndarray(float)
The probability sampled by samples, repeated the correct
amount of times
np.ndarray(float)
The subset of the original samples not falling into the
already measured intervals
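Example
-------
>>> samples = np.array([0.1, 0.5, 0.9])
>>> bounds = {"00": [0.0, 0.25], "11": [0.75, 1.0]}
>>> _check_samples_in_bound_probs(samples, bounds)
(array([0.25, 0.25]), array([0.5]))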
"""
if len(bound_probabilities) == 0:
return np.array([]), samples
bound_probs = np.array(list(bound_probabilities.values()))
left_bound = bound_probs[:, 0]
right_bound = bound_probs[:, 1]
probs = bound_probs[:, 1] - bound_probs[:, 0]
new_samples: list[float] = []
def get_probs(sample: float, new_samples: list) -> float:
# Careful: accesses left_bound, right_bound, and probs from the
# enclosing scope. This is fine because the closure is only used
# directly below.
condition = np.logical_and(sample < right_bound, sample > left_bound)
if not any(condition):
new_samples.append(sample)
return -1.0
res = probs[condition]
return res[0] # type: ignore[no-any-return]
# get_probs = np.vectorize(get_probs)
probability_sampled = np.array([get_probs(ss, new_samples) for ss in samples])
probability_sampled = probability_sampled[probability_sampled > 0].astype(float)
return probability_sampled, np.array(new_samples)
def _random_uniform(lower: float, upper: float, num_samples: int) -> np.ndarray:
"""
Return a numpy array of num_samples samples distributed uniformly
over the interval [lower, upper). If the mpmath precision is greater
than 15 uses mpmath, otherwise uses numpy.random.uniform.
Parameters
----------
lower : float
Lower boundary of the output interval. All values generated will
be greater than or equal to lower.
upper : float
Upper boundary of the output interval. All values generated will
be less than upper.
num_samples : int
Number of sampled values.
Returns
-------
np.ndarray
Drawn samples, with entries of type float or mp.mpf.
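Example
-------
Forcing the high-precision mpmath branch (mp.mp.dps > 15):
>>> mp.mp.dps = 30
>>> _random_uniform(0.0, 1.0, 4).shape
(4,)
>>> mp.mp.dps = 15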
"""
if mp.mp.dps <= 15:
return np.random.uniform(lower, upper, num_samples)
samples = np.array(
[
mp.mpf(
"0." + "".join([str(np.random.randint(10)) for _ in range(mp.mp.dps)])
)
for _ in range(num_samples)
]
)
samples = lower + samples * (upper - lower) # type: ignore[operator]
return samples # type: ignore[no-any-return]