Coverage for pythia/chaos.py: 93%

120 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-08 17:13 +0000

1""" 

2File: pythia/chaos.py 

3Author: Nando Hegemann 

4Gitlab: https://gitlab.com/Nando-Hegemann 

5Description: Sample-based computation of polynomial chaos expansion. 

6SPDX-License-Identifier: LGPL-3.0-or-later OR Hippocratic-3.0-ECO-MEDIA-MIL 

7""" 

8import warnings 

9import math 

10import psutil 

11import numpy as np 

12import pythia as pt 

13 

14 

15class PolynomialChaos: 

16 """Computation of sparse polynomial chaos expansion. 

17 

18 Parameters 

19 ---------- 

20 params : list of `pt.parameter.Parameter` 

21 List of stochastic parameters. 

22 index_set : pt.index.IndexSet 

23 Index set for sparse polynomial chaos expansion. 

24 x_train : array_like 

25 Parameter realizations for training. 

26 weights : array_like 

27 Regression weights for training. 

28 fEval : array_like 

29 Function evaluation for training. 

30 coefficients : array_like, default=None 

31 Polynomial expansion coefficients. If given, the coefficients are not 

32 computed during initiation. This can be used to load a chaos expansion. 

33 """ 

34 

35 def __init__( 

36 self, 

37 params: list[pt.parameter.Parameter], 

38 index_set: pt.index.IndexSet, 

39 x_train: np.ndarray, 

40 w_train: np.ndarray, 

41 y_train: np.ndarray, 

42 coefficients: np.ndarray | None = None, 

43 ) -> None: 

44 """Initiate the computation of the PC expansion of a function.""" 

45 assert x_train.ndim == 2 and x_train.shape[1] == len(params) 

46 assert w_train.ndim == 1 and w_train.size == x_train.shape[0] 

47 assert y_train.ndim == 2 and y_train.shape[0] == x_train.shape[0] 

48 

49 self.parameters = params 

50 self.index_set = index_set 

51 self.x_train = x_train 

52 self.w_train = w_train 

53 self.y_train = y_train 

54 

55 self.n_samples, self.sdim = x_train.shape 

56 self.ydim = y_train.shape[1] 

57 

58 self.univariate_pdfs = [ 

59 pt.sampler.assign_sampler(param) for param in self.parameters 

60 ] 

61 self.pdf = pt.sampler.ParameterSampler(self.parameters).pdf 

62 self.univariate_bases = pt.basis.univariate_basis( 

63 self.parameters, self.index_set.max 

64 ) 

65 self.basis = pt.basis.multivariate_basis( 

66 self.univariate_bases, self.index_set.indices 

67 ) 

68 

69 self.gramian, self.basis_eval_mat = self._assemble_matrices() 

70 if coefficients is None: 

71 self.coefficients = self._fit() 

72 else: 

73 assert coefficients.shape == (index_set.shape[0], self.ydim) 

74 self.coefficients = coefficients 

75 self.sobol = self._compute_sobol_indices() 

76 

77 @property 

78 def mean(self) -> np.ndarray: 

79 """Mean of the PC expansion.""" 

80 idx = self.index_set.get_index_number(np.zeros((1, self.sdim), dtype=int)) 

81 return self.coefficients[idx].flatten() 

82 

83 @property 

84 def var(self) -> np.ndarray: 

85 """Variance of the PC expansion.""" 

86 if self.index_set.shape[0] == 1 and np.all(self.index_set.indices == 0): 

87 return np.zeros(self.ydim) 

88 return np.sum(self.coefficients**2, axis=0).flatten() - self.mean**2 

89 

90 @property 

91 def std(self) -> np.ndarray: 

92 """Standard deviation of the PC expansion.""" 

93 return np.sqrt(self.var) 

94 

95 def _assemble_matrices(self) -> tuple[np.ndarray, np.ndarray]: 

