Source code for qtealeaves.mpos.indexedtpo

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Indexed tensor product operators (iTPOs) for Hamiltonians or observables.
-------------------------------------------------------------------------
A Hamiltonian MPO (or any MPO operator) is often a sum of tensor product terms (TPOs),
where every TPO term contains n operators acting on n sites.

Very often, we have only few different operator tensors building the Hamiltonian:
the Ising Hamiltonian is, for example, built only from sigma_x and sigma_z operators.
Therefore, to save memory, we don't have to define our Hamiltonian MPO by saving every
operator at every site (as it is done in a DenseMPOList), but instead we can assign
every operator tensor an index ii and store only the information of type:
"operator ii acting on site x with the prefactor w", where ii points to e.g. sigma_x.
This way of representing MPOs is what we call the indexed TPO picture!

Another difference with respect to the DenseMPOs/DenseMPOLists is that local terms
are treated differently in the iTPO picture. All the local terms
acting on the same site are contracted into a single local term of same dimensions.

There are 3 classes in this module: `ITPOTerm`, `ITPOSites`, `ITPO`

ITPOTerm :      ITPOTerm contains all operators acting on a single site, each coming
                from a different TPO term in the MPO.
                The "Term" in the name is a bit misleading, as it does not refer to
                the TPO term which usually acts on more than one site, but it refers
                to a collection of operators acting on a single site.
                Additionally, ITPOTerm can contain one local term that is treated separately
                and is stored under ITPOTerm._local.

ITPO :          The full MPO, containing all its TPO terms. In general, it has two parts:
                ITPO.site_terms and ITPO.eff_ops. ITPO.site_terms are a class of ITPOSites
                (description below) and they represent the TPO terms coming from a Hamiltonian
                or any operator acting on the physical sites of the tensor network.
                ITPO.eff_ops on the other hand, represent all the TPO terms inside the effective
                operators around each tensor in a tensor network. Therefore, ITPO.eff_ops depend
                on a TN ansatz. In a simulation, ITPO.site_terms are usually the input Hamiltonian
                and ITPO.eff_ops are computed by contracting this Hamiltonian with tensors in a TN.

ITPOSites :     Represent the TPO terms coming from a Hamiltonian or any operator acting on
                physical sites of the tensor network. Stored as a list of ITPOTerm-s, such that
                there is one ITPOTerm per system site.

Some useful attributes/functions/remarks:

ITPOTerm :      - Acts on a single site, but there is no attribute inside of it which tell
                which site it is. This info is instead stored in ITPO/ITPOSites.
                - ITPOTerm._tensors : following the iTPO convention, this is a dictionary with
                all the tensors, where the key is the operator string which defines it
                - ITPOTerm._operators : dictionary, with the key being a TPO ID and the value is
                the corresponding operator string
                - ITPOTerm.weights : list of prefactors for every operator tensor in ITPO, ordered
                by TPO IDs
                - ITPOTerm._local : tensor of a local term, if any

