# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""
Observable to measure the weighted sum of tensor product observables
"""
import logging
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Self
import numpy as np
from qtealeaves.emulator import ATTN, MPS, TTN, TTO
from qtealeaves.mpos import ITPO, DenseMPO, DenseMPOList, MPOSite
from qtealeaves.tooling import QTeaLeavesError
from .tensor_product import TensorProduct
from .tnobase import _TNObsBase
if TYPE_CHECKING:
from qtealeaves.abstracttns.abstract_tn import _AbstractTN
from qtealeaves.operators.tnoperators import TNOperators
from qtealeaves.tensors import TensorBackend
else:
_AbstractTN = Any
TNOperators = Any
TensorBackend = Any
# Names exported by a star-import of this module.
__all__ = ["WeightedSum"]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]
class WeightedSum(_TNObsBase):
    r"""
    Observable measuring a weighted sum of tensor product operators, i.e.,
    an observable of the type

    .. math::

        O = \sum_{i=0}^m \alpha_i\left( o_1^i\otimes o_2^i \otimes \dots \otimes o_n^i
        \right)

    where :math:`m` is the number of addends and :math:`n` the number of sites.
    For further information about the single observable
    :math:`O_i=o_1^i\otimes o_2^i \otimes \dots \otimes o_n^i` see the documentation
    of :class:`TensorProduct`.

    The output of the measurement will be a dictionary where:

    - The key is the `name` of the observable
    - The value is its expectation value

    An example of this observable are Pauli decompositions of Hamiltonians, i.e.
    Hamiltonians written as a weighted sum of tensor product operators formed
    by Pauli matrices.
    They are usually used in quantum chemistry applications, such as
    the Variational Quantum Eigensolver.

    Parameters
    ----------
    name: str
        Name to identify the observable
    tp_operators: :class:`TensorProduct`
        Tensor product observables. Its length, i.e. the number of tensor product
        observables contained in it, should be the same as the number of complex
        coefficients.
    coeffs: list of complex
        Coefficients of the weighted sum for each tp_operators. A single value
        which is neither a sequence nor a numpy array is wrapped into a
        one-element list.
    use_itpo: bool, optional
        If True, measure using ITPO. Default to False. Consumed in python.
    """

    # Ansaetze accepted by `measure`; checked there via `check_measurable`.
    _measurable_ansaetze = (MPS, TTN, TTO, ATTN)

    def __init__(
        self,
        name: str,
        tp_operators: TensorProduct,
        coeffs: Sequence[complex] | np.ndarray | complex,
        use_itpo: bool = False,
    ):
        # Allow a bare coefficient for the single-term case.
        if not isinstance(coeffs, (Sequence, np.ndarray)):
            coeffs = [coeffs]

        # One entry per weighted-sum observable; `__iadd__` extends these lists.
        self.tp_operators = [tp_operators]
        self.coeffs = [coeffs]
        self.use_itpo = use_itpo

        _TNObsBase.__init__(self, name)

    @classmethod
    def empty(cls) -> Self:
        """
        Documentation see :func:`_TNObsBase.empty`.
        """
        obj = cls("", TensorProduct.empty(), 0)

        # Drop the placeholder entries created by the constructor call above.
        obj.name = []
        obj.tp_operators = []
        obj.coeffs = []
        # True is the neutral element of the `and` used in `__iadd__`.
        obj.use_itpo = True

        return obj

    def __iadd__(self, other: Any) -> Self:
        """
        Documentation see :func:`_TNObsBase.__iadd__`.

        Raises
        ------
        QTeaLeavesError
            If `other` is not a :class:`WeightedSum`.
        """
        if not isinstance(other, WeightedSum):
            raise QTeaLeavesError(
                f"__iadd__ not defined for types {type(self)} and {type(other)}."
            )

        self.name += other.name
        self.tp_operators += other.tp_operators
        self.coeffs += other.coeffs
        # The ITPO path is only taken if every merged observable asked for it.
        self.use_itpo = self.use_itpo and other.use_itpo

        return self

    # pylint: disable-next=too-many-locals
    def measure(
        self, state: _AbstractTN, operators: TNOperators, **kwargs: Any
    ) -> dict[str, np.ndarray]:
        """
        Documentation see :func:`_TNObsBase.measure`.

        The results buffer is returned unchanged if no observable is
        registered or if the class of `state` is not measurable (a warning
        is logged in the latter case). Otherwise, one entry per observable
        name is written to the results buffer, either via an ITPO installed
        as effective operators (`use_itpo=True`) or via
        `state.meas_weighted_sum`.

        Raises
        ------
        QTeaLeavesError
            In the ITPO path, if the isometry center is not at the default
            position after the measurement.
        """
        if len(self.name) == 0:
            return self.results_buffer

        if not self.check_measurable(state.__class__):
            logger.warning("Observable %s not measurable for %s", self.name, str(state))
            return self.results_buffer

        # Open question kept from the original implementation: why are only the
        # effective projectors removed, but not the effective operators? While
        # isometrizing in meas_tensor_product, we still propagate them through.
        tmp_eff_proj = state.eff_proj
        state.eff_proj = []

        # Remember where the isometry center was to move back at the end.
        ini_iso_pos = state.iso_center

        if self.use_itpo:
            state.iso_towards(state.default_iso_pos, keep_singvals=True, trunc=True)

            # Swap the currently installed effective operators for the ITPO
            hmpo = state.eff_op
            itpo = self.to_itpo(operators, state.tensor_backend, state.num_sites)
            state.eff_op = None  # type: ignore[assignment]
            itpo.setup_as_eff_ops(state, measurement_mode=True)

            # Retrieve the results
            dict_by_tpo_id = itpo.collect_measurements()

            # Results are looked up by consecutive integer keys; each observable
            # owns as many consecutive keys as it has coefficients.
            cnt = 0
            for name, coeffs in zip(self.name, self.coeffs):
                values = np.array(
                    [dict_by_tpo_id[ii] for ii in range(cnt, cnt + len(coeffs))]
                )
                self.results_buffer[name] = np.complex128(values.sum())
                cnt += len(coeffs)

            if (np.array(state.iso_center) != np.array(state.default_iso_pos)).any():
                raise QTeaLeavesError(
                    "Iso center moved.! Cannot re-install Hamiltonian MPO."
                )

            # Re-install the effective operators which were present before
            state.eff_op = hmpo
        else:
            # Cycle over weighted sum observables
            for name, coef, tp_ops in zip(self.name, self.coeffs, self.tp_operators):
                op_string = []
                idxs_string = []

                if isinstance(tp_ops, list):
                    tp_op = tp_ops[0]
                else:
                    tp_op = tp_ops

                # Cycle over the TPO of a single weighted sum
                for ops, sites in zip(tp_op.operators, tp_op.sites):
                    # Resolve each operator name on its site to the operator
                    tp_operators = [
                        operators[(sites[jj], ii)] for jj, ii in enumerate(ops)
                    ]
                    idxs_string.append(sites)
                    op_string.append(tp_operators)

                self.results_buffer[name] = np.complex128(
                    # pylint: disable-next=line-too-long
                    state.meas_weighted_sum(op_string, idxs_string, coef)  # type: ignore[attr-defined]
                )

        if ini_iso_pos is not None:
            state.iso_towards(ini_iso_pos)

        state.eff_proj = tmp_eff_proj

        return self.results_buffer

    @classmethod
    def from_pauli_string(
        cls,
        name: str,
        pauli_string: dict[str, Sequence[dict[str, Any]]],
        use_itpo: bool = False,
    ) -> Self:
        """
        Initialize the observable from a qiskit chemistry pauli string format.
        First, outside of the function use the WeightedPauliOperator method to_dict()
        and then give that dict as input to this function

        Parameters
        ----------
        name: str
            Name of the observable
        pauli_string: dict
            Dictionary of pauli strings. It must contain the key `"paulis"`
            holding the terms; each term provides a `"label"` (one character
            per site) and a `"coeff"` with the entries `"real"` and `"imag"`.
        use_itpo: bool, optional
            If True, measure using ITPO. Default to False. Consumed in python.

        Returns
        -------
        WeightedSum
            The weighted sum observable initialized from the pauli dictionary
        """
        assert "paulis" in pauli_string, "Dictionary is not in pauli string format"

        addends = pauli_string["paulis"]
        coeffs = []
        tp_operators = TensorProduct.empty()

        # First, we look at each term in the weighted sum
        for term in addends:
            string = term["label"]
            coef = term["coeff"]["real"] + 1j * term["coeff"]["imag"]

            # Keep only the non-identity operators together with their site
            operators = []
            sites = []
            for idx, pauli in enumerate(string):
                if pauli != "I":
                    operators.append(pauli)
                    sites.append(idx)

            # A pure identity string is represented by an identity on site 0
            if len(sites) == 0:
                operators.append("I")
                sites.append(0)

            tp_operators += TensorProduct(string, operators, sites)
            coeffs.append(coef)

        obs_wt = cls(name, tp_operators, coeffs, use_itpo)

        return obs_wt

    def to_itpo(
        self, operators: TNOperators, tensor_backend: TensorBackend, num_sites: int
    ) -> ITPO:
        """
        Return an ITPO representing the weighted sum observable

        Parameters
        ----------
        operators: TNOperators
            The operator class
        tensor_backend: instance of `TensorBackend`
            Tensor backend of the simulation
        num_sites: int
            Number of sites in the state to be measured

        Returns
        -------
        ITPO
            The ITPO class
        """
        dense_mpo_list = DenseMPOList()

        # Cycle over weighted sum observables
        for coeffs, tp_ops in zip(self.coeffs, self.tp_operators):
            if isinstance(tp_ops, list):
                tp_ops = tp_ops[0]

            # Cycle over the TPO of a single weighted sum
            for ops, sites, coef in zip(tp_ops.operators, tp_ops.sites, coeffs):
                mpo_sites_list = []
                # iTPO has local weights: the coefficient goes on the first
                # term only, all following terms get a weight of one
                weight = coef

                # Cycle over the different operators in a single TPO
                for op_ii, site_ii in zip(ops, sites):
                    mpo_sites_list.append(
                        MPOSite(
                            site_ii, op_ii, 1.0, weight, operators=operators, params={}
                        )
                    )
                    weight = 1.0

                # Checked manually, must be a linter-problem with double-inheritance
                # pylint: disable-next=abstract-class-instantiated
                dense_mpo = DenseMPO(mpo_sites_list, tensor_backend=tensor_backend)
                dense_mpo_list.append(dense_mpo)

                # Warn only for terms without any operator
                if len(dense_mpo) == 0:
                    logger.warning("Adding length zero MPO to dense MPO list.")

        # Sites are not ordered and we have to make links match anyways
        dense_mpo_list = dense_mpo_list.sort_sites()

        itpo = ITPO(num_sites)
        itpo.add_dense_mpo_list(dense_mpo_list)
        itpo.set_meas_status(do_measurement=True)

        return itpo