96 """Assemble Gramian and basis evaluation matrix. 

97 

98 Assemble the information matrix :math:`A` and the basis evaluation 

99 matrix :math:`\\Psi` with the regression points of the PC expansion. 

100 The basis evaluation matrix :math:`\\Psi` is given by 

101 

102 .. math:: 

103 \\Psi_{kj} = \\operatorname{basis}[j](\\operatorname{regPoints}[k]). 

104 

105 Returns 

106 ------- 

107 gramian : np.ndarray 

108 Empirical Gramian matrix. 

109 psi_mat : np.ndarray 

110 Basis evaluation matrix :math:`\\Psi`. 

111 """ 

112 batch_size = get_gram_batchsize(len(self.basis)) 

113 psi_mat = np.array([p(self.x_train) for p in self.basis]) 

114 gramian = np.zeros((len(self.basis), len(self.basis))) 

115 for batch in pt.misc.batch(range(self.x_train.shape[0]), batch_size): 

116 mat_1 = psi_mat[:, batch].reshape(1, len(self.basis), len(batch)) 

117 mat_2 = psi_mat[:, batch].reshape(len(self.basis), 1, len(batch)) 

118 gramian += np.sum(self.w_train[batch] * mat_1 * mat_2, axis=-1) 

119 return gramian, psi_mat.T 

120 

121 def _fit(self) -> np.ndarray: 

122 """Compute polynomial chaos expansion coefficients. 

123 

124 Compute the PC coefficients with linear regression. The coefficients 

125 are given by 

126 

127 .. math:: 

128 S = A^(-1) * \\Psi^T * W * F_\\mathrm{ex} 

129 

130 where the Gram matrix :math:`A` is of full rank but may be ill 

131 conditioned. :math:`F_\\mathrm{ex}` is an array containing the values 

132 of f evaluated at the required regression points. For more detail on 

133 the Gram matrix or :math:`\\Psi`, see `assemble_matrices()`. 

134 

135 Returns 

136 ------- 

137 : 

138 Polynomial chaos expansion coefficients. 

139 """ 

140 u, s, vh = np.linalg.svd(self.gramian) 

141 gramian_inv = np.dot(vh.T, np.dot(np.diag(1 / s), u.T)) 

142 W = self.w_train.reshape(-1, 1) 

143 coefficients = np.linalg.multi_dot( 

144 [gramian_inv, self.basis_eval_mat.T, W * self.y_train] 

145 ) 

146 return coefficients 

147 

148 def _compute_sobol_indices(self) -> np.ndarray: 

149 """Compute Sobol indices. 

150 

151 The Sobol coefficients are given as 

152 

153 .. math:: 

154 S_{i_1,...,i_k} = \\sum_{\\alpha\\in\\mathcal{M}} f_\\alpha(x)^2 

155 

156 where :math:`\\mathcal{M} = { \\alpha | \\alpha_{i_j} != 0 for j = 1,...,k }`. 

157 """ 

158 sobol = np.zeros([len(self.index_set.sobol_tuples), self.ydim]) 

159 # mask components of f with zero variance 

160 nz_idx = np.nonzero(self.var) 

161 for idx, sdx in enumerate(self.index_set.sobol_tuples): 

162 _mdx = self.index_set.sobol_tuple_to_indices([sdx])[0] 

163 rows = self.index_set.get_index_number(_mdx) 

164 coeffs = self.coefficients[rows] 

165 sobol[idx, nz_idx] = ( 

166 np.sum(coeffs[:, nz_idx] ** 2, axis=0) / self.var[nz_idx] 

167 ) 

168 return sobol 

169 

170 def eval( 

171 self, x: np.ndarray, partial: list[int] | dict | None = None 

172 ) -> np.ndarray: 