ITPO :          - the easiest way to create an ITPO is to first create a DenseMPOList, initialize
                empty itpo = ITPO(num_sites), and then convert DenseMPOList to ITPO with
                itpo.add_dense_mpo_list(DenseMPOList)
                - ITPO.site_terms : list of ITPOTerms (= ITPOSites) that act on physical sites
                in a TN
                - ITPO.eff_ops : a dictionary of effective operators around every tensor in a tensor
                network. The keys in a dictionary specify which tensor we are looking at, with
                the convention differing for different TNs (e.g. for MPS, it's just the index of
                the tensor, for TTN it's (layer_idx, tensor_idx)). The value for each iterm in
                ITPO.eff_ops is again a dictionary containing three ITPOTerms, as there are
                three effective operators per tensor. The convention for keys of this dictionary
                again differs for different TNs, but (roughly speaking) it's supposed to specify
                whether the corresponding ITPOTerm is left, down, or right effective operator.
                Upon initialization, ITPO.eff_ops is an empty dictionary. The actual effective
                operators are computed once self.contr_to_eff_op() is called.

ITPOSites :     - Nothing much to say, is literally the list of ITPOTerms.

"""

# pylint: disable=protected-access
# pylint: disable=too-many-lines
# pylint: disable=too-many-arguments
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods

import logging
from contextlib import nullcontext
from copy import deepcopy

import numpy as np

from qtealeaves.operators import TNOperators
from qtealeaves.tensors import TensorBackend
from qtealeaves.tensors.abstracttensor import _AbstractQteaTensor
from qtealeaves.tooling import QTeaLeavesError
from qtealeaves.tooling.parameterized import _ParameterizedClass
from qtealeaves.tooling.permutations import _transpose_idx
from qtealeaves.tooling.restrictedclasses import _RestrictedList

from .abstracteffop import _AbstractEffectiveMpo
from .densempos import DenseMPO, DenseMPOList, MPOSite, _duplicate_check_and_set

__all__ = ["ITPOTerm", "ITPOSites", "ITPO"]

logger = logging.getLogger(__name__)


[docs] class ITPOTerm(_ParameterizedClass): """Single iTPO term either for Hamiltonian or inside effective operators. ITPOTerm contains all operators acting on a single site, each coming from a different TPO term in the MPO. The "Term" in the name is a bit misleading, as it does not refer to the TPO term which usually acts on more than one site, but it refers to a collection of operators acting on a single site. Additionally, ITPOTerm can contain one local term that is treated separately and is stored under ITPOTerm._local. """ def __init__(self, do_indexing=True, enable_update=False): self._tensors = {} self._operators = {} self._link_inds = {} self._weights = {} self._local = None # Needed for measurements self._measurement_setup = False self._meas_vec = None # Needed for time evolution update mode self._enable_update = enable_update self._cache_update = {} # Needed to update while skipping contractions # -------------------------------------------- # Keys are TPO IDs, values are scalars self._prefactor = {} # Keys are TPO IDs, values are something parameterized (scalar, callable, str) self._pstrength = {} self._local_ops = [] self._local_prefactors = [] self._local_prefactors_num = [] self._local_pstrengths = [] self._local_oqs = [] self._apply_kraus = [] # internal flag to allow TPO without indexing self._do_indexing = do_indexing # tracker for computational effort self._contraction_counter = 0 # -------------------------------------------------------------------------- # Overwritten magic methods # -------------------------------------------------------------------------- def __iter__(self): """Iterator over all the tensors of the ITPOTerm""" yield from self._tensors.values() if self._local is not None: yield self._local def __repr__(self): """ User-friendly representation of object for print(). """ local = 0 if self._local is None else 1 str_repr = f"{self.__class__.__name__}" str_repr += f"(TPO_IDs={list(self._link_inds)}, num_local={local})" return str_repr # -------------------------------------------------------------------------- # Properties # -------------------------------------------------------------------------- @property def device(self): """Device where the tensor is stored.""" for _, elem in self._tensors.items(): return elem.device if self._local is not None: return self._local.device raise QTeaLeavesError("Running inquiery on empty iTPOTerm.") @property def enable_update(self): """Property if update of time-dependent couplings is enabled.""" return self._enable_update @enable_update.setter def enable_update(self, value): """Setter for property if update of time-dependent couplings is enabled.""" self._enable_update = value @property def dtype(self): """Data type of the underlying arrays.""" for _, elem in self._tensors.items(): return elem.dtype if self._local is not None: return self._local.dtype raise QTeaLeavesError("Running inquiry on empty iTPOTerm.") @property def idx_eye(self): """By convention, we will set the "eye" tensor at key -2.""" return -2 @property def has_oqs(self): """Return flag if the iTPOTerm contains any open system term.""" has_oqs = False for has_oqs_ii in self._local_oqs: has_oqs = has_oqs or has_oqs_ii return has_oqs @property def local_dim(self): """Return the local dimension of the ITPOTerm as int.""" if self._local is not None: return self._local.shape[0] for _, elem in self._tensors.items(): # First and last index are horizontal links return elem.shape[1] raise QTeaLeavesError("Running query on empty iTPOTerm.") # -------------------------------------------------------------------------- # classmethod, classmethod-like # --------------------------------------------------------------------------
[docs] def empty_copy(self, other=None, do_copy_meas_vec=False): """Make a copy of the settings of a term without entries.""" measurement_setup = self._measurement_setup if other is not None: measurement_setup = measurement_setup or other._measurement_setup obj = ITPOTerm(do_indexing=self._do_indexing, enable_update=self._enable_update) obj.set_meas_status(measurement_setup) if do_copy_meas_vec: obj._transfer_entries_measurement(self, other) return obj
# -------------------------------------------------------------------------- # Methods # --------------------------------------------------------------------------
[docs] def convert(self, dtype, device, stream=None): """Convert underlying array to the specified data type inplace.""" # convert tensors in n-body terms, n>1 for _, tensor in self._tensors.items(): tensor.convert(dtype, device, stream=stream) # convert local terms if self._local is not None: self._local.convert(dtype, device, stream=stream) # convert local operators for local_op in self._local_ops: local_op.convert(dtype, device)
[docs] def is_gpu(self, query=None): """Check if object itself or a device string `query` is a GPU.""" for _, tensor in self._tensors.items(): return tensor.is_gpu(query=query) if self._local is not None: return self._local.is_gpu(query=query) if query is not None: raise QTeaLeavesError("Running query on empty iTPO; no tensor to check.") # If there is no tensor, it should be save to say it is not on the GPU # This assumption will be fine in terms of conversions. return False
[docs] def copy(self): """Actual copy of instance.""" new = ITPOTerm() for key, value in self._tensors.items(): new._tensors[key] = value.copy() new._operators = deepcopy(self._operators) new._link_inds = deepcopy(self._link_inds) new._weights = deepcopy(self._weights) new._local = deepcopy(self._local) # Needed for measurements new._measurement_setup = deepcopy(self._measurement_setup) new._meas_vec = deepcopy(self._meas_vec) # Need to update while skipping contractions new._prefactor = deepcopy(self._prefactor) new._pstrength = deepcopy(self._pstrength) new._local_ops = deepcopy(self._local_ops) new._local_prefactors = deepcopy(self._local_prefactors) new._local_pstrengths = deepcopy(self._local_pstrengths) # internal flag to allow TPO without indexing new._do_indexing = deepcopy(self._do_indexing) new._enable_update = deepcopy(self._enable_update) new._cache_update = deepcopy(self._cache_update) # tracker for computational effort new._contraction_counter = deepcopy(self._contraction_counter) return new
[docs] def delete_or_cache_tensors(self, inds): """Delete or cache entries (used to remove terms which contracted to local.""" for idx in inds: if self._enable_update: self._cache_update[idx] = self._tensors[idx] del self._tensors[idx]
[docs] def get_max_tpo_id(self): """Get maximum TPO ID in this term.""" max_tpo_id = -1 for key in self._link_inds: max_tpo_id = max(max_tpo_id, key) return max_tpo_id
[docs] def iter_tpo_ids(self): """Iterator over all TPO IDs present.""" yield from self._link_inds
[docs] def sanity_check(self): """Quick set of checks that the iTPOTerm fulfills certain criteria.""" tensor = None for key, elem in self._tensors.items(): tensor = elem if tensor is not None: for key, elem in self._tensors.items(): if tensor.ndim != elem.ndim: raise QTeaLeavesError( "iTPO contains tensors of different rank", key, elem.shape, tensor.shape, ) if self._local is not None: if tensor.ndim - 2 != self._local.ndim: raise QTeaLeavesError( "iTPO mismatch in rank of local", key, self._local.shape, tensor.shape, )
[docs] def set_meas_status(self, do_measurement): """Set the measurement status of this iTPOTerm.""" if do_measurement: self._measurement_setup = True self._meas_vec = {} else: self._measurement_setup = False self._meas_vec = None
[docs] def stream(self, disable_streams=False): """ Define a stream for any operation Parameters ---------- disable_streams : bool, optional Allows to disable streams to avoid nested creation of streams. Globally, streams should be disabled via the `set_streams_qteatensors` function of the base tensor module. Default to False. Returns ------- Context manager, e.g., :class:`to.cuda.Stream` if on GPU :class:`nullcontext(AbstractContextManager)` otherwise """ # For symmetric tensors, we check all of them if we find any # context which is not the nullcontext. stream = None for _, elem in self._tensors.items(): stream = elem.stream(disable_streams=disable_streams) if not isinstance(stream, nullcontext): return stream if self._local is not None: stream = self._local.stream(disable_streams=disable_streams) if not isinstance(stream, nullcontext): return stream if stream is None: # We could opt as well for returning a nullcontext since # there should be hardly any operation profiting from # streams then anyway. raise QTeaLeavesError("Running inquiry on empty iTPOTerm.") return stream
@staticmethod def _synchronize_streams(streams): """Wait for all streams in the list to synchronize.""" # Synchronize all streams to ensure order for stream in streams: if not isinstance(stream, nullcontext): stream.synchronize()
[docs] def update_couplings(self, params): """Update the coupling with a new params dictionary.""" # Update couplings in TPO terms for key, value in self._pstrength.items(): strength = self.eval_numeric_param(value, params) prefactor = self._prefactor[key] total_scaling = 1.0 if strength is not None: total_scaling *= strength if prefactor is not None: total_scaling *= prefactor self._weights[key] = total_scaling # Update couplings in local terms self._local = None for ii, tensor in enumerate(self._local_ops): strength = self.eval_numeric_param(self._local_pstrengths[ii], params) prefactor = self.eval_numeric_param(self._local_prefactors[ii], params) self._local_prefactors_num[ii] = prefactor total_scaling = 1.0 if strength is not None: total_scaling *= strength if prefactor is not None: total_scaling *= prefactor if self._apply_kraus and self._apply_kraus[ii]: # first if statement to make sure we dont access an empty list continue if self._local_oqs[ii]: # The one-half originates from L rho Ldagger - 0.5 {Ldagger L, rho} # in the Lindblad equation. local = tensor.conj().tensordot(tensor, [[0, 1, 3], [0, 1, 3]]) local = local * (-0.5j * total_scaling) else: local = tensor * total_scaling local.remove_dummy_link(3) local.remove_dummy_link(0) if self._local is None: self._local = local else: self._local.add_update(local)
[docs] def add_local(self, operator, prefactor, strength, pstrength, is_oqs): """Add a local term to the iTPOTerm when building a Hamiltonian.""" self._local_ops.append(operator) self._local_prefactors.append(prefactor) self._local_prefactors_num.append(prefactor) self._local_pstrengths.append(pstrength) self._local_oqs.append(is_oqs) if strength is None: strength = 1.0 if is_oqs: # The initialization is assumed to be for the statics and # therefore we do not add the Lindblad terms return local = operator * (prefactor * strength) local.remove_dummy_link(3) local.remove_dummy_link(0) if self._local is None: self._local = local return self._local.add_update(local)
[docs] def add_term( self, tpo_id, operator, link_inds, prefactor, strength, pstrength, is_oqs ): """Add an interaction term to the iTPOTerm when building a Hamiltonian.""" idx_op = self.get_index_operator(operator) if tpo_id in self._operators: raise QTeaLeavesError("Cannot overwrite term with `add_term`.") self._operators[tpo_id] = idx_op self._link_inds[tpo_id] = link_inds if is_oqs: # The initialization is assumed to be for the statics and # therefore we do not add the Lindblad terms # because prefactor is accounted for only once in a term, # so is the imaginary unit self._weights[tpo_id] = 0 self._prefactor[tpo_id] = (-0.5j) * prefactor else: self._weights[tpo_id] = prefactor * strength self._prefactor[tpo_id] = prefactor self._pstrength[tpo_id] = pstrength
[docs] def collect_measurements(self): """Iterator to yield all iTPO-IDs and values of measurements.""" for key, value in self._meas_vec.items(): if isinstance(value, _AbstractQteaTensor): continue yield key, value
[docs] def run_measurements(self, ket, idx_out, link_weights): """Run the measurements on the iTPOTerm, i.e., on all stored local tensors.""" tmp = ket.conj() if link_weights is not None: link_weights_2 = link_weights**2 tmp.scale_link_update(link_weights_2, idx_out) cidx = list(range(tmp.ndim)) value_dict = {} for key, value in self._meas_vec.items(): elem = tmp.tensordot(value, (cidx, cidx)) value_dict[key] = elem.get_entry() self._meas_vec = value_dict
[docs] def get_index_operator(self, operator): """Get an index for an operator; created if not existing yet.""" if self._do_indexing: for key, tensor in self._tensors.items(): if operator == tensor: return key key = len(self._tensors) while key in self._tensors: key += 1 self._tensors[key] = operator return key
[docs] def get_index_copy_operator(self, idx): """Copy a tensor and return index of copy.""" key = len(self._tensors) while key in self._tensors: key += 1 if key in self._tensors: raise QTeaLeavesError("Key not new") self._tensors[key] = self._tensors[idx].copy() return key
def _transfer_entries(self, link_inds, weights, operators): """Internal function to transfer entries for links, weights, and operators.""" for key, value in link_inds.items(): self._link_inds[key] = value for key, value in weights.items(): self._weights[key] = value for key, value in operators.items(): self._operators[key] = value def _transfer_entries_measurement(self, other_a, other_b=None): if not self._measurement_setup: return # rint("Transfer meas entries keys", self._meas_vec.keys()) for key, value in other_a._meas_vec.items(): if isinstance(value, _AbstractQteaTensor): self._meas_vec[key] = value if other_b is not None: for key, value in other_b._meas_vec.items(): if isinstance(value, _AbstractQteaTensor): self._meas_vec[key] = value
[docs] def to_str(self, ind_offset=0): """String of values that are used for actual simulation.""" str_buffer = "\n" ind0 = " " * ind_offset ind4 = " " * (4 + ind_offset) if self._local is None: str_buffer += ind0 + "No local term.\n" else: tmp = self._local.flatten() if len(tmp) <= 32: str_buffer += ind0 + "Local term with " + str(tmp) + "\n" else: norm = self._local.norm() str_buffer += ind0 + "Local's norm is " + str(norm) + "\n" for key, weight in self._weights.items(): op_idx = self._operators[key] str_buffer += ind0 + f"TPO ID: {key} with weight {weight}\n" tmp = self._tensors[op_idx].flatten() if len(tmp) <= 32: str_buffer += ind4 + "Tensor is " + str(tmp) + "\n" else: norm = self._tensors[op_idx].norm() str_buffer += ind4 + "Tensor's norm is " + str(norm) + "\n" return str_buffer
[docs] def tensordot_with_tensor( self, tensor, cidx_self, cidx_tensor, perm_local_out=None, ctens=None ): """ Execute contraction of iTPOTerm with tensors. Uncontracted non-MPO (non-horizontal) legs of self go before uncontracted non-MPO legs of tensor. **Arguments** tensor : instance of :class:`_AbstractQteaTensor` Tensor in the contraction as right/second tensor. cidx_self : list of ints Contraction legs for full TPO tensor (local will auto-adapt) cidx_tensor : list of ints Contraction legs for the `tensor` perm_local_out : list of ints Permutation of output (full TPO tensor will auto-adapt) ctens : `None` or :class:`ITPOTerm` If present, update mode is activated which assumes that only a scalar weight has changed. **Returns** ctens : :class:`ITPOTerm` Result of contraction. """ if ctens is not None: ctens._transfer_entries({}, self._weights, {}) tasks = [] # Still have to contract local (as it is collapsed if self._local is not None: self._tensors[-1] = self._local ctens._tensors[-1] = None tasks.append(-1) else: ctens = self.empty_copy(do_copy_meas_vec=True) ctens._transfer_entries(self._link_inds, self._weights, self._operators) for key in self._tensors: ctens._tensors[key] = None if self._local is not None: self._tensors[-1] = self._local ctens._tensors[-1] = None tasks = list(self._tensors.keys()) # Execute contractions (parallelized via streams on GPU) # -------------------- num_tasks = len(tasks) streams = [self.stream(ii) for ii in range(num_tasks)] ctens._contraction_counter += len(tasks) for ii, task in enumerate(tasks): with streams[ii]: tens_a = self._tensors[task] if task == -1: cidx_self = [elem - 1 for elem in cidx_self] ctens._tensors[task] = tens_a.tensordot( tensor, (cidx_self, cidx_tensor), disable_streams=True, ) if task != -1: # Install order of MPO links (right now they are in the 1st half) num_ab = ctens._tensors[task].ndim idx_1 = tens_a.ndim - len(cidx_self) - 1 perm = list(range(num_ab)) perm.remove(idx_1) perm = perm + [idx_1] ctens._tensors[task].transpose_update(perm) if perm_local_out is not None: if task != -1: perm_out = ITPOTerm._local_perm_to_full_perm(perm_local_out) else: perm_out = perm_local_out ctens._tensors[task].transpose_update(perm_out) self._synchronize_streams(streams) if self._local is not None: ctens._local = ctens._tensors[-1] del ctens._tensors[-1] del self._tensors[-1] ctens.sanity_check() return ctens
[docs] def tensordot_with_tensor_left( self, tensor, cidx_tensor, cidx_self, perm_local_out=None, ctens=None ): """ Execute contraction of iTPOTerm with tensor. Uncontracted non-MPO (non-horizontal) legs of tensor go before uncontracted non-MPO legs of self. **Arguments** tensor : instance of :class:`_AbstractQteaTensor` Tensor in the contraction as right/second tensor. cidx_tensor : list of ints Contraction legs for the `tensor` cidx_self : list of ints Contraction legs for full TPO tensor (local will auto-adapt) perm_local_out : list of intes Permutation of output (full TPO tensor will auto-adapt) ctens : `None` or :class:`ITPOTerm` If present, update mode is activating which assumes that only a scalar weight has changed. **Returns** ctens : :class:`ITPOTerm` Result of contraction. """ if ctens is not None: ctens._transfer_entries({}, self._weights, {}) tasks = [] # Still have to contract local (as it is collapsed) if self._local is not None: self._tensors[-1] = self._local ctens._tensors[-1] = None tasks.append(-1) else: ctens = self.empty_copy(do_copy_meas_vec=True) ctens._transfer_entries(self._link_inds, self._weights, self._operators) for key in self._tensors: ctens._tensors[key] = None if self._local is not None: self._tensors[-1] = self._local ctens._tensors[-1] = None tasks = list(self._tensors.keys()) # Execute contractions (parallelized via streams on GPU) # -------------------- num_tasks = len(tasks) streams = [self.stream() for ii in range(num_tasks)] ctens._contraction_counter += len(tasks) for ii, task in enumerate(tasks): with streams[ii]: tens_b = self._tensors[task] if task == -1: cidx_self = [elem - 1 for elem in cidx_self] ctens._tensors[task] = tensor.tensordot( tens_b, (cidx_tensor, cidx_self), disable_streams=True, ) if task != -1: # Install order of MPO links (right now they are in the 2nd half) num_ab = ctens._tensors[task].ndim idx_0 = tensor.ndim - len(cidx_tensor) perm = list(range(num_ab)) perm.remove(idx_0) perm = [idx_0] + perm ctens._tensors[task].transpose_update(perm) if perm_local_out is not None: if task != -1: perm_out = ITPOTerm._local_perm_to_full_perm(perm_local_out) else: perm_out = perm_local_out ctens._tensors[task].transpose_update(perm_out) self._synchronize_streams(streams) if self._local is not None: ctens._local = ctens._tensors[-1] del ctens._tensors[-1] del self._tensors[-1] ctens.sanity_check() return ctens
[docs] def matrix_multiply( self, other, cidx_self, cidx_other, eye_a=None, eye_b=None, perm_local_out=None, ctens=None, ): """ Contract of two iTPOTerms. **Arguments** other : instance of :class:`iTPOTerm` Right / second term in the multiplication cidx_self : list of ints Contraction legs for full TPO tensor (local will auto-adapt) cidx_other : list of ints Contraction legs for full TPO tensor (local will auto-adapt) eye_a : instance of :class:`_AbstractQteaTensor` or None, optional If `self` is not a rank-4 iTPOTerm, pass what should be used as identity. Default to `None` eye_b : instance of :class:`_AbstractQteaTensor` or None, optional If `other` is not a rank-4 iTPOTerm, pass what should be used as identity. Default to `None` perm_local_out : list of ints or None, optional Permutation of output (full TPO tensor will auto-adapt) (MPO links will be permuted in first and last place automatically) Default to `None` ctens : `None` or :class:`ITPOTerm` If present, update mode is activating which assumes that only a scalar weight has changed. """ self._print_matrix_multiply_entry() self.sanity_check() other.sanity_check() contr_tasks, link_ids = self._generate_tpo_ids_for_contrs(other) unique_tasks = self._group_contr_tasks(other, contr_tasks, eye_a, eye_b) self._print_contr_tasks(contr_tasks, unique_tasks) task_keys = list(unique_tasks.keys()) num_tasks = len(task_keys) if ctens is not None: tensors = ctens._tensors for key, value in ctens._cache_update.items(): tensors[key] = value ctens._local = None local_contr_tasks = {} local_unique_tasks = {} for elem in [-10, -20]: if elem in contr_tasks: key = contr_tasks[elem] local_contr_tasks[elem] = key local_unique_tasks[key] = unique_tasks[key] local_task_keys = list(local_unique_tasks.keys()) num_local_tasks = len(local_unique_tasks) case_funcs = self.get_case_funcs() for ii in range(num_local_tasks): # Retrieve information key = local_task_keys[ii] case_id, op_idx_a, op_idx_b = key target_key = local_unique_tasks[key] # Carry out contraction func = case_funcs[case_id] tensor_ii = func( self._tensors[op_idx_a], other._tensors[op_idx_b], cidx_self, cidx_other, perm_local_out, ) # Set value ctens._tensors[target_key] = tensor_ii else: # Dictionary prepared with None values tensors = {} for _, value in unique_tasks.items(): tensors[value] = None # Carry out contractions (parallelized via streams on GPU) # ---------------------- streams = [self.stream() for ii in range(num_tasks)] case_funcs = self.get_case_funcs() for ii in range(num_tasks): with streams[ii]: # Retrieve information key = task_keys[ii] case_id, op_idx_a, op_idx_b = key target_key = unique_tasks[key] # Carry out contraction func = case_funcs[case_id] tensor_ii = func( self._tensors[op_idx_a], other._tensors[op_idx_b], cidx_self, cidx_other, perm_local_out, disable_streams=True, ) # Set value tensors[target_key] = tensor_ii self._synchronize_streams(streams) # Post-process # ------------ if ctens is None: ctens = self.empty_copy(other=other, do_copy_meas_vec=True) ctens._tensors = tensors ctens._contraction_counter += num_tasks else: ctens._transfer_entries_measurement(self, other_b=other) ctens._contraction_counter += num_local_tasks logger.debug("Tensors keys %s", tensors.keys()) logger.debug("." * 15) to_be_deleted = set() for key, value in contr_tasks.items(): if value[0] in [1, 34, 43, 53, 63, 99]: # Remaining TPO term - does not contract to local ctens._operators[key] = unique_tasks[value] ctens._link_inds[key] = link_ids[key] ctens._weights[key] = self._weights.get(key, 1.0) * other._weights.get( key, 1.0 ) continue # Contraction to local # .................... op_idx = unique_tasks[value] to_be_deleted.add(op_idx) logger.debug( "Local term %s %s %s", op_idx, value, ctens._tensors[op_idx].shape ) weight = self._weights.get(key, 1.0) * other._weights.get(key, 1.0) if ctens._measurement_setup: if value[0] in [7, 8]: ctens._meas_vec[key] = ctens._tensors[op_idx].copy() * weight if weight == 0.0: # Here we can check against exact zero, e.g., matching zero weights # coming from a compression continue if ctens._local is None: ctens._local = ctens._tensors[op_idx] * weight else: ctens._local.add_update(ctens._tensors[op_idx], factor_other=weight) # Remove local terms from tensors ctens.delete_or_cache_tensors(to_be_deleted) # Cleanup temporary items in original TPOs if self._local is not None: del self._tensors[-1] if other._local is not None: del other._tensors[-1] if self.idx_eye in self._tensors: del self._tensors[self.idx_eye] if other.idx_eye in other._tensors: del other._tensors[other.idx_eye] ctens.sanity_check() if ctens._measurement_setup: logger.debug("ctens._meas_vec %s", ctens._meas_vec.keys()) return ctens
def _generate_tpo_ids_for_contrs(self, other): """Returns dict where keys are TPO-ID and values are case-ID and op-IDs.""" # Do we profit from local_only=False argument as in fortran??? # ordering is match_rl, match_lr, match_ll, match_rr: 34, 43 itpo_contr_ids = { (True, False, False, False): 34, (False, True, False, False): 43, (False, False, True, False): 53, (False, False, False, True): 63, (True, True, False, False): 7, (False, False, True, True): 8, } tasks = {} link_ids = {} for key, op_idx_a in self._operators.items(): op_idx_b = other._operators.get(key, None) if op_idx_b is None: tasks[key] = (1, op_idx_a, self.idx_eye) link_ids[key] = self._link_inds[key] continue match_rl = self._link_inds[key][1] == other._link_inds[key][0] match_lr = self._link_inds[key][0] == other._link_inds[key][1] match_ll = self._link_inds[key][0] == other._link_inds[key][0] match_rr = self._link_inds[key][1] == other._link_inds[key][1] key_bool = (match_rl, match_lr, match_ll, match_rr) case_id = itpo_contr_ids[key_bool] tasks[key] = (case_id, op_idx_a, op_idx_b) link_id = [] if case_id in [34, 63]: link_id.append(self._link_inds[key][0]) if case_id in [43, 53]: link_id.append(self._link_inds[key][1]) if case_id in [43, 63]: link_id.append(other._link_inds[key][0]) if case_id in [34, 53]: link_id.append(other._link_inds[key][1]) link_ids[key] = link_id for key, op_idx_b in other._operators.items(): op_idx_a = self._operators.get(key, None) if op_idx_a is not None: continue tasks[key] = (99, self.idx_eye, op_idx_b) link_ids[key] = other._link_inds[key] # Add local terms as well as task if self._local is not None: self._tensors[-1] = self._local tasks[-10] = (-10, -1, self.idx_eye) if other._local is not None: other._tensors[-1] = other._local tasks[-20] = (-20, self.idx_eye, -1) return tasks, link_ids def _group_contr_tasks(self, other, contr_tasks, eye_a, eye_b): """Returns dict where keys are case-ID/op-ID tuple and value is new tensor index.""" # Aim for reproducible order which we need for update keys = sorted(list(contr_tasks.keys())) unique_tasks = {} idx = 0 for key in keys: value = contr_tasks[key] if value in unique_tasks: continue unique_tasks[value] = idx idx += 1 self._tensors[self.idx_eye] = eye_a other._tensors[self.idx_eye] = eye_b return unique_tasks
[docs] def filter_tpo_ids(self, filter_tpo_ids): """ Removes all operators that do not have a tpo-id given by filter_tpo_ids. Parameters ---------- filter_tpo_ids : list[ int ] Tpo-ids that are not removed. Returns ------- filtered_op : ITPOTerm The operator with filtered ITPO-ids. """ filtered_op = self.empty_copy() # the tensors and local objects have to be copied filtered_op._tensors = self._tensors.copy() # Keys are TPO IDs, values are scalars filtered_op._prefactor = self._prefactor.copy() filtered_op._local_ops = self._local_ops.copy() filtered_op._local_prefactors = self._local_prefactors.copy() filtered_op._local_pstrengths = self._local_pstrengths.copy() filtered_op._local_oqs = self._local_oqs.copy() if self._local is None: filtered_op._local = None else: filtered_op._local = self._local.copy() for tpo_id in self.iter_tpo_ids(): if tpo_id in filter_tpo_ids: if tpo_id in self._operators.keys(): filtered_op._operators[tpo_id] = self._operators[tpo_id] if tpo_id in self._link_inds.keys(): filtered_op._link_inds[tpo_id] = self._link_inds[tpo_id] if tpo_id in self._weights.keys(): filtered_op._weights[tpo_id] = self._weights[tpo_id] if tpo_id in self._prefactor.keys(): filtered_op._prefactor[tpo_id] = self._prefactor[tpo_id] if tpo_id in self._pstrength.keys(): filtered_op._pstrength[tpo_id] = self._pstrength[tpo_id] return filtered_op
@staticmethod def _local_perm_to_full_perm(perm): """Generate a full permutation keeping first and last link in place.""" full_perm = [0] for elem in perm: full_perm.append(elem + 1) nn = len(full_perm) full_perm.append(nn) return full_perm
[docs] @staticmethod def get_case_funcs(): """ Construct the mapping between contraction cases as integers and their functions. **Details** The cases and their IDs are a one-to-one copy from the fortran code. At the beginning, we had even more of them with every possible combination of contracting rank-3 and rank-4 MPOs with 0, 1, and 2 matching horizontal links. Now, they contain only rank-4 MPOs with 1 and matching horizontal links plus the local term rules at -10, -20. To simplify coding, the cases have each their own function (where in fortran it was still a select case). Copy-pasted from each-functions docstring: 1) only left has TPO-ID 7) lr and rl match to local term 8) ll and rr match to local term 34) match rl and keeping as TPO-ID 43) match lr and keeping as TPO-ID 53) match ll and keeping as TPO-ID 63) match rr and keeping as TPO-ID 99) only right has TPO-ID -10) local term in left -20) local term in right """ case_funcs = { 1: ITPOTerm._contr_1, 7: ITPOTerm._contr_7, 8: ITPOTerm._contr_8, 34: ITPOTerm._contr_34, 43: ITPOTerm._contr_43, 53: ITPOTerm._contr_53, 63: ITPOTerm._contr_63, 99: ITPOTerm._contr_99, -10: ITPOTerm._contr_10, -20: ITPOTerm._contr_20, } return case_funcs
@staticmethod def _contr_1(tens_a, eye_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 1: only left has TPO-ID""" # Offset by one for missing MPO link cidx_b = [ii - 1 for ii in cidx_b] if eye_b is not None: num_a = tens_a.ndim num_b = eye_b.ndim tmp = tens_a.tensordot( eye_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # permutation to move MPO links to first and last place num_ab = num_a + num_b - 2 * len(cidx_a) idx_1 = num_a - len(cidx_a) - 1 perm = list(range(num_ab)) perm.remove(idx_1) perm = perm + [idx_1] tmp.transpose_update(perm) # Permutation from argument if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) tmp.transpose_update(full_perm) return tmp # There must be an actual identity on eye_b, if we contract # over ... # # * zero links: outer product, need to construct identity # * one link: contraction with identity does not do anything # * two links: trace?? if len(cidx_a) == 1: # Contraction with identity does not do anything, but we have # to check permutations num_a = tens_a.ndim ordered = list(range(num_a)) order = deepcopy(ordered) order.remove(cidx_a[0]) order = order[:-1] + cidx_a + [order[-1]] order = np.array(order) if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) final = order[full_perm] ordered = np.array(ordered) do_permute = not np.all(final == ordered) else: do_permute = True if do_permute: tmp = tens_a.transpose(order) if perm_out is not None: tmp.transpose_update(full_perm) else: tmp = tens_a.copy() return tmp raise NotImplementedError("Contr ID 1 without eye_b.") @staticmethod def _contr_7(tens_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 7: lr and rl match to local term.""" # | | # ---o---o--- # | | | | # | | # |_________| num_a = tens_a.ndim num_b = tens_b.ndim cidx_a = [0] + cidx_a + [num_a - 1] cidx_b = [num_b - 1] + cidx_b + [0] tmp = tens_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) if perm_out is not None: tmp.transpose_update(perm_out) return tmp @staticmethod def _contr_8(tens_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 8: ll and rr match to local term.""" # ________ # | | # |-O-| |-O-| # |______| num_a = tens_a.ndim num_b = tens_b.ndim cidx_a = [0] + cidx_a + [num_a - 1] cidx_b = [0] + cidx_b + [num_b - 1] tmp = tens_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) if perm_out is not None: tmp.transpose_update(perm_out) return tmp @staticmethod def _contr_10(tens_a, eye_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id -10: local term in left.""" # Offset by one for missing MPO link in both cidx_a = [ii - 1 for ii in cidx_a] cidx_b = [ii - 1 for ii in cidx_b] if eye_b is not None: num_a = tens_a.ndim tmp = tens_a.tensordot( eye_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # Permutation from argument if perm_out is not None: # full_perm = iTPOTerm._local_perm_to_full_perm(perm_out) tmp.transpose_update(perm_out) return tmp # There must be an actual identity on eye_b, if we contract # over ... # # * zero links: outer product, need to construct identity # * one link: contraction with identity does not do anything # * two links: trace?? if len(cidx_a) == 1: # Contraction with identity does not do anything, but we have # to check permutations num_a = tens_a.ndim ordered = list(range(num_a)) order = deepcopy(ordered) order.remove(cidx_a[0]) order = order + cidx_a order = np.array(order) if perm_out is not None: final = order[perm_out] ordered = np.array(ordered) do_permute = not np.all(final == ordered) else: do_permute = True if do_permute: tmp = tens_a.transpose(order) if perm_out is not None: tmp.transpose_update(perm_out) else: tmp = deepcopy(tens_a) return tmp raise NotImplementedError("Contr ID -10 without eye_b.") @staticmethod def _contr_20(eye_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id -20: local term in right.""" # Offset by one for missing MPO link in both cidx_a = [ii - 1 for ii in cidx_a] cidx_b = [ii - 1 for ii in cidx_b] if eye_a is not None: num_b = tens_b.ndim tmp = eye_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # Permutation from argument if perm_out is not None: tmp.transpose_update(perm_out) return tmp # There must be an actual identity on eye_a, if we contract # over ... # # * zero links: outer product, need to construct identity # * one link: contraction with identity does not do anything # * two links: trace?? if len(cidx_b) == 1: # Contraction with identity does not do anything, but we have # to check permutations num_b = tens_b.ndim ordered = list(range(num_b)) order = deepcopy(ordered) order.remove(cidx_b[0]) order = cidx_b + order order = np.array(order) if perm_out is not None: final = order[perm_out] ordered = np.array(ordered) do_permute = not np.all(final == ordered) else: do_permute = True if do_permute: tmp = tens_b.transpose(order) if perm_out is not None: tmp.transpose_update(perm_out) else: tmp = deepcopy(tens_b) return tmp raise NotImplementedError("Contr ID 20 without eye_a.") @staticmethod def _contr_34(tens_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 34: match rl and keeping as TPO-ID""" # | | # --o---o-- # | | num_a = tens_a.ndim cidx_a = cidx_a + [num_a - 1] cidx_b = cidx_b + [0] tmp = tens_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # No permutation required (first and last link already MPO links) # Permutation from argument if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) tmp.transpose_update(full_perm) return tmp @staticmethod def _contr_43(tens_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 43: match lr and keeping as TPO-ID""" # | | # |--o- -o--| # | | | | # | | # |___________| num_a = tens_a.ndim num_b = tens_b.ndim cidx_a = cidx_a + [0] cidx_b = cidx_b + [num_b - 1] tmp = tens_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # Permutation to move MPO links to first and last place num_ab = num_a + num_b - 2 * len(cidx_a) idx_0 = num_ab - len(cidx_a) - 1 idx_1 = num_ab - len(cidx_a) perm = list(range(num_ab)) perm.remove(idx_0) perm.remove(idx_1) perm = [idx_0] + perm + [idx_1] tmp.transpose_update(perm) # Permutation from argument if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) tmp.transpose_update(full_perm) return tmp @staticmethod def _contr_53(tens_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 53: match ll and keeping as TPO-ID""" # |-O- |-O- # |_____| num_a = tens_a.ndim num_b = tens_b.ndim cidx_a = [0] + cidx_a cidx_b = [0] + cidx_b tmp = tens_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # Permutation to move MPO links to first and last place num_ab = num_a + num_b - 2 - 2 * len(cidx_a) idx_0 = num_a - len(cidx_a) - 1 perm = list(range(num_ab)) perm.remove(idx_0) perm = [idx_0] + perm tmp.transpose_update(perm) # Permutation from argument if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) tmp.transpose_update(full_perm) return tmp @staticmethod def _contr_63(tens_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 63: match rr and keeping as TPO-ID""" # -O-| -O-| # |_____| num_a = tens_a.ndim num_b = tens_b.ndim cidx_a = cidx_a + [num_a - 1] cidx_b = cidx_b + [num_b - 1] tmp = tens_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # Permutation to move MPO links to first and last place num_ab = num_a + num_b - 2 - 2 * len(cidx_a) idx_1 = num_a - len(cidx_a) perm = list(range(num_ab)) perm.remove(idx_1) perm = perm + [idx_1] tmp.transpose_update(perm) # Permutation from argument if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) tmp.transpose_update(full_perm) return tmp @staticmethod def _contr_99(eye_a, tens_b, cidx_a, cidx_b, perm_out, disable_streams=False): """Contraction for case-id 99: only right has TPO-ID.""" # Offset by one for missing MPO link cidx_a = [ii - 1 for ii in cidx_a] if eye_a is not None: num_a = eye_a.ndim num_b = tens_b.ndim tmp = eye_a.tensordot( tens_b, (cidx_a, cidx_b), disable_streams=disable_streams ) # permutation to move MPO links to first and last place num_ab = num_a + num_b - 2 * len(cidx_a) idx_0 = num_a - len(cidx_a) perm = list(range(num_ab)) perm.remove(idx_0) perm = [idx_0] + perm tmp.transpose_update(perm) # Permutation from argument if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) tmp.transpose_update(full_perm) return tmp # There must be an actual identity on eye_a, if we contract # over ... # # * zero links: outer product, need to construct identity # * one link: contraction with identity does not do anything # * two links: trace?? if len(cidx_b) == 1: # Contraction with identity does not do anything, but we have # to check permutations num_b = tens_b.ndim ordered = list(range(num_b)) order = deepcopy(ordered) order.remove(cidx_b[0]) order = [order[0]] + cidx_b + order[1:] order = np.array(order) if perm_out is not None: full_perm = ITPOTerm._local_perm_to_full_perm(perm_out) final = order[full_perm] ordered = np.array(ordered) do_permute = not np.all(final == ordered) else: do_permute = True if do_permute: tmp = tens_b.transpose(order) if perm_out is not None: tmp.transpose_update(full_perm) else: tmp = deepcopy(tens_b) return tmp raise NotImplementedError("Contr ID 99 without eye_a.") def _print_matrix_multiply_entry(self): """Print summary of self when entering matrix_multiply.""" logger.debug("\n\n %s", "*" * 80) logger.debug("matrix_multiply") for _, tensor in self._tensors.items(): logger.debug("Tensor self %s", tensor.shape) if self._local is not None: logger.debug("Local self %s", self._local.shape) @staticmethod def _print_contr_tasks(contr_tasks, unique_tasks): """Print a summary of the contraction tasks found.""" logger.debug("contr_tasks: TPO-ID, (case-ID, op_idx_a, op_idx_b)") for key, value in contr_tasks.items(): logger.debug(" > %s %s", key, value) logger.debug("unique_tasks: (case-ID, op_idx_a, op_idx_b), new_op_idx") for key, value in unique_tasks.items(): logger.debug(" - %s %s", key, value)
[docs] class ITPOSites(_RestrictedList): """ ITPOSites contains the physical terms = list ITPOTerms in the Hamiltonian or any operator acting on physical sites of a TN. There's one ITPOTerm per system site. """ class_allowed = ITPOTerm def __init__(self, num_sites, do_indexing, enable_update): super().__init__() for _ in range(num_sites): self.append(ITPOTerm(do_indexing=do_indexing, enable_update=enable_update)) @property def has_oqs(self): """Flag if MPO has Lindblad terms (just present, not looking at coupling).""" has_oqs = False for elem in self: has_oqs = has_oqs or elem.has_oqs return has_oqs @property def local_dim(self): """Return the local dimensions via the :class:`ITPOTerm`s as list[int].""" return [elem.local_dim for elem in self]
[docs] def update_couplings(self, params): """Load couplings from an update params dictionary.""" for elem in self: elem.update_couplings(params)
[docs] def get_max_tpo_id(self): """Loop over all sites to get the maximal TPO ID.""" max_tpo_id = -1 for elem in self: max_tpo_id = max(max_tpo_id, elem.get_max_tpo_id()) return max_tpo_id
[docs] def add_dense_mpo_list(self, dense_mpo_list): """Add terms from a :class:`DenseMPOList` to the iTPO sites.""" tpo_id = -1 for _, mpo in enumerate(dense_mpo_list): if len(mpo) == 1: # Local term jj = mpo[0].site self[jj].add_local( mpo[0].operator, mpo[0].weight, mpo[0].strength, mpo[0].pstrength, mpo.is_oqs, ) continue tpo_id += 1 for jj, site in enumerate(mpo): link_inds = (jj, jj + 1) if jj + 1 == len(mpo): link_inds = (jj, 0) kk = site.site self[kk].add_term( tpo_id, site.operator, link_inds, site.weight, site.strength, site.pstrength, mpo.is_oqs, ) for elem in self: elem.sanity_check()
[docs] def construct_tensor_backend(self): """ Construct the tensor backend. Finds a tensor_like, which is either a local_ops tensor or a tensor from the first interaction term, and parses its tensor_backend. """ tensor_like = None # first try through the local terms for elem in self: if tensor_like is None: # if exists, take the first _local_ops if len(elem._local_ops) > 0: tensor_like = elem._local_ops[0] # If no locals, try the first interaction term by # reading from the first non-empty term as `itpo_term` if tensor_like is None: itpo_term = None op_ind = None for itpo_term in self: try: op_ind = itpo_term._operators[0] break except KeyError: pass if op_ind is None: raise ValueError("Encountered an empty iTPO.") tensor_like = itpo_term._tensors[op_ind] # Now tensor_like should be set. # Use it to construct the tensor_backend tensor_class = tensor_like.__class__ if tensor_like.base_tensor_cls is None: base_tensor_cls = tensor_class else: base_tensor_cls = tensor_like.base_tensor_cls datamover = base_tensor_cls.get_default_datamover() tensor_backend = TensorBackend( tensor_class, base_tensor_cls, tensor_like.device, tensor_like.dtype, datamover=datamover, ) return tensor_backend
[docs] def to_dense_mpo_list_unparameterized(self, op_dict, prefix_op_key=""): """ Convert site terms into dense MPO list while loosing access to the parameterization. If you need to maintain parameterization, use `to_dense_mpo_list`. Arguments --------- op_dict : :class:`TNOperators` Dictionary with the operators, can be modified inplace to add more operators. prefix_op_key : str, optional Prefix to the operators key to avoid duplicates with different representation of the operator. Default to "" (empty string, no prefix) Returns ------- mpo : :class:`DenseMPOList` MPO represented as DenseMPOList instead of an ITPO. Parameterization cannot be updated, i.e., weights are fixed now. """ dense_mpo_list = DenseMPOList() tensor_backend = self.construct_tensor_backend() def _add_prefix(key, prefix=prefix_op_key): """Add prefix to operator name if given.""" # We could do this a bit nicer with a leading underscore etc. if len(prefix) == 0: return key return prefix + key # Cover local terms for ii, elem in enumerate(self): if elem._local is None: continue # Everything has been collapsed into the local matrix / operator prefactor = 1.0 pstrength = None key = _add_prefix(f"_local__{ii}") key = _duplicate_check_and_set(op_dict, key, elem._local) site = MPOSite(ii, key, pstrength, prefactor, operators=op_dict) # Open question how to retrieve this information is_oqs = False # Checked manually, must be a linter-problem with double-inheritance # pylint: disable-next=abstract-class-instantiated dense_mpo = DenseMPO([site], is_oqs=is_oqs, tensor_backend=tensor_backend) dense_mpo_list.append(dense_mpo) # And the interactions max_tpo_id = self.get_max_tpo_id() for ii in range(max_tpo_id + 1): sites = [] for jj, elem in enumerate(self): if ii not in elem._operators: # TPO ID not present in this term continue sites.append(jj) if len(sites) == 0: # For some reason, TPO ID not active continue # Now we add all the sites # Open question how to retrieve this information is_oqs = False # Checked manually, must be a linter-problem with double-inheritance # pylint: disable-next=abstract-class-instantiated dense_mpo = DenseMPO([], is_oqs=is_oqs, tensor_backend=tensor_backend) for jj in sites: elem = self[jj] prefactor = elem._weights[ii] pstrength = None key = _add_prefix(f"_tpo_interaction_{ii}_{jj}") key = _duplicate_check_and_set( op_dict, key, elem._tensors[elem._operators[ii]] ) site = MPOSite(jj, key, pstrength, prefactor, operators=op_dict) dense_mpo.append(site) dense_mpo._singvals.append(None) dense_mpo_list.append(dense_mpo) return dense_mpo_list
[docs] def to_dense_mpo_list(self, params): """Convert site terms into dense MPO list.""" dense_mpo_list = DenseMPOList() tensor_backend = self.construct_tensor_backend() op_dict = TNOperators() # Cover local terms for ii, elem in enumerate(self): for jj, ops in enumerate(elem._local_ops): prefactor = elem._local_prefactors[jj] pstrength = elem._local_pstrengths[jj] key = f"_local__{ii}_{jj}" op_dict[key] = ops site = MPOSite( ii, key, pstrength, prefactor, operators=op_dict, params=params ) # Open question how to retrieve this information is_oqs = False # Checked manually, must be a linter-problem with double-inheritance # pylint: disable-next=abstract-class-instantiated dense_mpo = DenseMPO( [site], is_oqs=is_oqs, tensor_backend=tensor_backend ) dense_mpo_list.append(dense_mpo) # And the interactions max_tpo_id = self.get_max_tpo_id() for ii in range(max_tpo_id + 1): sites = [] for jj, elem in enumerate(self): if ii not in elem._operators: # TPO ID not present in this term continue sites.append(jj) if len(sites) == 0: # For some reason, TPO ID not active continue # Now we add all the sites # Open question how to retrieve this information is_oqs = False # Checked manually, must be a linter-problem with double-inheritance # pylint: disable-next=abstract-class-instantiated dense_mpo = DenseMPO([], is_oqs=is_oqs, tensor_backend=tensor_backend) for jj in sites: elem = self[jj] prefactor = elem._prefactor[ii] pstrength = elem._pstrength[ii] ops = elem._operators[ii] key = f"_tpo_interaction_{ii}_{jj}" op_dict[key] = elem._tensors[ops] site = MPOSite( jj, key, pstrength, prefactor, operators=op_dict, params=params ) dense_mpo.append(site) dense_mpo._singvals.append(None) dense_mpo_list.append(dense_mpo) return dense_mpo_list, op_dict
[docs] def to_str(self): """Generate a string with information on he Hamiltonian (site terms).""" str_buffer = "\n\n" + "=" * 80 + "\n" str_buffer += f"iTPO on {len(self)} sites\n" for ii, site in enumerate(self): str_buffer += f"Site {ii}\n" str_buffer += site.to_str() if ii + 1 == len(self): str_buffer += "=" * 80 + "\n\n" else: str_buffer += "-" * 40 + "\n\n" return str_buffer
[docs] def trace(self): """ Compute the trace, i.e., of the underlying Hamilonian on the physical sites stored here. Returns ------- operator_trace : float | complex The number corresponds to the trace of the operator. """ # the unparameterized version is sufficient as it is a construct and # forget approach op_dict = TNOperators() dense_mpo_list = self.to_dense_mpo_list_unparameterized(op_dict) return dense_mpo_list.trace(self.local_dim)
[docs] class ITPO(_AbstractEffectiveMpo): """ iTPO with Hamiltonian and effective operators, e.g., for ground state search. Consists of the full Hamiltonian (ITPO.site_terms) + effective operators for every site and position (ITPO.eff_ops). ITPO.site_terms are a class of ITPOSites and they represent the TPO terms coming from a Hamiltonian or any operator acting on the physical sites of the tensor network. ITPO.eff_ops depend on a TN ansatz. In a simulation, ITPO.site_terms are usually the input Hamiltonain, and ITPO.eff_ops are computed by contracting this Hamiltonian with tensors in a TN. **Arguments** num_sites : int Number of sites in the system, e.g., qubits. do_compress : bool, optional Flag if compression should be activated (True). Default ``False`` (no compression). do_indexing : bool, optional Flag if indexing should be used (True) or if running as TPO (False). Default to ``True`` enable_update : bool, optional Flag if smart update of time-dependent parameters should be activated (True). Activation also caches additional states on top of the effective operators. Default to `False` (no smart update). **Details** The indexed tensor product operator comes in different flavors: * TPO : without indexing, pass `do_indexing` flag. * iTPO : with indexing, only unique operators are contracted (default) * iuTPO : iTPO with smart update for time-evolution: pass flag `enable_update` on initialization, moreover `do_update` has to be set and unset when updating the coupling of the Hamiltonian. * icTPO : iTPO with compression, set flag `do_compress`, which is beneficial for systems with many interactions. * iucTPO : iTPO with compression and smart update for time-dependent parameters. """ # pylint: disable-next=super-init-not-called def __init__( self, num_sites, do_compress=False, do_indexing=True, enable_update=False ): self.site_terms = ITPOSites( num_sites, do_indexing=do_indexing, enable_update=enable_update ) self.eff_ops = {} # tracking self._contraction_counter = {} # for convert, compression mode self._tensor_network = None # store mode on indexing / updating here as well self._do_indexing = do_indexing self._enable_update = enable_update self._do_update = False # for compression mode, convert self._do_compress = do_compress self._compressible = {} # -------------------------------------------------------------------------- # Overwritten magic methods # -------------------------------------------------------------------------- def __repr__(self): """ User-friendly representation of object for print(). """ eff_ops = {} if len(self.eff_ops) != 0: eff_ops = f"<dict with {len(self.eff_ops)} keys>" str_repr = f"{self.__class__.__name__}(num_sites={self.num_sites}, " str_repr += f"site_terms={self.site_terms}, eff_ops={eff_ops})" return str_repr # -------------------------------------------------------------------------- # Properties # -------------------------------------------------------------------------- @property def device(self): """Device where the tensor is stored.""" for _, elem in self.eff_ops.items(): return elem.device raise QTeaLeavesError("Running inquiry on empty effective operator.") @property def dtype(self): """Data type of the underlying arrays.""" for _, elem in self.eff_ops.items(): return elem.dtype raise QTeaLeavesError("Running inquiry on empty effective operator.") @property def do_update(self): """Status of the flag for doing update of effective operators.""" return self._do_update @do_update.setter def do_update(self, value): """Set status of the flag for doing update of effective operators.""" self._do_update = value @property def has_oqs(self): """Flag if MPO has Lindblad terms (just present, not looking at coupling).""" return self.site_terms.has_oqs @property def num_sites(self): """Return the number of sites in the underlying system.""" return len(self.site_terms) # -------------------------------------------------------------------------- # Overwritten operators # -------------------------------------------------------------------------- def __contains__(self, idxs) -> bool: """Check if entry inside the effective operators.""" return idxs in self.eff_ops def __getitem__(self, key): """Get an entry from the effective operators.""" return self.eff_ops[key] def __setitem__(self, key, value): """Set an entry from the effective operators.""" if not isinstance(value, ITPOTerm): raise TypeError("`site_term` must be an iTPOTerm") self.eff_ops[key] = value # -------------------------------------------------------------------------- # Abstract effective operator methods requiring implementation here # --------------------------------------------------------------------------
[docs] def contr_to_eff_op(self, tensor, pos, pos_links, idx_out): """ Contract operator lists with tensors T and Tdagger to effective operator. **Arguments** tensor : :class:`_AbstractQteaTensor` Tensor to be contracted to effective operator. pos : int, tuple (depending on TN) Position of tensor. pos_links : list of int, tuple (depending on TN) Position of neighboring tensors where the links in `tensor` lead to. idx_out : int Uncontracted link to be used for effective operator. """ ops_list, idx_list, key, ikey = self._helper_contract_to_eff_op( pos, pos_links, idx_out ) c_counter = self._contraction_counter.get(key, 0) # Different loop starts are beneficial if idx_out > np.max(idx_list): # From left to right idx_start = 0 stride = 1 elif idx_out < np.min(idx_list): # Looping backwards requires flipping links - avoid for now idx_start = 0 stride = 1 ## From right to left # idx_start = len(idx_list) - 1 # stride = -1 else: # To get contractable stuff, we start right # of the gap and move rightwards stride = 1 for ii, elem in enumerate(idx_list): if elem > idx_out: idx_start = ii # Contract tree tensor with sparse MPO cidx = idx_list[idx_start] perm_out = _transpose_idx(tensor.ndim, cidx) # perm_out[perm_out == tensor.ndim - 1] += 1 # perm_out = [tensor.ndim - 1] + list(perm_out) + [tensor.ndim + 1] ctens, ukey = self.get_ctens_for_update(key, -1) ctens = ops_list[idx_start].tensordot_with_tensor_left( tensor, [cidx], [2], perm_local_out=perm_out, ctens=ctens ) self.set_ctens_for_update(ukey, ctens) c_counter += ctens._contraction_counter ctens._contraction_counter = 0 ii = idx_start for jj in range(len(idx_list) - 1): ii += stride if ii == len(idx_list): ii = 0 cached_ctens, ukey = self.get_ctens_for_update(key, jj) if stride > 0: mat_b = ops_list[ii] # Need offset of one as we have link to the left now cidx_a = [idx_list[ii] + 1] cidx_b = [2] perm_out = _transpose_idx(tensor.ndim, idx_list[ii]) # perm_out = _transpose_idx(ctens.ndim - 1, cidx_a[0]) # perm_out = list(perm_out) + [ctens.ndim - 1] # print('ctens shape: ', ctens._tensors[0].shape) # print('matb shape: ', mat_b._tensors[0].shape) ctens = ctens.matrix_multiply( mat_b, cidx_a, cidx_b, eye_a=tensor, perm_local_out=perm_out, ctens=cached_ctens, ) else: mat_a = ops_list[ii] # Need offset of one as we have link to the left now (cidx_b) cidx_a = [2] cidx_b = [idx_list[ii] + 1] perm_out = _transpose_idx(tensor.ndim, idx_list[ii]) # perm_out = _transpose_idx(ctens.ndim - 2, idx_list[ii]) # perm_out += 1 # perm_out = [0] + list(perm_out) + [ctens.ndim - 1] ctens = mat_a.matrix_multiply( ctens, cidx_a, cidx_b, eye_b=tensor, perm_local_out=perm_out, ctens=cached_ctens, ) c_counter += ctens._contraction_counter ctens._contraction_counter = 0 self.set_ctens_for_update(ukey, ctens) # Contract with complex conjugated cidx_a = tensor._invert_link_selection([idx_out]) cidx_b = [ii + 1 for ii in cidx_a] # We get a four-link tensor # perm_out = [1, 0, 2, 3] cached_ctens, ukey = self.get_ctens_for_update(key, -2) ctens = ctens.tensordot_with_tensor_left( tensor.conj(), cidx_a, cidx_b, ctens=cached_ctens, # , perm_local_out=perm_out ) c_counter += ctens._contraction_counter ctens._contraction_counter = 0 self.set_ctens_for_update(ukey, ctens) if (not ctens._measurement_setup) and ikey in self.eff_ops: del self.eff_ops[ikey] if ctens._measurement_setup and key in self.eff_ops: raise QTeaLeavesError("Potentially overwriting measurement results.") self.eff_ops[key] = ctens self._contraction_counter[key] = c_counter if self._do_compress: self.compress(key)
[docs] def contract_tensor_lists( self, tensor, pos, pos_links, custom_ops=None, pre_return_hook=None ): """ Linear operator to contract all the effective operators around the tensor in position `pos`. Used as a matrix-vector multiplication function in solvers. **Arguments** ------------- tensor : :class:`_AbstractQteaTensor` Tensor to be contracted to effective operator. pos : int, tuple (depending on TN) Position of tensor. pos_links : list of int, tuple (depending on TN) Position of neighboring tensors where the links in `tensor` lead to. custom_ops : `None` or list of :class:`ITPOTerm` Ordered list of iTPO terms for tensor, which should be used instead of information in `pos` and `pos_links`. pre_return_hook : `None` - not used for ITPO **Return** ---------- ctens : :class:`_AbstractQteaTensor` The tensor contracted with effective operators. """ if custom_ops is None: ops_list = [] idx_list = [] for ii, pos_link in enumerate(pos_links): if pos_link is None: continue pos_jj = self.eff_ops[(pos_link, pos)] ops_list.append(pos_jj) idx_list.append(ii) else: # Required for time evolution backwards step on R-tensor # and two-tensor update; None entries must be acceptable ops_list = [] idx_list = [] for ii, elem in enumerate(custom_ops): if elem is None: continue ops_list.append(elem) idx_list.append(ii) # Find the best start of the loop with an sparse MPO with just one # row if possible idx_start = 0 # DIFF to SPO: for ii, elem in enumerate(ops_list): # DIFF to SPO: if elem._sp_mat.shape[0] == 1: # DIFF to SPO: idx_start = ii # DIFF to SPO: break # Contract tree tensor with sparse MPO cidx = idx_list[idx_start] perm_out = _transpose_idx(tensor.ndim, cidx) # DIFF to SPO: perm_out[perm_out == tensor.ndim - 1] += 1 # DIFF to SPO: perm_out = [tensor.ndim - 1] + list(perm_out) + [tensor.ndim + 1] ctens = ops_list[idx_start].tensordot_with_tensor_left( tensor, [cidx], [2], perm_local_out=perm_out ) c_counter = self._contraction_counter.get(pos, 0) c_counter += ctens._contraction_counter ctens._contraction_counter = 0 ii = idx_start for _ in range(len(idx_list) - 1): ii += 1 if ii == len(idx_list): ii = 0 op_ii = ops_list[ii] cidx_a = [idx_list[ii] + 1] cidx_b = [2] perm_out = _transpose_idx(tensor.ndim, idx_list[ii]) # DIFF to SPO: perm_out = _transpose_idx(ctens.ndim - 1, cidx_a[0]) # DIFF to SPO: perm_out = list(perm_out) + [ctens.ndim - 1] ctens = ctens.matrix_multiply( op_ii, cidx_a, cidx_b, eye_a=tensor, perm_local_out=perm_out ) c_counter += ctens._contraction_counter ctens._contraction_counter = 0 # DIFF to SPO: if len(ctens._operators) > 0: raise QTeaLeavesError("contract_tensor_lists does not contract to local.") ctens = ctens._local self._contraction_counter[pos] = c_counter if pre_return_hook is not None: ctens = pre_return_hook(ctens) return ctens
[docs] def convert(self, dtype, device): """ Convert underlying array to the specified data type inplace. Original site terms data type is preserved, while the device is converted. """ if (self.dtype == dtype) and (self.device == device): return # We could detect up-conversion and down-conversion. Only for # conversion to higher precisions, we have to copy from the # site terms again which are in double precision if self._tensor_network is None: raise QTeaLeavesError("convert needs tensor network to be set.") for ii, key in enumerate(self._tensor_network._iter_physical_links()): # convert the site_terms to the correct device, but keep dtype same # because we don't want to lose any info about Hamiltonian terms self.site_terms[ii].convert(dtype=None, device=device) # dtype of the corresponding eff op will be converted in a loop below self[key] = self.site_terms[ii].copy() for _, elem in self.eff_ops.items(): elem.convert(dtype, device)
[docs] def trace(self): """ Compute the trace of an ITPO, i.e., of the underlying Hamilonian on the physical sites. Returns ------- operator_trace : float | complex The number corresponds to the trace of the operator. """ return self.site_terms.trace()
[docs] def mpo_product( self, other, self_conj=False, self_transpose=False, other_conj=False, other_transpose=False, ): """ Compute the product of two ITPOs. The order of the product is self*other. Parameters ---------- other : :py:class:`ITPO` Representing the operator that we are multiplying to self. `other` is the right-hand-side operator in the multiplication. self_conj : Boolean, optional Tells if self needs to be complex conjugated. Default is False. self_transpose : Boolean, optional Tells if self needs to be transposed. Default is False. other_conj : Boolean, optional Tells if other needs to be complex conjugated. Default is False. other_transpose : Boolean, optional Tells if other needs to be transposed. Default is False. Returns ------- mpo_product : :py:class:`ITPO` Representing the products of the operators represented by self and other. Details ------- This function is potentially costly in terms of memory and computation time. The ITPO obtained from this function cannot be changed by parameterization. """ operators = TNOperators() self_mpo_list = self.site_terms.to_dense_mpo_list_unparameterized( operators, prefix_op_key="_self" ) other_mpo_list = other.site_terms.to_dense_mpo_list_unparameterized( operators, prefix_op_key="_other" ) product_mpo_list = self_mpo_list.mpo_product( other_mpo_list, operators, self_conj=self_conj, self_transpose=self_transpose, other_conj=other_conj, other_transpose=other_transpose, ) product_itpo = ITPO(self.num_sites) product_itpo.add_dense_mpo_list(product_mpo_list) return product_itpo
# -------------------------------------------------------------------------- # Overwriting methods from parent class # --------------------------------------------------------------------------
[docs] def print_summary(self): """Print summary of computational effort.""" mode_str = f"(indexing={self._do_indexing}, " mode_str += f"compress={self._do_compress})" logger.debug("%s Contraction summary iTPO %s %s", "=" * 20, mode_str, "=" * 20) total = 0 for key, value in self._contraction_counter.items(): logger.debug("Count %s = %d", key, value) total += value logger.debug("Total contractions: %d", total) logger.debug("^" * 60)
[docs] def add_contraction_counters(self, other): """Add a contraction counter in-place.""" for key, value in other._contraction_counter.items(): if key in self._contraction_counter: self._contraction_counter[key] += value else: self._contraction_counter[key] = value
# -------------------------------------------------------------------------- # Methods specific to iTPO # --------------------------------------------------------------------------
[docs] def collect_measurements(self, num_terms=None): """Collect the measurements from measurement setup of iTPO.""" if num_terms is None: results = {} else: results = np.zeros(num_terms, dtype=np.complex128) for _, elem in self.eff_ops.items(): for key, value in elem.collect_measurements(): results[key] = value return results
[docs] def set_meas_status(self, do_measurement=True): """Set the measurement status for all iTPOTerms in iTPOSites.""" for elem in self.site_terms: elem.set_meas_status(do_measurement)
[docs] def compress(self, pos_tuple): """ Compress iTPOTerm at a given position. **Arguments** pos_tuple : tuple position of the effective operator to be compressed via two tensor positions **Details** Considering an eight-site MPS-like structure with all-to-all two-body interactions the terms by default at link between sites 6 and 7 are * 1 acting at 7 and 8 * 2 acting at 7 and 8 * 3 acting at 7 and 8 * 4 acting at 7 and 8 * 5 acting at 7 and 8 * 6 acting at 7 and 8 and will be compressed to * (1 + 2 + 3 + 4 + 5 + 6) acting at 7 * (1 + 2 + 3 + 4 + 5 + 6) acting at 8 Thus, compression makes potentially sense for many interactions and if more than half of the system is integrated into an effective operator. The latter is checked to avoid unnecessary execution. """ if self._tensor_network is None: raise QTeaLeavesError("iTPO was not set up for compression.") if not self._compressible.get(pos_tuple, True): # From previous evaluation, we can skip here return # Sanity check on incoming tensor self.eff_ops[pos_tuple].sanity_check() # Define tolerance hard-coded here tol = 1e-12 sites_idx_eff, _ = self._tensor_network.get_bipartition_link( pos_tuple[0], pos_tuple[1] ) num_sites_eff = len(sites_idx_eff) if 2 * num_sites_eff <= self.num_sites: # No benefit of compression self._compressible[pos_tuple] = False return orig_list = [] for key, value in self.eff_ops[pos_tuple]._operators.items(): orig_list.append(value) nn_orig = len(set(orig_list)) # Status will keep track of # no key: TPO ID not treated yet # 1 : compressed as destination # 2 : compressed as (one of the) source(s) # 3 : compression not possible # 4 : zero weight, status = {} for tpo_id in self.eff_ops[pos_tuple].iter_tpo_ids(): status_tpo_id = status.get(tpo_id, 0) if status_tpo_id > 0: # Already visited - skip continue if abs(self.eff_ops[pos_tuple]._weights[tpo_id]) < tol: # Weight zero (can be compressed, but we cannot use this one # as the one to keep) status[tpo_id] = 4 continue dst_sites = [] dst_ops = [] dst_weights = self.eff_ops[pos_tuple]._weights[tpo_id] for ii, key in enumerate(self._tensor_network._iter_physical_links()): if ii in sites_idx_eff: # Sites are already in effective operator continue if tpo_id not in self.eff_ops[key]._link_inds: # TPO ID not present continue dst_weights *= self.eff_ops[key]._weights[tpo_id] dst_sites.append(ii) dst_ops.append(self.eff_ops[key]._operators[tpo_id]) if abs(dst_weights) < tol: # Same as before (non-trivial case where another weight # is zero on one of the terms of this interaction) status[tpo_id] = 4 continue # The inner loop over TPO IDs searches for a matching partner # to compress with for the `tpo_id` of the outer loop idx_tensor = None for idx in self.eff_ops[pos_tuple].iter_tpo_ids(): if idx == tpo_id: continue status_idx = status.get(idx, 0) if status_idx in [1, 2, 3]: continue src_sites = [] src_ops = [] src_weights = self.eff_ops[pos_tuple]._weights[idx] for ii, key in enumerate(self._tensor_network._iter_physical_links()): if ii in sites_idx_eff: continue if idx not in self.eff_ops[key]._link_inds: continue src_weights *= self.eff_ops[key]._weights[idx] src_sites.append(ii) src_ops.append(self.eff_ops[key]._operators[idx]) if len(dst_sites) != len(src_sites): continue if dst_sites != src_sites: continue if dst_ops != src_ops: continue # We actually can compress ... if status_tpo_id == 0: # For first combination, we also have to add tensor as copy (cannot # overwrite existing tensor as it might be used in another term) idx_tensor = self.eff_ops[pos_tuple]._operators[tpo_id] idx_tensor = self.eff_ops[pos_tuple].get_index_copy_operator( idx_tensor ) self.eff_ops[pos_tuple]._operators[tpo_id] = idx_tensor status_tpo_id = 1 status[tpo_id] = 1 if idx_tensor is None: raise QTeaLeavesError("Check algorithm.") status[idx] = 2 weight = src_weights / dst_weights idx_tensor_src = self.eff_ops[pos_tuple]._operators[idx] if weight != 0.0: self.eff_ops[pos_tuple]._tensors[idx_tensor].add_update( self.eff_ops[pos_tuple]._tensors[idx_tensor_src], factor_other=weight, ) self.eff_ops[pos_tuple]._weights[idx] = 0.0 self.eff_ops[pos_tuple]._operators[idx] = idx_tensor if status.get(tpo_id, 0) == 0: # Then, any compression is impossible here status[tpo_id] = 3 comp_list = [] for key, value in self.eff_ops[pos_tuple]._operators.items(): comp_list.append(value) nn_comp = len(set(comp_list)) self._compressible[pos_tuple] = nn_comp < nn_orig # Sanity check outgoing tensor self.eff_ops[pos_tuple].sanity_check()
[docs] def update_couplings(self, params): """Load couplings from an update params dictionary.""" self.site_terms.update_couplings(params) if self.do_update: # Transfer entries for ii, key in enumerate(self._tensor_network._iter_physical_links()): self[key] = self.site_terms[ii].copy() self[key].convert( self._tensor_network.dtype, self._tensor_network.device )
# print("update itposite", self[key]._local.elem)
[docs] def get_ctens_for_update(self, key, identifier): """ Extract a tensor for update of time-dependent coupling from cache. **Arguments** key : immutable Key from effective operator to be built serving as base key. identifier : Extension to identity step when building effective operator. **Returns** ctens : instance of :class:`ITPOTerm` or None Retrieve tensor from cache if updates are enabled. ukey : str or `None` Key for storing tensor again. """ if not self._enable_update: return None, None ukey = tuple(list(key) + [identifier]) ctens = self.eff_ops.get(ukey, None) if self._do_update: return ctens, ukey return None, ukey
[docs] def set_ctens_for_update(self, ukey, ctens): """Set a tensor for time-dependent coupling updates (if enabled).""" if not self._enable_update: return self.eff_ops[ukey] = ctens
[docs] def add_dense_mpo_list(self, dense_mpo_list): """Add terms from a :class:`DenseMPOList` to the iTPO sites.""" self.site_terms.add_dense_mpo_list(dense_mpo_list)
[docs] def to_dense_mpo_list(self, params, do_initialize=True): """Return the dense MPO form of the site terms, requires params dict.""" dense_mpo_list, operators = self.site_terms.to_dense_mpo_list(params) if do_initialize: dense_mpo_list.initialize(operators, params) return dense_mpo_list
[docs] def setup_as_eff_ops(self, tensor_network, measurement_mode=False): """Set this sparse MPO as effective ops in TN and initialize.""" self._tensor_network = tensor_network for ii, key in enumerate(tensor_network._iter_physical_links()): self[key] = self.site_terms[ii].copy() self[key].convert( dtype=tensor_network.dtype, device=tensor_network.tensor_backend.memory_device, ) # Last chance to change iso center if tensor_network.iso_center is None: logger.warning("Isometrizing TN on the fly in `build_effective_operators`.") tensor_network.isometrize_all() if tensor_network.iso_center != tensor_network.default_iso_pos: tensor_network.iso_towards(tensor_network.default_iso_pos) if tensor_network.extension in ["tto", "lptn"]: for term in self.site_terms: term._apply_kraus = term._local_oqs.copy() tensor_network.eff_op = self tensor_network.build_effective_operators(measurement_mode=measurement_mode)
[docs] def values(self): # -> dict_values: """Return the values stored in the effective ops dictionary. Returns: values (dict_values) """ return self.eff_ops.values()
[docs] def to_str(self): """Generate a string with information on the effective operators.""" str_buffer = "\n\n" + "=" * 80 + "\n" str_buffer += "iTPOs for effective operators\n" str_buffer += "Sites:\n" ind4 = " " * 4 tracker = {} for ii, key in enumerate(self._tensor_network._iter_physical_links()): str_buffer += ind4 + f"site ii = {ii}" str_buffer += self.eff_ops[key].to_str(ind_offset=4) str_buffer += ind4 + "-" * 40 + "\n\n" tracker[key] = None for key, value in self.eff_ops.items(): if key in tracker: continue str_buffer += f"Link {str(key)}\n" str_buffer += value.to_str(ind_offset=4) str_buffer += ind4 + "-" * 40 + "\n\n" str_buffer += "=" * 80 + "\n\n" return str_buffer
[docs] def get_local_kraus_operators(self, dt: float) -> dict[int, _AbstractQteaTensor]: """ Constructs local Kraus operators from local Lindblad operators. ------- Parameters ------- dt : float, timestep Returns ------- kraus_ops : dict of :py:class:`_AbstractQTeaTensor` Dictionary, keys are site indices and elements the corresponding 3-leg kraus tensors """ kraus_ops = {} if self._tensor_network is None: tensor_backend = TensorBackend() else: tensor_backend = self._tensor_network.tensor_backend if self._tensor_network.has_symmetry: raise QTeaLeavesError("Not implemented for symmetries...") # pylint: disable-next=redefined-builtin tabs, tdiag, tsqrt = ( self.site_terms[0]._local_ops[0].get_attr("abs", "diag", "sqrt") ) for site_idx, siteterm in enumerate(self.site_terms): if not any(siteterm._apply_kraus): continue key = site_idx dim = siteterm._local_ops[0].shape[1] lindblad_super = tensor_backend.tensor_cls( links=[dim**2, dim**2], ctrl="Z", dtype=tensor_backend.dtype, device=tensor_backend.memory_device, ) for idx, operator in enumerate(siteterm._local_ops): if siteterm._apply_kraus[idx]: prefactor = siteterm._local_prefactors_num[idx] pstrength = siteterm._local_pstrengths[idx] lindblad_op = ( prefactor * pstrength * operator.reshape([operator.shape[1], operator.shape[2]]) ) lindblad_super.convert(device=lindblad_op.device) # construct lindblad super-operator id_dim = tensor_backend.tensor_cls( links=[dim, dim], ctrl="1", dtype=tensor_backend.dtype, device=lindblad_op.device, ) ll_term1 = lindblad_op.kron(lindblad_op.conj()) ll_term2 = ( lindblad_op.conj() .transpose((1, 0)) .tensordot(lindblad_op, contr_idx=[[1], [0]]) .kron(id_dim) ) ll_term3 = id_dim.kron( lindblad_op.transpose((1, 0)).tensordot( lindblad_op.conj(), contr_idx=[[1], [0]] ) ) lindblad_super += ll_term1 - 0.5 * ll_term2 - 0.5 * ll_term3 lindblad_super = lindblad_super.expm(prefactor=dt) lindblad_super.reshape_update((dim, dim, dim, dim)) lindblad_super.transpose_update((0, 2, 1, 3)) lindblad_super.reshape_update((dim**2, dim**2)) lam, vecs = lindblad_super.eig() inds = [] for indx, lam_indx in enumerate(lam.elem): if tabs(lam_indx) > lam.dtype_eps: inds.append(indx) lam._elem = tdiag(tsqrt(lam.elem)) kraus_tensor = vecs.conj().tensordot(lam, contr_idx=[[1], [0]]) kraus_tensor.reshape_update((dim, dim, dim**2)) kraus_tensor.transpose_update((2, 0, 1)) kraus_tensor = kraus_tensor[inds, :, :] kraus_ops[key] = kraus_tensor return kraus_ops