Source code for qtealeaves.observables.correlation

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
The correlation observable measures the correlation between
two operators. It cannot be used for fermionic operators which
require a Jordan-Wigner transformation. Thus, the correlation
is of type ``<A_i B_j>``. The index ``i`` is running over
the rows of a matrix, the index ``jj`` over the columns.

( <A_1 B_1>   <A_1 B_2>   <A_1 B_3>   ... )
( <A_2 B_1>   <A_2 B_2>   <A_2 B_3>   ... )
( <A_3 B_1>   <A_3 B_2>   <A_3 B_3>   ... )
(    ...         ...         ...      ... )
"""

# pylint: disable=too-many-locals
import logging
from typing import TYPE_CHECKING, Any, Iterator, Self

import h5py
import numpy as np

from qtealeaves.emulator import ATTN, MPS, TTN, TTO, StateVector
from qtealeaves.mpos import ITPO, DenseMPO, DenseMPOList, MPOSite
from qtealeaves.tooling import QTeaLeavesError

from .tnobase import _TNObsBase

if TYPE_CHECKING:

    from qtealeaves.abstracttns.abstract_tn import _AbstractTN
    from qtealeaves.mpos.disentangler import DELayer
    from qtealeaves.operators import TNOperators
    from qtealeaves.tensors import TensorBackend

else:
    _AbstractTN = Any
    TNOperators = Any
    DELayer = Any
    TensorBackend = Any

__all__ = ["Correlation"]
logger = logging.getLogger(__name__)


[docs] class Correlation(_TNObsBase): """ The correlation observable measures the correlation between two operators. It cannot be used for fermionic operators which require a Jordan-Wigner transformation. Thus, the correlation is of type ``<A_i B_j>``. The index ``i`` is running over the rows of a matrix, the index ``jj`` over the columns. .. code-block:: ( <A_1 B_1> <A_1 B_2> <A_1 B_3> ... ) ( <A_2 B_1> <A_2 B_2> <A_2 B_3> ... ) ( <A_3 B_1> <A_3 B_2> <A_3 B_3> ... ) ( ... ... ... ... ) **Arguments** name : str Define a label under which we can find the observable in the result dictionary. operators : list of two strings Identifiers/strings for the operators to be measured. batch_size : int | None, optional None measures with a single iTPO. Any integer will be interpreted as batch size after every batch_size entries a new iTPO is created. The batch size addresses the problem of memory needs of the iTPO correlation measurement with all terms at once; lower batch sizes reduce the memory cost and increasing the computational cost to some extent. Default to None (Measure all correlations in one iTPO). """ # mypy triggered because of StateVector _measurable_ansaetze = (MPS, TTN, TTO, ATTN, StateVector) # type: ignore[assignment] def __init__(self, name: str, operators: list[str], batch_size: int | None = None): super().__init__(name) self.operators = [operators] self.batch_size = [batch_size]
[docs] @classmethod def empty(cls) -> Self: """ Documentation see :func:`_TNObsBase.empty`. """ obj = cls("", []) obj.name = [] obj.operators = [] obj.batch_size = [] return obj
def __iadd__(self, other: Any) -> Self: """ Documentation see :func:`_TNObsBase.__iadd__`. """ if isinstance(other, Correlation): self.name += other.name self.operators += other.operators self.batch_size += other.batch_size else: raise QTeaLeavesError( f"__iadd__ not defined for types {type(self)} and {type(other)}." ) return self # pylint: disable-next=too-many-branches,too-many-statements
[docs] def measure( self, state: _AbstractTN, operators: TNOperators, **kwargs: Any, ) -> dict[str, Any]: """ Run the measurement of two-sites correlations on each pair of sites of a given state and save it in the results buffer. Args: state : instance of class from :py:mod:`qtealeaves.emulator` The state to measure the observable on. operators : TNOperators The operators params : dict Parameters for current simulation. """ if len(self.name) == 0: return self.results_buffer if not self.check_measurable(state.__class__): logger.warning( "Correlation observable %s is not measurable for %s", self.name, str(state), ) return self.results_buffer params = kwargs.get("params", None) if params is None and state.has_de: raise QTeaLeavesError( "Parameters are needed for disentangling layers, but not provided." ) # Why only the effective projectors, but not the effective operators. While # isometrizing in meas_tensor_product, we still propagate them through? Why? tmp_eff_proj = state.eff_proj state.eff_proj = [] ini_iso_pos = state.iso_center # Measure diagonal entries first (otherwise overwriting results # in measurement mode error/warning) corr_diag = [] for kk, name_kk in enumerate(self.name): ops_list = [] for jj in range(state.num_sites): op_a = operators[(jj, self.operators[kk][0])] op_b = operators[(jj, self.operators[kk][1])] if op_a.ndim == 2: op_ab = op_a @ op_b else: # Assume rank-4 (but delta charge both times to the right op_ab = op_a.tensordot(op_b, ([2], [1])) op_ab.flip_links_update([0, 2]) op_ab.trace_one_dim_pair([0, 3]) op_ab.trace_one_dim_pair([1, 3]) ops_list.append(op_ab) corr_diag_ii = np.zeros(state.num_sites, dtype=np.complex128) for ii, elem in enumerate(state.meas_local(ops_list)): # if aTTN, then diagonal entries on sites # with disentanglers are not local anymore, so skip if state.has_de and (ii in state.de_layer.de_sites): # type: ignore[attr-defined] continue corr_diag_ii[ii] = elem corr_diag.append(corr_diag_ii) state.iso_towards(state.default_iso_pos, keep_singvals=True, trunc=True) hmpo = state.eff_op # convert the correlation matrix to iTPOS for measurement de_layer = state.de_layer if state.has_de else None # type: ignore[attr-defined] kk = 0 name_kk = self.name[kk] corr_kk = np.zeros((state.num_sites, state.num_sites), dtype=np.complex128) ii = 0 jj = 0 for itpo in self.to_itpo( operators, state.tensor_backend, state.num_sites, de_layer=de_layer ): if state.has_de: itpo = state.de_layer.contract_de_layer( # type: ignore[attr-defined] itpo, state.tensor_backend, params ) itpo.set_meas_status(do_measurement=True) state.eff_op = None # type: ignore[assignment] itpo.setup_as_eff_ops(state, measurement_mode=True) # Retrieve results from itpo measurement dict_by_tpo_id = itpo.collect_measurements() itpo_batch_size = len(dict_by_tpo_id) idx = 0 while idx < itpo_batch_size: # skip the diag terms (for aTTN diag terms without the disentangler) # pylint: disable-next=line-too-long if ii == jj and not (state.has_de and (ii in state.de_layer.de_sites)): # type: ignore[attr-defined] pass else: corr_kk[ii, jj] = dict_by_tpo_id[idx] idx += 1 jj += 1 if jj == state.num_sites: jj = 0 ii += 1 if ii == state.num_sites: # get the diagonal terms from local measurements for xx in range(state.num_sites): # pylint: disable-next=line-too-long if state.has_de and (xx in state.de_layer.de_sites): # type: ignore[attr-defined] continue corr_kk[xx, xx] = corr_diag[kk][xx] self.results_buffer[name_kk] = corr_kk ii = 0 kk += 1 if kk < len(self.name[kk]): name_kk = self.name[kk] corr_kk = np.zeros( (state.num_sites, state.num_sites), dtype=np.complex128 ) else: name_kk = None # type: ignore[assignment] corr_kk = None # type: ignore[assignment] if name_kk is not None: # The last term has not been stored yet, which is usually the case # as it is a diagonal terms and has only an iTPO measurement for # aTTNs and similar: get the diagonal terms from local measurements for xx in range(state.num_sites): # pylint: disable-next=line-too-long if state.has_de and (xx in state.de_layer.de_sites): # type: ignore[attr-defined] continue corr_kk[xx, xx] = corr_diag[kk][xx] self.results_buffer[name_kk] = corr_kk # Migrate counter if same MPO type if isinstance(hmpo, ITPO): hmpo.add_contraction_counters(state.eff_op) if (np.array(state.iso_center) != np.array(state.default_iso_pos)).any(): raise QTeaLeavesError( "Iso center moved.! Cannot re-install Hamiltonian MPO." ) state.eff_op = hmpo # restore the effective projectors if ini_iso_pos is not None: state.iso_towards(ini_iso_pos) state.eff_proj = tmp_eff_proj return self.results_buffer
[docs] def collect_operators(self) -> Iterator[tuple[str, str]]: """ Documentation see :func:`_TNObsBase.collect_operators`. """ for elem in self.operators: yield (elem[0], "l") yield (elem[1], "r")
[docs] def to_itpo( self, operators: TNOperators, tensor_backend: TensorBackend, num_sites: int, de_layer: DELayer | None = None, ) -> Iterator[ITPO]: """ Return an ITPO or ITPOs as iterator representing alltogether the correlation observables. In the case of the aTTN the function takes care of diagonal terms of corr matrix with the disentanglers on them. These diagonal terms are stored in the order of looping over ii and jj. The number of ITPOs can be tuned indirectly via the batch size, i.e., after each batch_size terms an ITPO is constructed and yielded plus one with the potentially remaining terms. Parameters ---------- operators: TNOperators The operator class tensor_backend: instance of `TensorBackend` Tensor backend of the simulation num_sites: int Number of sites of the state de_layer : DELayer or `None`, optional Disentangler layer for which the iTPO layer is created Default to `None` (standard TN with no disentanglers) Returns ------- ITPO The ITPO class Details ------- ITPOs can be costly in memory and we are able to tune their memory footprint. The batch size purely counts terms and can for example contain contributions for two correlation measurements. An example how to parse the output can be found in the `tn_simulations.py`. """ if de_layer is None: yield from self._to_itpo(operators, tensor_backend, num_sites) else: yield from self._to_itpo_de_layer( operators, tensor_backend, num_sites, de_layer )
def _to_itpo( self, operators: TNOperators, tensor_backend: TensorBackend, num_sites: int ) -> Iterator[ITPO]: """ Return an ITPO or ITPOs as iterator representing alltogether the correlation observables. The number of ITPOs can be tuned indirectly via the batch size, i.e., after each batch_size terms an ITPO is constructed and yielded plus one with the potentially remaining terms. Parameters ---------- operators: TNOperators The operator class tensor_backend: instance of `TensorBackend` Tensor backend of the simulation num_sites: int Number of sites of the state Returns ------- ITPO The ITPO class """ dense_mpo_list = DenseMPOList() for kk, _ in enumerate(self.name): key_a = self.operators[kk][0] key_b = self.operators[kk][1] for ii in range(num_sites): for jj in range(num_sites): if ii == jj: continue site_a = MPOSite( ii, key_a, 1.0, 1.0, operators=operators, params={} ) site_b = MPOSite( jj, key_b, 1.0, 1.0, operators=operators, params={} ) # Checked manually, must be a linter-problem with double-inheritance # pylint: disable-next=abstract-class-instantiated dense_mpo = DenseMPO( [site_a, site_b], tensor_backend=tensor_backend ) dense_mpo_list.append(dense_mpo) if len(dense_mpo_list) == self.batch_size[kk]: dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) yield itpo # Start with new empty dense_mpo_list = DenseMPOList() if len(dense_mpo_list) == 0: # We finished exactly on the last term return # Sites are not ordered and we have to make links match anyways dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) yield itpo def _to_itpo_de_layer( self, operators: TNOperators, tensor_backend: TensorBackend, num_sites: int, de_layer: DELayer, ) -> Iterator[ITPO]: """ Return an ITPO or ITPOs as iterator representing alltogether the correlation observables. In the case of the aTTN the function takes care of diagonal terms of corr matrix with the disentanglers on them. These diagonal terms are stored in the order of looping over ii and jj. The number of ITPOs can be tuned indirectly via the batch size, i.e., after each batch_size terms an ITPO is constructed and yielded plus one with the potentially remaining terms. Parameters ---------- operators: TNOperators The operator class tensor_backend: instance of `TensorBackend` Tensor backend of the simulation num_sites: int Number of sites of the state de_layer : DELayer Disentangler layer for which the iTPO layer is created Returns ------- ITPO The ITPO class """ # only if aTTN dense_mpo_list = DenseMPOList() for kk, _ in enumerate(self.name): key_a = self.operators[kk][0] key_b = self.operators[kk][1] # get all operators.ops[] key_ab = None key_id = None for oset_name in operators.set_names: op_a = operators[(oset_name, key_a)] op_b = operators[(oset_name, key_b)] # get the operator a x b for the diag entries if op_a.ndim == 2: op_ab = op_a @ op_b else: # Assume rank-4 (but delta charge both times to the right # Assume that tensor_a and tensor_b have same link dimensions op_ab = op_a.tensordot(op_b, ([2], [1])) op_ab = op_ab.transpose([3, 0, 1, 4, 2, 5]) op_ab.fuse_links_update(4, 5) op_ab.fuse_links_update(0, 1, False) key_ab = str(id(op_ab)) if key_ab is None else key_ab operators[(oset_name, key_ab)] = op_ab # get the matching identity (is it not always in operators?) op_identity = op_a.eye_like(op_a.links[1]) op_identity.attach_dummy_link(0, is_outgoing=False) op_identity.attach_dummy_link(3, is_outgoing=True) key_id = str(id(op_identity)) if key_id is None else key_id operators[(oset_name, key_id)] = op_identity # fill the itpo with correct operators on correct sites for ii in range(num_sites): for jj in range(num_sites): if ii == jj and (ii in de_layer.de_sites): # Diagonal entries of correlation matrix for sites that have # a disentangler attached to it cannot longer be measured as # local terms, and thus the appropriate identity operator needs # to be added on the other site of the disentangler # (Warning: if we add them here, we should more or less have control # over the TPO-id, if they are added by the contract-DE-layer logic, # the TPO-id will be added at the end messing up our loop) where = np.where(de_layer.de_sites == ii) # we assume max 1 disentangler per site ind = [where[0][0], where[1][0]] site_identity = de_layer.de_sites[ind[0], abs(ind[1] - 1)] site_a = MPOSite( ii, key_ab, 1.0, 1.0, operators=operators, params={} ) site_b = MPOSite( site_identity, key_id, 1.0, 1.0, operators=operators, params={}, ) elif ii == jj: continue else: site_a = MPOSite( ii, key_a, 1.0, 1.0, operators=operators, params={} ) site_b = MPOSite( jj, key_b, 1.0, 1.0, operators=operators, params={} ) # Checked manually, must be a linter-problem with double-inheritance # pylint: disable-next=abstract-class-instantiated dense_mpo = DenseMPO( [site_a, site_b], tensor_backend=tensor_backend ) dense_mpo_list.append(dense_mpo) if len(dense_mpo_list) == self.batch_size[kk]: dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) yield itpo # Start with new empty dense_mpo_list = DenseMPOList() if len(dense_mpo_list) == 0: # We finished exactly on the last term return # Sites within dense mpos are not ordered and we have to make links match anyways dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) itpo.set_meas_status(do_measurement=True) yield itpo
[docs] def write_results( self, fh: h5py.File, state_ansatz: type[_AbstractTN], **kwargs: Any ) -> None: """ See :func:`_TNObsBase.write_results`. """ is_measured = self.check_measurable(state_ansatz) fg = fh.create_group(str(self), track_order=True) fg.attrs["is_measured"] = is_measured if is_measured: for name in self.name: res = self.results_buffer[name] if np.any(np.abs(np.imag(res)) > 1e-12): # write as complex fg.create_dataset(name, data=res) else: # write as real fg.create_dataset(name, data=np.real(res)) self.results_buffer = {}