173 """Evaluate the (partial derivative of the) PC approximation. 

174 

175 Parameters 

176 ---------- 

177 x : np.ndarray 

178 Parameter realizations in which the approximation is evaluated. 

179 partial : list[int] | dict | None, optional 

180 Number of derivatives for each parameter component. 

181 If a list is given, length has to be the number of parameters. 

182 Ordering of list is according to ``self.parameters``. 

183 If a dict is given, keys have to be subset of parameter names. 

184 

185 Returns 

186 ------- 

187 : 

188 Evaluation of polynomial expansion in x values. 

189 

190 Examples 

191 -------- 

192 Given two parameters :math:`x_1` and :math:`x_2` 

193 

194 >>> param1 = pt.parameter.Parameter("x1", [-1, 1], "uniform") 

195 >>> param2 = pt.parameter.Parameter("x2", [-1, 1], "uniform") 

196 

197 a corresponding polynomial chaos approximation for a function :math:`f\\colon (x_1,x_1) \\mapsto y` 

198 

199 >>> surrogate = pt.chaos.PolynomialChaos([param1, param2], ...) 

200 

201 and an array the surrogate should be evaluated in 

202 

203 >>> x_test = np.random.uniform(-1, 1, (1000, 2)) 

204 

205 we can evaluate the surrogate with 

206 

207 >>> y_approx = surrogate.eval(x_test) 

208 

209 To obtain partial a partial derivative of the approximation, e.g., :math:`\\frac{\\partial^2f}{\\partial x_2^2}`, specify a list 

210 

211 >>> y_approx = surrogate.eval(x_test, partial=[0, 2]) 

212 

213 or a dictionary with parameter names and number of partial derivates 

214 

215 >>> y_approx = surrogate.eval(x_test, partial={'x2':2}) 

216 """ 

217 if x.ndim < 1 or x.ndim > 2: 

218 raise ValueError(f"Wrong ndim: '{x.ndim}'") 

219 if x.ndim == 1: 

220 x.shape = 1, -1 # TODO: need to do this: x = x.reshape(1, -1)? 

221 c = self.coefficients.reshape(self.coefficients.shape[0], 1, -1) 

222 if partial is None: 

223 basis = self.basis 

224 else: 

225 if isinstance(partial, dict): 

226 _param_names = [p.name for p in self.parameters] 

227 # check that all keys in partial are parameter names 

228 assert set(partial.keys()).issubset(set(_param_names)) 

229 _partial = [0] * len(_param_names) 

230 for k, v in partial.items(): 

231 _partial[_param_names.index(k)] = v 

232 else: # partial already is a list of int 

233 _partial = partial 

234 basis = pt.basis.multivariate_basis( 

235 self.univariate_bases, self.index_set.indices, _partial 

236 ) 

237 eval_mat = np.array([b(x) for b in basis]) 

238 eval_mat.shape = eval_mat.shape[0], eval_mat.shape[1], 1 

239 return np.sum(c * eval_mat, axis=0) 

240 

241 

242def find_optimal_indices( 

243 params: list[pt.parameter.Parameter], 

244 x_train: np.ndarray, 

245 w_train: np.ndarray, 

246 y_train: np.ndarray, 

247 max_terms: int = 0, 

248 threshold: float = 1e-03, 

249) -> tuple[np.ndarray, np.ndarray]: 

250 """Compute optimal multiindices of polynomial chaos expansion. 

251 

252 Heuristical approach to compute almost optimal multiindices for a 

253 polynomial chaos expansion based on an estimate of the Sobol 

254 index values. 

255 

256 Parameters 

257 ---------- 

258 params : list of pythia.Parameters.Parameter 

259 Random parameters of the problem. 

260 x_train : array_like 

261 Sample points for training 

262 w_train : array_like 

263 Weights for training. 

264 y_train : array_like 

265 Function evaluations for training. 

266 max_terms : int, default=0 

267 Maximum number of expansion terms. Number of expansion terms is chosen 

268 automatically for `max_terms=0`. 

269 threshold : float, default=1e-03 

270 Truncation threshold for Sobol indices. Smallest Sobol values with sum 

271 less then ``threshold`` are ignored. 

272 

273 Returns 

274 ------- 

275 indices : 

276 Array with multiindices. 

277 sobol : 

278 Crude intermediate approximation of Sobol indices. 

279 

280 Notes 

281 ----- 

282 To find reasonable candidates for the sparse polynomial chaos expansion, 

283 first an expansion with a large simplex index set is computed. 

284 The simplex index set uses the same maximum dimension in each component 

285 and is designed to have at least ``max_terms`` many elements. 

286 With this index set a polynomial chaos expansion is computed. 

287 The computed Sobol indices are then ordered and the largest contributions 

288 are collected by a Dörfler marking strategy. 

289 Then a new index set is assembled by including a downward closed subset of 

290 polynomial chaos coefficient indices for each selected Sobol index tuple. 

291 The number of chosen indices for each selected Sobol index tuple is weighted 

292 by the respective Sobol index value. 

293 """ 

