Source code for qtealeaves.observables.custom_correlation

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Custom observable
"""

# pylint: disable=too-many-locals
# pylint: disable=too-many-instance-attributes

import logging
from collections.abc import Generator, Iterator, Sequence
from typing import TYPE_CHECKING, Any, Self, cast

import h5py
import numpy as np

from qtealeaves.emulator import ATTN, MPS, TTN, TTO
from qtealeaves.mpos import ITPO, DenseMPO, DenseMPOList, MPOSite
from qtealeaves.tooling import QTeaLeavesError, map_selector

from .tnobase import _TNObsBase

if TYPE_CHECKING:
    from qtealeaves.abstracttns.abstract_tn import _AbstractTN
    from qtealeaves.operators import TNOperators
    from qtealeaves.tensors import TensorBackend
    from qtealeaves.tooling import HilbertCurveMap
else:
    _AbstractTN = Any
    TNOperators = Any
    TensorBackend = Any
    HilbertCurveMap = Any

__all__ = ["CustomCorrelation"]
logger = logging.getLogger(__name__)


[docs] class CustomCorrelation(_TNObsBase): """ The custom observable measures the term of arbitrary size over the arbitrary positions. Thus, the observable is of type ``<A_i B_j ... C_k>``. The results are stored in 1D array, with the order as provided in input. All operators in the measured n-body term must be placed on different sites, i != j != ... != k". If the system is 2d or 3d, user can provide the positions of measurements either in the already flattened, i.e. 1d indexing, or as 2d/3d indexing. In the latter case, additional arguments (see below) must be provided. Arguments --------- name : str Define a label under which we can find the observable in the result dictionary. operators : list (of length term_size) of strings Identifiers/strings for the operators to be measured. site_indices : list of lists On which sites we want to measure specified operators. E.g. if we want to measure 2-body correlators on sites [1,2] and [3,4], `site_indices=[[1,2],[3,4]]`. REMARK: counting starts at 0. batch_size : int | None, optional None measures with a single iTPO. Any integer will be interpreted as batch size after every batch_size entries a new iTPO is created. The batch size addresses the problem of memory needs of the iTPO correlation measurement with all terms at once; lower batch sizes reduce the memory cost and increasing the computational cost to some extent. Default to None (Measure all correlations in one iTPO). **If working with 2d/3d systems:** dim : 1, 2, or 3, optional Dimensionality of the lattice (1d, 2d, 3d) Default is 1. mapping : string or instance of :py:class:`HilbertCurveMap`, optional If dim != 1, which 2d/3d to 1d mapping to use. Possible inputs are: 'HilbertCurveMap', 'SnakeMap', and 'ZigZagMap'. Default is 'HilbertCurveMap'. lattice : list, optional If working with 2d or 3d systems, a lattice size must be given to compute the corresponding mapping to 1d. """ _measurable_ansaetze = (MPS, TTN, TTO, ATTN) # pylint: disable-next=too-many-arguments def __init__( self, name: str, operators: list[str], site_indices: list[list[int]] | list[list[Sequence[int]]] | None, batch_size: int | None = None, dim: int | None = 1, mapping: "str | HilbertCurveMap" = "HilbertCurveMap", lattice: list[int] | None = None, ): super().__init__(name) # store the parameters into lists to be able to # handle multiple measurements via __iadd__, except # for max_term_size which is a collective property if operators is not None: # how many sites per observable term, e.g. two-body or three-body term. self.term_size = [len(operators)] # max term size if there are more than one custom observable # defined in the simulation - look at __iadd__ function self.max_term_size = self.term_size[0] # run the input checks if self.term_size[0] == 1: raise ValueError( "We do not measure local terms as a custom correlations." ) else: self.term_size = [0] self.max_term_size = 0 self.operators = [operators] self.batch_size = [batch_size] # handling 2D and 3D if (dim is not None) and (site_indices is not None): # run the input checks if isinstance(site_indices[0][0], list) and (dim == 1): dim_sites = len(site_indices[0][0]) raise ValueError( f"Dimension of the given site indices input, dim = {dim_sites}, " "does not correspond to the 1d system." ) if dim > 1: if isinstance(site_indices[0][0], int): raise ValueError( "For 2d or 3d systems, the site indices must be given " "as 2d/3d indices, e.g. [[(0,0),(0,1)],[(1,0),(1,1)]]." ) if lattice is None: raise ValueError( "If lattice dimensionality is >1, the lattice size must " "be given as the input." ) # mypy cannot infer the type here, so we help it with a cast. # Another option would be to use an assert statement. site_indices = cast(list[list[Sequence[int]]], site_indices) # if system dimensionality is 2d or 3d, the site indices array needs # to be flattened to 1d site_indices = self.get_sites_1d(site_indices, dim, lattice, mapping) # check if operators match number of specified sites if site_indices is not None: if any(len(inds) != len(operators) for inds in site_indices): raise ValueError( "Must provide one operator per site in measurement positions." ) self.site_indices = [site_indices] else: # if site_indices is None, we cannot measure anything self.site_indices = []
[docs] @classmethod def empty(cls) -> Self: """ Documentation see :func:`_TNObsBase.empty`. """ obj = cls("", [], None) obj.name = [] obj.term_size = [] obj.operators = [] obj.site_indices = [] obj.batch_size = [] return obj
[docs] def measure( self, state: _AbstractTN, operators: TNOperators, **kwargs: Any ) -> dict[str, Any]: """ Documentation see :func:`_TNObsBase.measure`. """ if len(self.name) == 0: return self.results_buffer if not self.check_measurable(state.__class__): logger.warning( "Observable %s not measurable for %s ansatz.", self.name, str(state) ) return self.results_buffer params = kwargs.get("params", None) if params is None and state.has_de: raise QTeaLeavesError( "Parameters are needed for disentangling layers, but not provided." ) # Why only the effective projectors, but not the effective operators. While # isometrizing in meas_tensor_product, we still propagate them through? Why? tmp_eff_proj = state.eff_proj state.eff_proj = [] ini_iso_pos = state.iso_center state.iso_towards(state.default_iso_pos, keep_singvals=True, trunc=True) hmpo = state.eff_op # Custom_results is the container for the results. # It is a list of np arrays, whose shapes match the length of each measurement. custom_results: list[np.ndarray] = [ np.zeros(len(sites), dtype=np.complex128) for sites in self.site_indices ] # convert the correlators to itpo for measurement # See below for meaning of ndx_site and ndx_meas. ndx_site = 0 ndx_meas = 0 for itpo in self.to_itpo(operators, state.tensor_backend, state.num_sites): if state.has_de: # help the type checker a bit state = cast(ATTN, state) # include the disentanglers into the itpo itpo = state.de_layer.contract_de_layer( itpo, state.tensor_backend, params ) itpo.set_meas_status(do_measurement=True) state.eff_op = None # type: ignore[assignment] itpo.setup_as_eff_ops(state, measurement_mode=True) # Retrieve results from itpo measurement dict_by_tpo_id = itpo.collect_measurements() # Sort the entries of the dictionary according to the key. # This makes the loop below easier. list_by_tpo_id = [dict_by_tpo_id[key] for key in sorted(dict_by_tpo_id)] # The results are filled in one-by-one to support batches # dict_by_tpo_id is a dictionary with all resuts. # This loop accumulates the results into the proper # arrays in the custom_results list. # ndx_meas counts the different correlation measurements # ndx_site counts the entries for each measurement for result in list_by_tpo_id: custom_results[ndx_meas][ndx_site] = result ndx_site += 1 # When you get to the end of one observable, # reset ndx_site and increase kk to the next one. if ndx_site == len(self.site_indices[ndx_meas]): name_obs = self.name[ndx_meas] self.results_buffer[name_obs] = custom_results[ndx_meas] ndx_site = 0 ndx_meas += 1 # Migrate counter if same MPO type if isinstance(hmpo, ITPO): hmpo.add_contraction_counters(state.eff_op) if (np.array(state.iso_center) != np.array(state.default_iso_pos)).any(): raise QTeaLeavesError( "Iso center moved! Cannot re-install Hamiltonian MPO." ) state.eff_op = hmpo # restore the effective projectors if ini_iso_pos is not None: state.iso_towards(ini_iso_pos) state.eff_proj = tmp_eff_proj return self.results_buffer
def __iadd__(self, other: Any) -> Self: """ Documentation see :func:`_TNObsBase.__iadd__`. """ if isinstance(other, CustomCorrelation): self.name += other.name self.max_term_size = max(self.max_term_size, other.max_term_size) self.term_size += other.term_size self.operators += other.operators self.batch_size += other.batch_size self.site_indices += other.site_indices else: raise QTeaLeavesError( f"__iadd__ not defined for types {type(self)} and {type(other)}." ) return self
[docs] def collect_operators(self) -> Iterator[tuple[str, str]]: """ Documentation see :func:`_TNObsBase.collect_operators`. In the case of n-body operators with n>2, we choose that the bulk operators have label 'r'. However, note that this has limitations if the symmetry number changes. """ for elem in self.operators: yield (elem[0], "l") for subelem in elem[1:-1]: yield (subelem, "r") yield (elem[-1], "r")
[docs] def to_itpo( self, operators: "TNOperators", tensor_backend: "TensorBackend", num_sites: int ) -> Generator[ITPO, None, None]: """ Return an ITPO representing the custom correlation observable. Since custom corr don't handle diagonal terms, the function is same for TTN and aTTN. Parameters ---------- operators: TNOperators The operator class tensor_backend: instance of `TensorBackend` Tensor backend of the simulation num_sites: int Number of sites of the state Returns ------- ITPO The ITPO class """ dense_mpo_list = DenseMPOList() for kk, _ in enumerate(self.name): num_meas = len(self.site_indices[kk]) for ii in range(num_meas): position = self.site_indices[kk][ii] if len(position) != len(np.unique(np.array(position))): raise ValueError( f"For now cannot measure local (diagonal) terms with " f"custom correlations. All sites within a term must " f"be different. Got {position}" ) mpo_sites = [] for jj, key_op in enumerate(self.operators[kk]): site = MPOSite( position[jj], key_op, 1.0, 1.0, operators=operators, params={} ) mpo_sites.append(site) # Checked manually, must be a linter-problem with double-inheritance # pylint: disable-next=abstract-class-instantiated dense_mpo = DenseMPO(mpo_sites, tensor_backend=tensor_backend) dense_mpo_list.append(dense_mpo) # There are two conditions under which we want to yield: # 1) If the batch_size is filled. # 2) If batch_size is specified, and we are in the last point of the iteration. # The second enforces that each custom correlation is measured separately. # We assume this makes sense if the user asks for batches. condition1 = len(dense_mpo_list) == self.batch_size[kk] condition2 = self.batch_size[kk] is not None and ii == num_meas - 1 if condition1 or condition2: dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) yield itpo # Start with new empty dense_mpo_list = DenseMPOList() if len(dense_mpo_list) == 0: # We finished exactly on the last term return # Sites are not ordered and we have to make links match anyways dense_mpo_list = dense_mpo_list.sort_sites() itpo = ITPO(num_sites) itpo.add_dense_mpo_list(dense_mpo_list) yield itpo
[docs] @staticmethod def get_sites_1d( site_indices: list[list[Sequence[int]]], dim: int, lattice: list[int] | int, mapping: "str | HilbertCurveMap", ) -> list[list[int]]: """ In the case of 2D, or 3D input of the measurement position sites, give back the flattened (mapped to 1d) array for measurement position sites. Parameters ---------- site_indices : list of measurement positions On which sites we want to measure specified operators. REMARK: counting starts at 0. dim : 2 or 3 Dimensionality of the lattice (2d, 3d) lattice : list, optional Lattice size. mapping : string or instance of :py:class:`HilbertCurveMap` If dim != 1, which 2d/3d to 1d mapping to use. Possible inputs are: 'HilbertCurveMap', 'SnakeMap', and 'ZigZagMap'. """ # get the mapping map_indices = map_selector(dim, lattice, mapping) # define the new flattened site index list site_indices_flattened = [] for sites in site_indices: # get the sites for each subterm sites_ii = [] for site in sites: sites_ii.append(map_indices[tuple(site)]) site_indices_flattened.append(sites_ii) return site_indices_flattened
[docs] def write_results( self, fh: h5py.File, state_ansatz: type[_AbstractTN], **kwargs: Any ) -> None: """ See :func:`_TNObsBase.write_results`. """ is_measured = self.check_measurable(state_ansatz) fg = fh.create_group(str(self), track_order=True) fg.attrs["is_measured"] = is_measured if is_measured: for name in self.name: res = self.results_buffer[name] if np.any(np.abs(np.imag(res)) > 1e-12): # write as complex fg.create_dataset(name, data=res) else: # write as real fg.create_dataset(name, data=np.real(res)) self.results_buffer = {}