# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""
The correlation observable measures the correlation between
two operators. It cannot be used for fermionic operators which
require a Jordan-Wigner transformation. Thus, the correlation
is of type ``<A_i B_j>``. The index ``i`` is running over
the rows of a matrix, the index ``jj`` over the columns.
( <A_1 B_1> <A_1 B_2> <A_1 B_3> ... )
( <A_2 B_1> <A_2 B_2> <A_2 B_3> ... )
( <A_3 B_1> <A_3 B_2> <A_3 B_3> ... )
( ... ... ... ... )
"""
# pylint: disable=too-many-locals
import logging
from typing import TYPE_CHECKING, Any, Iterator, Self
import h5py
import numpy as np
from qtealeaves.emulator import ATTN, MPS, TTN, TTO, StateVector
from qtealeaves.mpos import ITPO, DenseMPO, DenseMPOList, MPOSite
from qtealeaves.tooling import QTeaLeavesError
from .tnobase import _TNObsBase
if TYPE_CHECKING:
from qtealeaves.abstracttns.abstract_tn import _AbstractTN
from qtealeaves.mpos.disentangler import DELayer
from qtealeaves.operators import TNOperators
from qtealeaves.tensors import TensorBackend
else:
_AbstractTN = Any
TNOperators = Any
DELayer = Any
TensorBackend = Any
__all__ = ["Correlation"]
logger = logging.getLogger(__name__)
[docs]
class Correlation(_TNObsBase):
"""
The correlation observable measures the correlation between
two operators. It cannot be used for fermionic operators which
require a Jordan-Wigner transformation. Thus, the correlation
is of type ``<A_i B_j>``. The index ``i`` is running over
the rows of a matrix, the index ``jj`` over the columns.
.. code-block::
( <A_1 B_1> <A_1 B_2> <A_1 B_3> ... )
( <A_2 B_1> <A_2 B_2> <A_2 B_3> ... )
( <A_3 B_1> <A_3 B_2> <A_3 B_3> ... )
( ... ... ... ... )
**Arguments**
name : str
Define a label under which we can find the observable in the
result dictionary.
operators : list of two strings
Identifiers/strings for the operators to be measured.
batch_size : int | None, optional
None measures with a single iTPO. Any integer will be interpreted
as batch size after every batch_size entries a new iTPO is created.
The batch size addresses the problem of memory needs of the iTPO
correlation measurement with all terms at once; lower batch sizes
reduce the memory cost and increasing the computational cost to
some extent.
Default to None (Measure all correlations in one iTPO).
"""
# mypy triggered because of StateVector
_measurable_ansaetze = (MPS, TTN, TTO, ATTN, StateVector) # type: ignore[assignment]
def __init__(self, name: str, operators: list[str], batch_size: int | None = None):
super().__init__(name)
self.operators = [operators]
self.batch_size = [batch_size]
[docs]
@classmethod
def empty(cls) -> Self:
"""
Documentation see :func:`_TNObsBase.empty`.
"""
obj = cls("", [])
obj.name = []
obj.operators = []
obj.batch_size = []
return obj
def __iadd__(self, other: Any) -> Self:
"""
Documentation see :func:`_TNObsBase.__iadd__`.
"""
if isinstance(other, Correlation):
self.name += other.name
self.operators += other.operators
self.batch_size += other.batch_size
else:
raise QTeaLeavesError(
f"__iadd__ not defined for types {type(self)} and {type(other)}."
)
return self
# pylint: disable-next=too-many-branches,too-many-statements
[docs]
def measure(
self,
state: _AbstractTN,
operators: TNOperators,
**kwargs: Any,
) -> dict[str, Any]:
"""
Run the measurement of two-sites correlations on each pair of sites
of a given state and save it in the results buffer.
Args:
state : instance of class from :py:mod:`qtealeaves.emulator`
The state to measure the observable on.
operators : TNOperators
The operators
params : dict
Parameters for current simulation.
"""
if len(self.name) == 0:
return self.results_buffer
if not self.check_measurable(state.__class__):
logger.warning(
"Correlation observable %s is not measurable for %s",
self.name,
str(state),
)
return self.results_buffer
params = kwargs.get("params", None)
if params is None and state.has_de:
raise QTeaLeavesError(
"Parameters are needed for disentangling layers, but not provided."
)
# Why only the effective projectors, but not the effective operators. While
# isometrizing in meas_tensor_product, we still propagate them through? Why?
tmp_eff_proj = state.eff_proj
state.eff_proj = []
ini_iso_pos = state.iso_center
# Measure diagonal entries first (otherwise overwriting results
# in measurement mode error/warning)
corr_diag = []
for kk, name_kk in enumerate(self.name):
ops_list = []
for jj in range(state.num_sites):
op_a = operators[(jj, self.operators[kk][0])]
op_b = operators[(jj, self.operators[kk][1])]
if op_a.ndim == 2:
op_ab = op_a @ op_b
else:
# Assume rank-4 (but delta charge both times to the right
op_ab = op_a.tensordot(op_b, ([2], [1]))
op_ab.flip_links_update([0, 2])
op_ab.trace_one_dim_pair([0, 3])
op_ab.trace_one_dim_pair([1, 3])
ops_list.append(op_ab)
corr_diag_ii = np.zeros(state.num_sites, dtype=np.complex128)
for ii, elem in enumerate(state.meas_local(ops_list)):
# if aTTN, then diagonal entries on sites
# with disentanglers are not local anymore, so skip
if state.has_de and (ii in state.de_layer.de_sites): # type: ignore[attr-defined]
continue
corr_diag_ii[ii] = elem
corr_diag.append(corr_diag_ii)
state.iso_towards(state.default_iso_pos, keep_singvals=True, trunc=True)
hmpo = state.eff_op
# convert the correlation matrix to iTPOS for measurement
de_layer = state.de_layer if state.has_de else None # type: ignore[attr-defined]
kk = 0
name_kk = self.name[kk]
corr_kk = np.zeros((state.num_sites, state.num_sites), dtype=np.complex128)
ii = 0
jj = 0
for itpo in self.to_itpo(
operators, state.tensor_backend, state.num_sites, de_layer=de_layer
):
if state.has_de:
itpo = state.de_layer.contract_de_layer( # type: ignore[attr-defined]
itpo, state.tensor_backend, params
)
itpo.set_meas_status(do_measurement=True)
state.eff_op = None # type: ignore[assignment]
itpo.setup_as_eff_ops(state, measurement_mode=True)
# Retrieve results from itpo measurement
dict_by_tpo_id = itpo.collect_measurements()
itpo_batch_size = len(dict_by_tpo_id)
idx = 0
while idx < itpo_batch_size:
# skip the diag terms (for aTTN diag terms without the disentangler)
# pylint: disable-next=line-too-long
if ii == jj and not (state.has_de and (ii in state.de_layer.de_sites)): # type: ignore[attr-defined]
pass
else:
corr_kk[ii, jj] = dict_by_tpo_id[idx]
idx += 1
jj += 1
if jj == state.num_sites:
jj = 0
ii += 1
if ii == state.num_sites:
# get the diagonal terms from local measurements
for xx in range(state.num_sites):
# pylint: disable-next=line-too-long
if state.has_de and (xx in state.de_layer.de_sites): # type: ignore[attr-defined]
continue
corr_kk[xx, xx] = corr_diag[kk][xx]
self.results_buffer[name_kk] = corr_kk
ii = 0
kk += 1
if kk < len(self.name[kk]):
name_kk = self.name[kk]
corr_kk = np.zeros(
(state.num_sites, state.num_sites), dtype=np.complex128
)
else:
name_kk = None # type: ignore[assignment]
corr_kk = None # type: ignore[assignment]
if name_kk is not None:
# The last term has not been stored yet, which is usually the case
# as it is a diagonal terms and has only an iTPO measurement for
# aTTNs and similar: get the diagonal terms from local measurements
for xx in range(state.num_sites):
# pylint: disable-next=line-too-long
if state.has_de and (xx in state.de_layer.de_sites): # type: ignore[attr-defined]
continue
corr_kk[xx, xx] = corr_diag[kk][xx]
self.results_buffer[name_kk] = corr_kk
# Migrate counter if same MPO type
if isinstance(hmpo, ITPO):
hmpo.add_contraction_counters(state.eff_op)
if (np.array(state.iso_center) != np.array(state.default_iso_pos)).any():
raise QTeaLeavesError(
"Iso center moved.! Cannot re-install Hamiltonian MPO."
)
state.eff_op = hmpo
# restore the effective projectors
if ini_iso_pos is not None:
state.iso_towards(ini_iso_pos)
state.eff_proj = tmp_eff_proj
return self.results_buffer
[docs]
def collect_operators(self) -> Iterator[tuple[str, str]]:
"""
Documentation see :func:`_TNObsBase.collect_operators`.
"""
for elem in self.operators:
yield (elem[0], "l")
yield (elem[1], "r")
[docs]
def to_itpo(
self,
operators: TNOperators,
tensor_backend: TensorBackend,
num_sites: int,
de_layer: DELayer | None = None,
) -> Iterator[ITPO]:
"""
Return an ITPO or ITPOs as iterator representing alltogether the
correlation observables. In the case of the aTTN the function takes
care of diagonal terms of corr matrix with the disentanglers on them.
These diagonal terms are stored in the order of looping over ii and jj.
The number of ITPOs can be tuned indirectly via the batch size, i.e.,
after each batch_size terms an ITPO is constructed and yielded plus
one with the potentially remaining terms.
Parameters
----------
operators: TNOperators
The operator class
tensor_backend: instance of `TensorBackend`
Tensor backend of the simulation
num_sites: int
Number of sites of the state
de_layer : DELayer or `None`, optional
Disentangler layer for which the iTPO layer is created
Default to `None` (standard TN with no disentanglers)
Returns
-------
ITPO
The ITPO class
Details
-------
ITPOs can be costly in memory and we are able to tune their memory
footprint. The batch size purely counts terms and can for example
contain contributions for two correlation measurements. An example
how to parse the output can be found in the `tn_simulations.py`.
"""
if de_layer is None:
yield from self._to_itpo(operators, tensor_backend, num_sites)
else:
yield from self._to_itpo_de_layer(
operators, tensor_backend, num_sites, de_layer
)
def _to_itpo(
self, operators: TNOperators, tensor_backend: TensorBackend, num_sites: int
) -> Iterator[ITPO]:
"""
Return an ITPO or ITPOs as iterator representing alltogether the
correlation observables.
The number of ITPOs can be tuned indirectly via the batch size, i.e.,
after each batch_size terms an ITPO is constructed and yielded plus
one with the potentially remaining terms.
Parameters
----------
operators: TNOperators
The operator class
tensor_backend: instance of `TensorBackend`
Tensor backend of the simulation
num_sites: int
Number of sites of the state
Returns
-------
ITPO
The ITPO class
"""
dense_mpo_list = DenseMPOList()
for kk, _ in enumerate(self.name):
key_a = self.operators[kk][0]
key_b = self.operators[kk][1]
for ii in range(num_sites):
for jj in range(num_sites):
if ii == jj:
continue
site_a = MPOSite(
ii, key_a, 1.0, 1.0, operators=operators, params={}
)
site_b = MPOSite(
jj, key_b, 1.0, 1.0, operators=operators, params={}
)
# Checked manually, must be a linter-problem with double-inheritance
# pylint: disable-next=abstract-class-instantiated
dense_mpo = DenseMPO(
[site_a, site_b], tensor_backend=tensor_backend
)
dense_mpo_list.append(dense_mpo)
if len(dense_mpo_list) == self.batch_size[kk]:
dense_mpo_list = dense_mpo_list.sort_sites()
itpo = ITPO(num_sites)
itpo.add_dense_mpo_list(dense_mpo_list)
yield itpo
# Start with new empty
dense_mpo_list = DenseMPOList()
if len(dense_mpo_list) == 0:
# We finished exactly on the last term
return
# Sites are not ordered and we have to make links match anyways
dense_mpo_list = dense_mpo_list.sort_sites()
itpo = ITPO(num_sites)
itpo.add_dense_mpo_list(dense_mpo_list)
yield itpo
def _to_itpo_de_layer(
self,
operators: TNOperators,
tensor_backend: TensorBackend,
num_sites: int,
de_layer: DELayer,
) -> Iterator[ITPO]:
"""
Return an ITPO or ITPOs as iterator representing alltogether the
correlation observables. In the case of the aTTN the function takes
care of diagonal terms of corr matrix with the disentanglers on them.
These diagonal terms are stored in the order of looping over ii and jj.
The number of ITPOs can be tuned indirectly via the batch size, i.e.,
after each batch_size terms an ITPO is constructed and yielded plus
one with the potentially remaining terms.
Parameters
----------
operators: TNOperators
The operator class
tensor_backend: instance of `TensorBackend`
Tensor backend of the simulation
num_sites: int
Number of sites of the state
de_layer : DELayer
Disentangler layer for which the iTPO layer is created
Returns
-------
ITPO
The ITPO class
"""
# only if aTTN
dense_mpo_list = DenseMPOList()
for kk, _ in enumerate(self.name):
key_a = self.operators[kk][0]
key_b = self.operators[kk][1]
# get all operators.ops[]
key_ab = None
key_id = None
for oset_name in operators.set_names:
op_a = operators[(oset_name, key_a)]
op_b = operators[(oset_name, key_b)]
# get the operator a x b for the diag entries
if op_a.ndim == 2:
op_ab = op_a @ op_b
else:
# Assume rank-4 (but delta charge both times to the right
# Assume that tensor_a and tensor_b have same link dimensions
op_ab = op_a.tensordot(op_b, ([2], [1]))
op_ab = op_ab.transpose([3, 0, 1, 4, 2, 5])
op_ab.fuse_links_update(4, 5)
op_ab.fuse_links_update(0, 1, False)
key_ab = str(id(op_ab)) if key_ab is None else key_ab
operators[(oset_name, key_ab)] = op_ab
# get the matching identity (is it not always in operators?)
op_identity = op_a.eye_like(op_a.links[1])
op_identity.attach_dummy_link(0, is_outgoing=False)
op_identity.attach_dummy_link(3, is_outgoing=True)
key_id = str(id(op_identity)) if key_id is None else key_id
operators[(oset_name, key_id)] = op_identity
# fill the itpo with correct operators on correct sites
for ii in range(num_sites):
for jj in range(num_sites):
if ii == jj and (ii in de_layer.de_sites):
# Diagonal entries of correlation matrix for sites that have
# a disentangler attached to it cannot longer be measured as
# local terms, and thus the appropriate identity operator needs
# to be added on the other site of the disentangler
# (Warning: if we add them here, we should more or less have control
# over the TPO-id, if they are added by the contract-DE-layer logic,
# the TPO-id will be added at the end messing up our loop)
where = np.where(de_layer.de_sites == ii)
# we assume max 1 disentangler per site
ind = [where[0][0], where[1][0]]
site_identity = de_layer.de_sites[ind[0], abs(ind[1] - 1)]
site_a = MPOSite(
ii, key_ab, 1.0, 1.0, operators=operators, params={}
)
site_b = MPOSite(
site_identity,
key_id,
1.0,
1.0,
operators=operators,
params={},
)
elif ii == jj:
continue
else:
site_a = MPOSite(
ii, key_a, 1.0, 1.0, operators=operators, params={}
)
site_b = MPOSite(
jj, key_b, 1.0, 1.0, operators=operators, params={}
)
# Checked manually, must be a linter-problem with double-inheritance
# pylint: disable-next=abstract-class-instantiated
dense_mpo = DenseMPO(
[site_a, site_b], tensor_backend=tensor_backend
)
dense_mpo_list.append(dense_mpo)
if len(dense_mpo_list) == self.batch_size[kk]:
dense_mpo_list = dense_mpo_list.sort_sites()
itpo = ITPO(num_sites)
itpo.add_dense_mpo_list(dense_mpo_list)
yield itpo
# Start with new empty
dense_mpo_list = DenseMPOList()
if len(dense_mpo_list) == 0:
# We finished exactly on the last term
return
# Sites within dense mpos are not ordered and we have to make links match anyways
dense_mpo_list = dense_mpo_list.sort_sites()
itpo = ITPO(num_sites)
itpo.add_dense_mpo_list(dense_mpo_list)
itpo.set_meas_status(do_measurement=True)
yield itpo
[docs]
def write_results(
self, fh: h5py.File, state_ansatz: type[_AbstractTN], **kwargs: Any
) -> None:
"""
See :func:`_TNObsBase.write_results`.
"""
is_measured = self.check_measurable(state_ansatz)
fg = fh.create_group(str(self), track_order=True)
fg.attrs["is_measured"] = is_measured
if is_measured:
for name in self.name:
res = self.results_buffer[name]
if np.any(np.abs(np.imag(res)) > 1e-12):
# write as complex
fg.create_dataset(name, data=res)
else:
# write as real
fg.create_dataset(name, data=np.real(res))
self.results_buffer = {}