294 assert 0 <= threshold < 1 

295 # set maximal number of expansion terms 

296 n_samples, dim = x_train.shape 

297 if max_terms > 0: 

298 _max_terms = max_terms 

299 else: 

300 _max_terms = int(n_samples / np.log(n_samples) / 2) 

301 if _max_terms > int(n_samples / np.log(n_samples) / 2): 

302 warnings.warn("Gramian may become ill conditioned.") 

303 

304 # compute crude approximation of Sobol coefficients 

305 deg = 1 

306 for _deg in range(2, 1000): 

307 n_terms = ( 

308 math.factorial(_deg + dim) / math.factorial(_deg) / math.factorial(dim) 

309 ) 

310 if n_terms > _max_terms: 

311 break 

312 deg = _deg 

313 _indices = pt.index.simplex_set(dim, deg) 

314 index_set = pt.index.IndexSet(_indices) 

315 surrogate = PolynomialChaos(params, index_set, x_train, w_train, y_train) 

316 

317 # sort Sobol coefficients by largest and mark top threshold percent. 

318 idx, _, marker = pt.misc.doerfler_marking( 

319 np.sum(surrogate.sobol, axis=1), threshold=1 - threshold 

320 ) 

321 

322 # assemble adaptive choice of multiindices 

323 indices = assemble_indices(idx[:marker], index_set.sobol_tuples, _max_terms) 

324 

325 return indices, surrogate.sobol 

326 

327 

328def assemble_indices( 

329 enum_idx: list[int] | tuple[int] | np.ndarray, 

330 sobol_tuples: list[tuple], 

331 max_terms: int, 

332) -> np.ndarray: 

333 """Compute automatic choice of multiindices. 

334 

335 Parameters 

336 ---------- 

337 enum_idx : np.ndarray 

338 Sorted enumeration indices according to magnitude of Sobol indices. 

339 sobol_tuples : list of tuple 

340 List of Sobol subscript tuples. 

341 max_terms : int 

342 Maximum number of expansion terms. 

343 

344 Returns 

345 ------- 

346 indices : np.ndarray 

347 Array of (sparse) optimal multiindices. 

348 """ 

349 dim = max(len(sdx) for sdx in sobol_tuples) 

350 n_terms_per_idx = int((max_terms - 1) / len(enum_idx)) 

351 indices_list = [] 

352 for idx in enum_idx: 

353 components = [s - 1 for s in sobol_tuples[idx]] # current sdx 

354 deg = int(n_terms_per_idx ** (1 / len(components)) + 1) 

355 tmp_indices = pt.index.tensor_set( 

356 [deg + 2 if j in components else 1 for j in range(dim)], 

357 [1 if j in components else 0 for j in range(dim)], 

358 ) 

359 indices_list += [tmp_indices[:n_terms_per_idx]] 

360 indices = pt.index.sort_index_array( 

361 np.concatenate([np.zeros((1, dim))] + indices_list, axis=0) 

362 ) 

363 return indices 

364 

365 

366def get_gram_batchsize(dim: int, save_memory: float = 1025**3 / 2) -> int: 

367 """Compute memory allocation batch sizes for information matrix. 

368 

369 Compute the maximal number of samples in each batch when assembling the 

370 information matrix to be maximally memory efficient and avoid OutOfMemory 

371 errors. 

372 

373 Parameters 

374 ---------- 

375 dim : int 

376 Number of rows/columns of information matrix. 

377 save_memory : int, default=3*1025/2 

378 Memory (in bytes), that should be kept free. The default is equivalent 

379 to 512 MB. 

380 

381 Returns 

382 ------- 

383 : 

384 Batchsize for assembling of information matrix. 

385 """ 

386 available_memory = psutil.virtual_memory().available 

387 mem = available_memory - save_memory 

388 n = int(mem / 8 / dim**2) 

389 if n < 1: 

390 # There is less memory available than required for at least one sample. 

391 raise MemoryError("Not enough free memory.") 

392 else: 

393 return n 

394 

395 

396if __name__ == "__main__": 

397 pass