# Source code for qtealeaves.mpos.sparsematrixproductoperator

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Sparse matrix product operators for simulations. This MPO covers a
full system with a list of `SparseMatrixOperator`.
"""

# pylint: disable=protected-access
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
import logging

import numpy as np

from qtealeaves.tensors.tensor_backend import TensorBackend, _AbstractQteaTensor
from qtealeaves.tooling import QTeaLeavesError
from qtealeaves.tooling.permutations import _transpose_idx
from qtealeaves.tooling.restrictedclasses import _RestrictedList

from .abstracteffop import _AbstractEffectiveMpo
from .sparsematrixoperator import SparseMatrixOperator, SparseMatrixOperatorPy

# Public API of this module.
__all__ = ["SparseMPO", "SparseMatrixProductOperator"]

# Module-level logger following the package-wide logging convention.
logger = logging.getLogger(__name__)


class SparseMatrixProductOperator:
    """
    Indexed sparse MPO for a set of sites.

    **Arguments**

    params : dict
        Parameterization of a simulation.

    model : instance of `QuantumModel`
        The physical model to be converted into an MPO.

    operator_map : dict
        Mapping the operators to their integer IDs.

    param_map : dict
        Mapping the parameters to their integer IDs.
    """

    def __init__(self, params, model, operator_map, param_map):
        ll = model.get_number_of_sites(params)
        ll_xyz = model.get_number_of_sites_xyz(params)

        # One sparse matrix operator per site; the first and last site are
        # flagged so they can take their boundary (row/column) form.
        self.sp_mat_ops = []
        for ii in range(ll):
            self.sp_mat_ops.append(SparseMatrixOperator(ii == 0, ii + 1 == ll, True))

        # Convert every Hamiltonian term of the model into its sparse-matrix
        # operator representation and accumulate it site by site.
        for term in model.hterms:
            sp_mat_ops_new = term.get_sparse_matrix_operators(
                ll_xyz,
                params,
                operator_map,
                param_map,
                SparseMatrixOperator,
                dim=model.dim,
            )
            self.add_terms(sp_mat_ops_new)

    def add_terms(self, sp_mat_ops_list):
        """
        Add a list of `SparseMatrixOperators` to the existing one in-place.

        **Arguments**

        sp_mat_ops_list : list of `SparseMatrixOperators`
            Another interaction to be added to the MPO.

        **Raises**

        QTeaLeavesError : if the list length does not match the system size.
        """
        ll = len(self.sp_mat_ops)
        nn = len(sp_mat_ops_list)
        if ll != nn:
            raise QTeaLeavesError("Can only combine same lengths.")

        # Site-wise in-place addition (relies on `SparseMatrixOperator.__iadd__`).
        for ii in range(ll):
            self.sp_mat_ops[ii] += sp_mat_ops_list[ii]

    def write(self, fh):
        """
        Write out the sparse MPO compatible with reading it in fortran.

        **Arguments**

        fh : open filehandle
            Information about MPO will be written here.
        """
        nn = len(self.sp_mat_ops)
        fh.write("%d \n" % (nn))
        fh.write("-" * 32 + "\n")

        # Each site writes itself, followed by a separator line.
        for ii in range(nn):
            self.sp_mat_ops[ii].write(fh)
            fh.write("-" * 32 + "\n")
class SparseMPOSites(_RestrictedList):
    """SparseMPOSites contains the physical terms."""

    # Restrict list entries to sparse MPO site operators.
    class_allowed = SparseMatrixOperatorPy

    def __init__(self, num_sites, operators, do_vecs=True, tensor_backend=None):
        if tensor_backend is None:
            raise NotImplementedError(
                "tensor_backend has to be set when on this level."
            )

        super().__init__()

        # Build one site operator per site seeded with the identity operator;
        # boundary sites are flagged via the first two constructor arguments.
        for ii in range(num_sites):
            operator_eye = operators[(ii, "id")]
            site = SparseMatrixOperatorPy(
                ii == 0,
                ii + 1 == num_sites,
                do_vecs,
                operator_eye,
                tensor_backend=tensor_backend,
            )
            self.append(site)

    def update_couplings(self, params):
        """Load couplings from an update params dictionary."""
        for elem in self:
            elem.update_couplings(params)

    def to_dense_mpo_matrices(self):
        """Convert sparse MPO into a list of dense matrices of physical sites.

        The weights are considered during the conversion, but any
        parameterization is lost. Sparse MPOs with symmetric tensors cannot
        be converted.

        Returns:
            dense_mat_list (list[_AbstractQteaTensor]) : the dense matrix
                representing the sparse MPO matrix compatible with the backend.
        """
        return [elem.to_dense_mpo_matrix() for elem in self]

    def snapshot_matrix(self, limit_qubits=12):
        """Convert sparse MPO into matrix representation on full Hilbert space.

        Args:
            limit_qubits : int, optional
                The limit for converting a system into the full matrix to
                prevent memory overflows when called on accident. Limit is in
                qubit-equivalents. Default to 12 (matrix size 4096).

        Returns:
            mat (_AbstractQteaTensor) : Sparse MPO represented as a matrix.

        Raises:
            RuntimeError : if the system exceeds `limit_qubits`.
        """
        mpo_matrices = self.to_dense_mpo_matrices()

        # Guard against accidental exponential memory blow-up.
        dims = [elem.shape[1] for elem in mpo_matrices]
        num_qubits = np.log2(np.prod(dims))
        if num_qubits > limit_qubits:
            raise RuntimeError(
                "Cannot convert to full matrix due to memory restriction."
            )

        # Start from a rank-4 "one" tensor and contract site by site,
        # merging the physical legs after each contraction.
        obj = self[0].tensor_backend([1, 1, 1, 1], ctrl="O")
        for elem in mpo_matrices:
            obj = obj.einsum("ijkl,labc->ijakbc", elem)
            ld = obj.shape[1] * obj.shape[2]
            dims = [obj.shape[0], ld, ld, obj.shape[-1]]
            obj = obj.reshape(dims)

        # Drop the (dimension-one) boundary links to obtain a plain matrix.
        dims = [obj.shape[1], obj.shape[2]]
        obj = obj.reshape(dims)

        return obj
class SparseMPO(_AbstractEffectiveMpo):
    """
    Representation of a sparseMPO for python.
    """

    def __init__(self, num_sites, operators, do_vecs=True, tensor_backend=None):
        super().__init__()

        if tensor_backend is None:
            logger.warning("Setting default tensor backend because not passed.")
            tensor_backend = TensorBackend()

        # Physical site terms; preserved in double precision for `convert`.
        self.site_terms = SparseMPOSites(
            num_sites, operators, do_vecs=do_vecs, tensor_backend=tensor_backend
        )
        # Effective operators keyed by (source-position, target-position).
        self.eff_ops = {}

        # tracking of contraction counts per key / position
        self._contraction_counter = {}

        # for convert: the tensor network these effective operators belong to
        self._tensor_network = None

        # store mode on indexing when possible
        self._do_indexing = None

    # --------------------------------------------------------------------------
    #                                 Properties
    # --------------------------------------------------------------------------

    @property
    def device(self):
        """Device where the tensor is stored."""
        # Any effective operator is representative; None if dict is empty.
        for elem in self.eff_ops.values():
            return elem.device
        return None

    @property
    def dtype(self):
        """Data type of the underlying arrays."""
        # Any effective operator is representative; None if dict is empty.
        for elem in self.eff_ops.values():
            return elem.dtype
        return None

    @property
    def num_sites(self):
        """Return the number of sites in the underlying system."""
        return len(self.site_terms)

    # --------------------------------------------------------------------------
    #                            Overwritten operators
    # --------------------------------------------------------------------------

    def __contains__(self, idxs) -> bool:
        """Check if entry inside the effective operators."""
        return idxs in self.eff_ops

    def __getitem__(self, key):
        """Get an entry from the effective operators."""
        return self.eff_ops[key]

    def __setitem__(self, key, value):
        """Set an entry from the effective operators."""
        if not isinstance(value, SparseMatrixOperatorPy):
            raise TypeError("`site_term` must be an SparseMatrixOperatorPy.")
        self.eff_ops[key] = value

    # --------------------------------------------------------------------------
    #      Abstract effective operator methods requiring implementation here
    # --------------------------------------------------------------------------

    # pylint: disable-next=too-many-locals
    def contr_to_eff_op(self, tensor, pos, pos_links, idx_out):
        """Contract operator lists with tensors T and Tdagger to effective operator."""
        # Collect the effective operators attached to all links except the
        # outgoing one (idx_out).
        ops_list = []
        idx_list = []
        pos_link_out = None
        for ii, pos_link in enumerate(pos_links):
            if ii == idx_out:
                pos_link_out = pos_link
                continue
            if pos_link is None:
                continue

            pos_jj = self.eff_ops[(pos_link, pos)]
            ops_list.append(pos_jj)
            idx_list.append(ii)

        if pos_link_out is None:
            raise QTeaLeavesError(
                "Arguments for contraction effective operator mismatch."
            )

        # Key and key for inverse direction
        key = (pos, pos_link_out)
        ikey = (pos_link_out, pos)

        c_counter = self._contraction_counter.get(key, 0)

        # Different loop starts are beneficial
        if idx_out > np.max(idx_list):
            # From left to right
            idx_start = 0
            stride = 1
        elif idx_out < np.min(idx_list):
            # Looping backwards requires flipping links - avoid for now
            idx_start = 0
            stride = 1
            ## From right to left
            # idx_start = len(idx_list) - 1
            # stride = -1
        else:
            # To get contractable stuff, we start right
            # of the gap and move rightwards
            # NOTE(review): loop has no break, so idx_start ends at the LAST
            # index with elem > idx_out, not the first one right of the gap as
            # the comment suggests — confirm whether a `break` is intended.
            stride = 1
            for ii, elem in enumerate(idx_list):
                if elem > idx_out:
                    idx_start = ii

        # Contract tree tensor with sparse MPO
        cidx = idx_list[idx_start]
        perm_out = _transpose_idx(tensor.ndim, cidx)
        perm_out[perm_out == tensor.ndim - 1] += 1
        perm_out = [tensor.ndim - 1] + list(perm_out) + [tensor.ndim + 1]
        ctens = ops_list[idx_start].tensordot_with_tensor_left(
            tensor, [cidx], [2], perm_out=perm_out
        )
        c_counter += ctens._contraction_counter
        ctens._contraction_counter = 0

        # Multiply in the remaining operators, cycling through idx_list.
        ii = idx_start
        for _ in range(len(idx_list) - 1):
            ii += stride
            if ii == len(idx_list):
                ii = 0

            if stride > 0:
                mat_b = ops_list[ii]

                # Need offset of one as we have link to the left now
                cidx_a = [idx_list[ii] + 1]
                cidx_b = [2]
                perm_out = _transpose_idx(ctens.ndim - 1, cidx_a[0])
                perm_out = list(perm_out) + [ctens.ndim - 1]
                ctens = ctens.matrix_multiply(mat_b, cidx_a, cidx_b, perm_out=perm_out)
            else:
                mat_a = ops_list[ii]

                # Need offset of one as we have link to the left now (cidx_b)
                cidx_a = [2]
                cidx_b = [idx_list[ii] + 1]
                perm_out = _transpose_idx(ctens.ndim - 2, idx_list[ii])
                perm_out += 1
                perm_out = [0] + list(perm_out) + [ctens.ndim - 1]
                ctens = mat_a.matrix_multiply(ctens, cidx_a, cidx_b, perm_out=perm_out)

            c_counter += ctens._contraction_counter
            ctens._contraction_counter = 0

        # Contract with complex conjugated
        cidx_a = tensor._invert_link_selection([idx_out])
        cidx_b = [ii + 1 for ii in cidx_a]

        # We get a four-link tensor
        perm_out = [1, 0, 2, 3]
        ctens = ctens.tensordot_with_tensor_left(
            tensor.conj(), cidx_a, cidx_b, perm_out=perm_out
        )
        c_counter += ctens._contraction_counter
        ctens._contraction_counter = 0

        # The new effective operator replaces any stale inverse-direction one.
        if ikey in self.eff_ops:
            del self.eff_ops[ikey]
        self.eff_ops[key] = ctens
        self._contraction_counter[key] = c_counter

    # pylint: disable-next=too-many-locals, too-many-arguments
    def contract_tensor_lists(
        self, tensor, pos, pos_links, custom_ops=None, pre_return_hook=None
    ):
        """
        Linear operator to contract all the effective operators around the
        tensor in position `pos`. Used in the optimization.
        """
        if custom_ops is None:
            ops_list = []
            idx_list = []
            for ii, pos_link in enumerate(pos_links):
                if pos_link is None:
                    continue

                pos_jj = self.eff_ops[(pos_link, pos)]
                ops_list.append(pos_jj)
                idx_list.append(ii)
        else:
            # Required for time evolution backwards step on R-tensor
            ops_list = custom_ops
            idx_list = list(range(len(ops_list)))

        # Find the best start of the loop with an sparse MPO with just one
        # row if possible
        idx_start = 0
        for ii, elem in enumerate(ops_list):
            if elem is None:
                pass
            elif elem._sp_mat.shape[0] == 1:
                idx_start = ii
                break

        # Contract tree tensor with sparse MPO
        cidx = idx_list[idx_start]
        perm_out = _transpose_idx(tensor.ndim, cidx)
        perm_out[perm_out == tensor.ndim - 1] += 1
        perm_out = [tensor.ndim - 1] + list(perm_out) + [tensor.ndim + 1]
        ctens = ops_list[idx_start].tensordot_with_tensor_left(
            tensor, [cidx], [2], perm_out=perm_out
        )
        c_counter = self._contraction_counter.get(pos, 0)
        c_counter += ctens._contraction_counter
        ctens._contraction_counter = 0

        # Multiply in the remaining operators, cycling through idx_list.
        ii = idx_start
        for _ in range(len(idx_list) - 1):
            ii += 1
            if ii == len(idx_list):
                ii = 0

            op_ii = ops_list[ii]
            if ops_list[ii] is None:
                continue

            # Need offset of one as we have link to the left now.
            cidx_a = [idx_list[ii] + 1]
            cidx_b = [2]
            perm_out = _transpose_idx(ctens.ndim - 1, cidx_a[0])
            perm_out = list(perm_out) + [ctens.ndim - 1]
            ctens = ctens.matrix_multiply(op_ii, cidx_a, cidx_b, perm_out=perm_out)

            c_counter += ctens._contraction_counter
            ctens._contraction_counter = 0

        shape = ctens._sp_mat.shape
        if shape[0] != shape[1]:
            raise QTeaLeavesError("Failed to contract to square sparse MPO", shape)

        if shape[0] == 1:
            # Single entry: unwrap the tensor, apply its weight if nontrivial,
            # and strip the dummy boundary links.
            idx = ctens._sp_mat[0, 0]
            weight = ctens._weight[0, 0]
            ctens = ctens.tensors[idx]
            if abs(weight - 1.0) > 1e-14:
                ctens = ctens * weight
            ctens.remove_dummy_link(ctens.ndim - 1)
            ctens.remove_dummy_link(0)
        else:
            # we can take the trace by hand here
            tmp = None
            for ii in range(shape[0]):
                jj = ctens._sp_mat[ii, ii]
                if jj == 0:
                    continue

                tmp2 = ctens.tensors[jj].copy()
                tmp2.trace_one_dim_pair([0, tmp2.ndim - 1])

                if tmp is None:
                    tmp = tmp2
                    if abs(ctens._weight[ii, ii] - 1) > 1e-14:
                        # Bugfix: scale with the weight of THIS diagonal entry
                        # (the original multiplied by ctens._weight[0, 0],
                        # inconsistent with the guard and the else branch).
                        tmp = tmp * ctens._weight[ii, ii]
                else:
                    factor_other = ctens._weight[ii, ii]
                    tmp.add_update(tmp2, factor_other=factor_other)

            if tmp is None:
                # Can be treated if we find any operator and multiply by zero?
                raise QTeaLeavesError("Diagonal empty in SPO.")

            ctens = tmp

        self._contraction_counter[pos] = c_counter

        if pre_return_hook is not None:
            ctens = pre_return_hook(ctens)

        return ctens

    def convert(self, dtype, device):
        """
        Convert underlying array to the specified data type inplace. Original
        site terms are preserved.
        """
        if (self.dtype == dtype) and (self.device == device):
            return

        # We could detect up-conversion and down-conversion. Only for
        # conversion to higher precisions, we have to copy from the
        # site terms again which are in double precision
        if self._tensor_network is None:
            raise QTeaLeavesError("convert needs tensor network to be set.")

        # Re-seed the physical links from the pristine site terms.
        for ii, key in enumerate(self._tensor_network._iter_physical_links()):
            self[key] = self.site_terms[ii]

        for _, elem in self.eff_ops.items():
            elem.convert(dtype, device)

    def get_local_kraus_operators(self, dt: float) -> dict[int, _AbstractQteaTensor]:
        """
        Constructs local Kraus operators from local Lindblad operators.

        Parameters
        ----------
        dt : float, timestep

        Returns
        -------
        kraus_ops : dict of :py:class:`_AbstractQTeaTensor`
            Dictionary, keys are site indices and elements the corresponding
            3-leg kraus tensors
        """
        raise NotImplementedError("Kraus operators and SPO.")

    # --------------------------------------------------------------------------
    #                  Overwriting methods from parent class
    # --------------------------------------------------------------------------

    def print_summary(self):
        """Print summary of computational effort."""
        mode_str = f"(indexing={self._do_indexing})"
        logger.info("%s Contraction summary SPO %s %s", "=" * 20, mode_str, "=" * 20)
        total = 0
        for key, value in self._contraction_counter.items():
            logger.info("Count %s = %s", key, value)
            total += value
        logger.info("Total contractions %s", total)
        logger.info("^" * 60)

    # --------------------------------------------------------------------------
    #                          Methods specific to SPO
    # --------------------------------------------------------------------------

    def update_couplings(self, params):
        """Load couplings from an update params dictionary."""
        self.site_terms.update_couplings(params)
        # self.collapse(params)

    # pylint: disable-next=too-many-locals
    def add_dense_mpo_list(self, dense_mpo_list, params, indexed_spo=True):
        """Add terms for :class:`DenseMPOList` to the SparseMPO."""
        is_oqs = False
        num_sites = self.num_sites
        self._do_indexing = indexed_spo

        for mpo in dense_mpo_list:
            if len(mpo) == 1:
                # Local term
                op = mpo[0].operator
                idx = mpo[0].site

                # Reuse an existing operator ID if indexing is enabled and the
                # operator is already registered on this site.
                match = None
                for kk, val in self.site_terms[idx].tensors.items():
                    if op == val:
                        match = kk
                        break

                if match is None or (not indexed_spo):
                    match = len(self.site_terms[idx].tensors) + 1
                    self.site_terms[idx].tensors[match] = op

                self.site_terms[idx].add_local(
                    match, mpo[0].pstrength, mpo[0].weight, is_oqs
                )
                continue

            if len(mpo) == 2:
                if mpo[0].site + 1 == mpo[1].site:
                    # We have a nearest neighbor interaction
                    op_l = mpo[0].operator
                    op_r = mpo[1].operator
                    idx_l = mpo[0].site
                    idx_r = mpo[1].site
                    pre_l = mpo[0].weight
                    pre_r = mpo[1].weight
                    total_l = mpo[0].total_scaling
                    total_r = mpo[1].total_scaling

                    match_l = None
                    match_r = None
                    pdict = {
                        1: mpo[0].pstrength,
                        2: mpo[1].pstrength,
                    }

                    for kk, val in self.site_terms[idx_l].tensors.items():
                        if op_l == val:
                            match_l = kk
                            break

                    if match_l is None or (not indexed_spo):
                        match_l = len(self.site_terms[idx_l].tensors) + 1
                        self.site_terms[idx_l].tensors[match_l] = op_l

                    for kk, val in self.site_terms[idx_r].tensors.items():
                        if op_r == val:
                            match_r = kk
                            break

                    if match_r is None or (not indexed_spo):
                        match_r = len(self.site_terms[idx_r].tensors) + 1
                        self.site_terms[idx_r].tensors[match_r] = op_r

                    # Boundary sites have a single-row / single-column MPO.
                    if idx_l == 0:
                        shape_l = (1, 3)
                    else:
                        shape_l = (2, 3)

                    if idx_r + 1 == num_sites:
                        shape_r = (3, 1)
                    else:
                        shape_r = (3, 2)

                    sp_mat_l = np.zeros(shape_l, dtype=int)
                    sp_mat_r = np.zeros(shape_r, dtype=int)
                    prefactor_l = np.zeros(shape_l, dtype=np.float64)
                    prefactor_r = np.zeros(shape_r, dtype=np.float64)
                    weight_l = np.zeros(shape_l, dtype=np.complex128)
                    weight_r = np.zeros(shape_r, dtype=np.complex128)
                    params_l = np.zeros(shape_l, dtype=int)
                    params_r = np.zeros(shape_r, dtype=int)

                    sp_mat_l[-1, 1] = match_l
                    prefactor_l[-1, 1] = pre_l
                    weight_l[-1, 1] = total_l
                    params_l[-1, 1] = 1

                    sp_mat_r[1, 0] = match_r
                    prefactor_r[1, 0] = pre_r
                    weight_r[1, 0] = total_r
                    params_r[1, 0] = 2

                    self.site_terms[idx_l].add_term(
                        sp_mat_l, params_l, prefactor_l, weight_l, pdict
                    )
                    self.site_terms[idx_r].add_term(
                        sp_mat_r, params_r, prefactor_r, weight_r, pdict
                    )
                    continue

            raise NotImplementedError(
                "Sparse-MPO beyond nearest-neighbor interactions."
            )

        for elem in self.site_terms:
            elem.collapse_local(params)

    def to_str(self):
        """String representation with sparse-matrix elements."""
        str_buffer = "\n" + "*" * 60 + "\n"
        str_buffer += f"Number of sites : {len(self.site_terms)}\n\n"
        # NOTE(review): iterates `self`, which relies on iteration support
        # from the parent class; `self.site_terms` may be intended — confirm.
        for ii, elem in enumerate(self):
            str_buffer += f"Site {ii}\n"
            str_buffer += str(elem._sp_mat) + "\n"
            str_buffer += str(elem._weight) + "\n"
            str_buffer += "\n"

        return str_buffer

    def setup_as_eff_ops(self, tensor_network, measurement_mode=False):
        """Set this sparse MPO as effective ops in TN and initialize."""
        if measurement_mode:
            raise ValueError("SPO has no measurement mode.")

        self._tensor_network = tensor_network

        # Seed the physical links with copies of the site terms ...
        for ii, key in enumerate(tensor_network._iter_physical_links()):
            self.eff_ops[key] = self.site_terms[ii].copy()

        # ... matching the network's data type and device.
        self.convert(tensor_network.dtype, tensor_network.device)

        tensor_network.eff_op = self
        tensor_network.build_effective_operators()

    def values(self):  # -> dict_values:
        """Return the values stored in the effective ops dictionary.

        Returns:
            values (dict_values)
        """
        return self.eff_ops.values()