Coverage for pythia/chaos.py: 93%
120 statements
1"""
2File: pythia/chaos.py
3Author: Nando Hegemann
4Gitlab: https://gitlab.com/Nando-Hegemann
5Description: Sample-based computation of polynomial chaos expansion.
6SPDX-License-Identifier: LGPL-3.0-or-later OR Hippocratic-3.0-ECO-MEDIA-MIL
7"""
8import warnings
9import math
10import psutil
11import numpy as np
12import pythia as pt


class PolynomialChaos:
    """Computation of sparse polynomial chaos expansion.

    Parameters
    ----------
    params : list of `pt.parameter.Parameter`
        List of stochastic parameters.
    index_set : pt.index.IndexSet
        Index set for sparse polynomial chaos expansion.
    x_train : array_like
        Parameter realizations for training.
    w_train : array_like
        Regression weights for training.
    y_train : array_like
        Function evaluations for training.
    coefficients : array_like, default=None
        Polynomial expansion coefficients. If given, the coefficients are not
        computed during initialization. This can be used to load a chaos
        expansion.
    """

    def __init__(
        self,
        params: list[pt.parameter.Parameter],
        index_set: pt.index.IndexSet,
        x_train: np.ndarray,
        w_train: np.ndarray,
        y_train: np.ndarray,
        coefficients: np.ndarray | None = None,
    ) -> None:
        """Initialize the computation of the PC expansion of a function."""
        assert x_train.ndim == 2 and x_train.shape[1] == len(params)
        assert w_train.ndim == 1 and w_train.size == x_train.shape[0]
        assert y_train.ndim == 2 and y_train.shape[0] == x_train.shape[0]

        self.parameters = params
        self.index_set = index_set
        self.x_train = x_train
        self.w_train = w_train
        self.y_train = y_train

        self.n_samples, self.sdim = x_train.shape
        self.ydim = y_train.shape[1]

        self.univariate_pdfs = [
            pt.sampler.assign_sampler(param) for param in self.parameters
        ]
        self.pdf = pt.sampler.ParameterSampler(self.parameters).pdf
        self.univariate_bases = pt.basis.univariate_basis(
            self.parameters, self.index_set.max
        )
        self.basis = pt.basis.multivariate_basis(
            self.univariate_bases, self.index_set.indices
        )

        self.gramian, self.basis_eval_mat = self._assemble_matrices()
        if coefficients is None:
            self.coefficients = self._fit()
        else:
            assert coefficients.shape == (index_set.shape[0], self.ydim)
            self.coefficients = coefficients
        self.sobol = self._compute_sobol_indices()

    @property
    def mean(self) -> np.ndarray:
        """Mean of the PC expansion."""
        idx = self.index_set.get_index_number(np.zeros((1, self.sdim), dtype=int))
        return self.coefficients[idx].flatten()

    @property
    def var(self) -> np.ndarray:
        """Variance of the PC expansion."""
        if self.index_set.shape[0] == 1 and np.all(self.index_set.indices == 0):
            return np.zeros(self.ydim)
        return np.sum(self.coefficients**2, axis=0).flatten() - self.mean**2

    @property
    def std(self) -> np.ndarray:
        """Standard deviation of the PC expansion."""
        return np.sqrt(self.var)

    def _assemble_matrices(self) -> tuple[np.ndarray, np.ndarray]:
        """Assemble Gramian and basis evaluation matrix.

        Assemble the information matrix :math:`A` and the basis evaluation
        matrix :math:`\\Psi` with the regression points of the PC expansion.
        The basis evaluation matrix :math:`\\Psi` is given by

        .. math::
            \\Psi_{kj} = \\operatorname{basis}[j](x_\\mathrm{train}[k]).

        Returns
        -------
        gramian : np.ndarray
            Empirical Gramian matrix.
        psi_mat : np.ndarray
            Basis evaluation matrix :math:`\\Psi`.
        """
        batch_size = get_gram_batchsize(len(self.basis))
        psi_mat = np.array([p(self.x_train) for p in self.basis])
        gramian = np.zeros((len(self.basis), len(self.basis)))
        for batch in pt.misc.batch(range(self.x_train.shape[0]), batch_size):
            mat_1 = psi_mat[:, batch].reshape(1, len(self.basis), len(batch))
            mat_2 = psi_mat[:, batch].reshape(len(self.basis), 1, len(batch))
            gramian += np.sum(self.w_train[batch] * mat_1 * mat_2, axis=-1)
        return gramian, psi_mat.T
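
    # For reference: the batched accumulation above computes the same weighted
    # outer-product sum as the one-shot vectorized sketch below (illustration
    # only, not part of the class API):
    #
    #     psi = np.array([p(self.x_train) for p in self.basis])  # (m, n)
    #     gramian = (psi * self.w_train) @ psi.T                 # (m, m)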

    def _fit(self) -> np.ndarray:
        """Compute polynomial chaos expansion coefficients.

        Compute the PC coefficients with linear regression. The coefficients
        are given by

        .. math::
            S = A^{-1} \\Psi^T W F_\\mathrm{ex},

        where the Gram matrix :math:`A` is of full rank but may be ill
        conditioned. :math:`F_\\mathrm{ex}` is an array containing the values
        of f evaluated at the required regression points. For more detail on
        the Gram matrix or :math:`\\Psi`, see `_assemble_matrices()`.

        Returns
        -------
        :
            Polynomial chaos expansion coefficients.
        """
        u, s, vh = np.linalg.svd(self.gramian)
        gramian_inv = np.dot(vh.T, np.dot(np.diag(1 / s), u.T))
        W = self.w_train.reshape(-1, 1)
        coefficients = np.linalg.multi_dot(
            [gramian_inv, self.basis_eval_mat.T, W * self.y_train]
        )
        return coefficients
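
    # Sanity check for a fitted instance `pc` (a sketch): the coefficients
    # solve the weighted normal equations A @ S = Psi^T W F, so the residual
    # should vanish up to conditioning effects:
    #
    #     rhs = pc.basis_eval_mat.T @ (pc.w_train.reshape(-1, 1) * pc.y_train)
    #     assert np.allclose(pc.gramian @ pc.coefficients, rhs)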

    def _compute_sobol_indices(self) -> np.ndarray:
        """Compute Sobol indices.

        The Sobol indices are given as

        .. math::
            S_{i_1,\\dots,i_k} = \\frac{1}{\\operatorname{Var}[f]}
            \\sum_{\\alpha\\in\\mathcal{M}} f_\\alpha^2,

        where :math:`\\mathcal{M} = \\{\\alpha \\mid \\alpha_{i_j} \\neq 0
        \\text{ for } j = 1,\\dots,k\\}`.
        """
        sobol = np.zeros([len(self.index_set.sobol_tuples), self.ydim])
        # mask components of f with zero variance
        nz_idx = np.nonzero(self.var)
        for idx, sdx in enumerate(self.index_set.sobol_tuples):
            _mdx = self.index_set.sobol_tuple_to_indices([sdx])[0]
            rows = self.index_set.get_index_number(_mdx)
            coeffs = self.coefficients[rows]
            sobol[idx, nz_idx] = (
                np.sum(coeffs[:, nz_idx] ** 2, axis=0) / self.var[nz_idx]
            )
        return sobol
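
    # Usage sketch (assuming a fitted instance `pc`): rows of `pc.sobol` pair
    # with `pc.index_set.sobol_tuples`, e.g. to print first-order indices:
    #
    #     for sdx, val in zip(pc.index_set.sobol_tuples, pc.sobol):
    #         if len(sdx) == 1:
    #             print(f"S_{sdx} = {val}")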

    def eval(
        self, x: np.ndarray, partial: list[int] | dict | None = None
    ) -> np.ndarray:
        """Evaluate the (partial derivative of the) PC approximation.

        Parameters
        ----------
        x : np.ndarray
            Parameter realizations in which the approximation is evaluated.
        partial : list[int] | dict | None, optional
            Number of derivatives for each parameter component.
            If a list is given, its length has to be the number of parameters.
            Ordering of the list is according to ``self.parameters``.
            If a dict is given, its keys have to be a subset of the parameter
            names.

        Returns
        -------
        :
            Evaluation of the polynomial expansion in the x values.

        Examples
        --------
        Given two parameters :math:`x_1` and :math:`x_2`

        >>> param1 = pt.parameter.Parameter("x1", [-1, 1], "uniform")
        >>> param2 = pt.parameter.Parameter("x2", [-1, 1], "uniform")

        a corresponding polynomial chaos approximation for a function
        :math:`f\\colon (x_1,x_2) \\mapsto y`

        >>> surrogate = pt.chaos.PolynomialChaos([param1, param2], ...)

        and an array the surrogate should be evaluated in

        >>> x_test = np.random.uniform(-1, 1, (1000, 2))

        we can evaluate the surrogate with

        >>> y_approx = surrogate.eval(x_test)

        To obtain a partial derivative of the approximation, e.g.,
        :math:`\\frac{\\partial^2 f}{\\partial x_2^2}`, specify a list

        >>> y_approx = surrogate.eval(x_test, partial=[0, 2])

        or a dictionary with parameter names and number of partial derivatives

        >>> y_approx = surrogate.eval(x_test, partial={'x2': 2})
        """
        if x.ndim < 1 or x.ndim > 2:
            raise ValueError(f"Wrong ndim: '{x.ndim}'")
        if x.ndim == 1:
            x = x.reshape(1, -1)  # avoid mutating the caller's array in-place
        c = self.coefficients.reshape(self.coefficients.shape[0], 1, -1)
        if partial is None:
            basis = self.basis
        else:
            if isinstance(partial, dict):
                _param_names = [p.name for p in self.parameters]
                # check that all keys in partial are parameter names
                assert set(partial.keys()).issubset(set(_param_names))
                _partial = [0] * len(_param_names)
                for k, v in partial.items():
                    _partial[_param_names.index(k)] = v
            else:  # partial already is a list of int
                _partial = partial
            basis = pt.basis.multivariate_basis(
                self.univariate_bases, self.index_set.indices, _partial
            )
        eval_mat = np.array([b(x) for b in basis])
        eval_mat.shape = eval_mat.shape[0], eval_mat.shape[1], 1
        return np.sum(c * eval_mat, axis=0)
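

# A minimal end-to-end sketch of building a surrogate for f(x) = x_1 + x_2**2.
# The Monte Carlo weights 1/n below are an illustrative assumption; any valid
# regression weights work:
#
#     params = [
#         pt.parameter.Parameter("x1", [-1, 1], "uniform"),
#         pt.parameter.Parameter("x2", [-1, 1], "uniform"),
#     ]
#     index_set = pt.index.IndexSet(pt.index.simplex_set(2, 3))
#     x_train = np.random.uniform(-1, 1, (1000, 2))
#     w_train = np.ones(1000) / 1000
#     y_train = (x_train[:, 0] + x_train[:, 1] ** 2).reshape(-1, 1)
#     surrogate = PolynomialChaos(params, index_set, x_train, w_train, y_train)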


def find_optimal_indices(
    params: list[pt.parameter.Parameter],
    x_train: np.ndarray,
    w_train: np.ndarray,
    y_train: np.ndarray,
    max_terms: int = 0,
    threshold: float = 1e-03,
) -> tuple[np.ndarray, np.ndarray]:
    """Compute optimal multiindices of polynomial chaos expansion.

    Heuristic approach to compute almost optimal multiindices for a
    polynomial chaos expansion based on an estimate of the Sobol
    index values.

    Parameters
    ----------
    params : list of pt.parameter.Parameter
        Random parameters of the problem.
    x_train : array_like
        Sample points for training.
    w_train : array_like
        Weights for training.
    y_train : array_like
        Function evaluations for training.
    max_terms : int, default=0
        Maximum number of expansion terms. Number of expansion terms is chosen
        automatically for `max_terms=0`.
    threshold : float, default=1e-03
        Truncation threshold for Sobol indices. The smallest Sobol values with
        a sum of less than ``threshold`` are ignored.

    Returns
    -------
    indices :
        Array with multiindices.
    sobol :
        Crude intermediate approximation of Sobol indices.

    Notes
    -----
    To find reasonable candidates for the sparse polynomial chaos expansion,
    first an expansion with a large simplex index set is computed.
    The simplex index set uses the same maximum dimension in each component
    and is designed to have at least ``max_terms`` many elements.
    With this index set a polynomial chaos expansion is computed.
    The computed Sobol indices are then ordered and the largest contributions
    are collected by a Dörfler marking strategy.
    Then a new index set is assembled by including a downward closed subset of
    polynomial chaos coefficient indices for each selected Sobol index tuple.
    The number of chosen indices for each selected Sobol index tuple is
    weighted by the respective Sobol index value.
    """
    assert 0 <= threshold < 1
    # set maximal number of expansion terms
    n_samples, dim = x_train.shape
    if max_terms > 0:
        _max_terms = max_terms
    else:
        _max_terms = int(n_samples / np.log(n_samples) / 2)
    if _max_terms > int(n_samples / np.log(n_samples) / 2):
        warnings.warn("Gramian may become ill conditioned.")

    # compute crude approximation of Sobol coefficients
    deg = 1
    for _deg in range(2, 1000):
        n_terms = (
            math.factorial(_deg + dim) / math.factorial(_deg) / math.factorial(dim)
        )
        if n_terms > _max_terms:
            break
        deg = _deg
    _indices = pt.index.simplex_set(dim, deg)
    index_set = pt.index.IndexSet(_indices)
    surrogate = PolynomialChaos(params, index_set, x_train, w_train, y_train)

    # sort Sobol coefficients by largest and mark top threshold percent
    idx, _, marker = pt.misc.doerfler_marking(
        np.sum(surrogate.sobol, axis=1), threshold=1 - threshold
    )

    # assemble adaptive choice of multiindices
    indices = assemble_indices(idx[:marker], index_set.sobol_tuples, _max_terms)

    return indices, surrogate.sobol
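

# Typical two-step usage (a sketch; `params`, `x_train`, `w_train` and
# `y_train` as in the example above): first find a sparse candidate index set,
# then fit the final surrogate on it:
#
#     indices, sobol = find_optimal_indices(params, x_train, w_train, y_train)
#     surrogate = PolynomialChaos(
#         params, pt.index.IndexSet(indices), x_train, w_train, y_train
#     )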


def assemble_indices(
    enum_idx: list[int] | tuple[int] | np.ndarray,
    sobol_tuples: list[tuple],
    max_terms: int,
) -> np.ndarray:
    """Compute automatic choice of multiindices.

    Parameters
    ----------
    enum_idx : list[int] | tuple[int] | np.ndarray
        Sorted enumeration indices according to magnitude of Sobol indices.
    sobol_tuples : list of tuple
        List of Sobol subscript tuples.
    max_terms : int
        Maximum number of expansion terms.

    Returns
    -------
    indices : np.ndarray
        Array of (sparse) optimal multiindices.
    """
    dim = max(len(sdx) for sdx in sobol_tuples)
    n_terms_per_idx = int((max_terms - 1) / len(enum_idx))
    indices_list = []
    for idx in enum_idx:
        components = [s - 1 for s in sobol_tuples[idx]]  # current sdx
        deg = int(n_terms_per_idx ** (1 / len(components)) + 1)
        tmp_indices = pt.index.tensor_set(
            [deg + 2 if j in components else 1 for j in range(dim)],
            [1 if j in components else 0 for j in range(dim)],
        )
        indices_list += [tmp_indices[:n_terms_per_idx]]
    indices = pt.index.sort_index_array(
        np.concatenate([np.zeros((1, dim))] + indices_list, axis=0)
    )
    return indices
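

# Worked example of the budget heuristic above (pure arithmetic): with
# max_terms=17 and two marked Sobol tuples, n_terms_per_idx = (17 - 1) // 2 = 8;
# for a tuple with two components this gives deg = int(8 ** (1 / 2) + 1) = 3.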


def get_gram_batchsize(dim: int, save_memory: float = 1024**3 / 2) -> int:
    """Compute memory allocation batch sizes for information matrix.

    Compute the maximal number of samples in each batch when assembling the
    information matrix to be maximally memory efficient and avoid OutOfMemory
    errors.

    Parameters
    ----------
    dim : int
        Number of rows/columns of information matrix.
    save_memory : float, default=1024**3 / 2
        Memory (in bytes) that should be kept free. The default is equivalent
        to 512 MB.

    Returns
    -------
    :
        Batch size for assembling the information matrix.
    """
    available_memory = psutil.virtual_memory().available
    mem = available_memory - save_memory
    n = int(mem / 8 / dim**2)
    if n < 1:
        # There is less memory available than required for at least one sample.
        raise MemoryError("Not enough free memory.")
    else:
        return n
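

# Usage sketch (the returned value depends on the RAM available at call time):
#
#     batch_size = get_gram_batchsize(100)  # samples per batch for a 100 x 100 Gramian
#     assert batch_size >= 1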


if __name__ == "__main__":
    pass