Unverified Commit 8405a0f7 authored by Stephan Rave, committed by GitHub

Merge pull request #1110 from TiKeil/linear_optimization

Towards linear optimization (dual problem, sensitivity problem, output gradient)
parents 70af0f04 6abf6dd1
Pipeline #69883 failed with stages in 94 minutes and 11 seconds
......@@ -12,9 +12,10 @@
## pyMOR 2020.2
* Tim Keil, tim.keil@uni-muenster.de
* energy product in elliptic discretizer
* rename estimate -> estimate_error and estimator -> error_estimator
* avoid nested Product and Lincomb Functionals and Functions
* linear optimization (dual solution, sensitivities, output gradient)
* Hendrik Kleikamp, hendrik.kleikamp@uni-muenster.de
* artificial neural networks for instationary problems
......
......@@ -91,6 +91,10 @@ Bibliography
Hierarchical Approximate Proper Orthogonal Decomposition,
SIAM J. Sci. Comput. 40, A3267-A3292, 2018.
.. [HPUU09] M. Hinze, R. Pinnau, M. Ulbrich, S. Ulbrich,
Optimization with PDE constraints,
Springer Netherlands, 2009.
.. [HU18] J. Hesthaven, S. Ubbiali,
Non-intrusive reduced order modeling of nonlinear problems using neural networks,
Journal of Computational Physics, 363, pp. 55-78, 2018.
......
......@@ -2,8 +2,11 @@
# Copyright 2013-2020 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)
import numpy as np
from pymor.algorithms.timestepping import TimeStepper
from pymor.models.interface import Model
from pymor.operators.block import BlockOperatorBase
from pymor.operators.constructions import IdentityOperator, VectorOperator, ZeroOperator
from pymor.vectorarrays.interface import VectorArray
from pymor.vectorarrays.numpy import NumpyVectorSpace
......@@ -84,6 +87,66 @@ class StationaryModel(Model):
def _compute_solution(self, mu=None, **kwargs):
return self.operator.apply_inverse(self.rhs.as_range_array(mu), mu=mu)
def _compute_solution_d_mu_single_direction(self, parameter, index, solution, mu):
# right-hand side of the sensitivity problem: d_mu(rhs) minus d_mu(operator) applied to the solution
lhs_d_mu = self.operator.d_mu(parameter, index).apply(solution, mu=mu)
rhs_d_mu = self.rhs.d_mu(parameter, index).as_range_array(mu)
rhs = rhs_d_mu - lhs_d_mu
# the sensitivity solves the problem linearized around the computed solution
return self.operator.jacobian(solution, mu=mu).apply_inverse(rhs)
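For orientation, the sensitivity computed here solves the model linearized around `solution` with right-hand side `rhs_d_mu - lhs_d_mu`. A quick finite-difference sanity check could look like the following sketch, where the model `m`, the parameter values `mu`, the parameter name `'diffusion'` and the step size `eps` are all placeholder assumptions:

# sketch only: compare the analytic sensitivity with a difference quotient
eps = 1e-6
mu = m.parameters.parse(mu)
u = m.solve(mu)
u_d_mu = m.solve_d_mu('diffusion', 0, mu=mu)
perturbed = mu.to_numpy().copy()
perturbed[0] += eps
u_eps = m.solve(m.parameters.parse(perturbed))
print(((u_eps - u) * (1. / eps) - u_d_mu).norm())  # should be close to zero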
_compute_allowed_kwargs = frozenset({'use_adjoint'})
def _compute_output_d_mu(self, solution, mu, return_array=False, use_adjoint=None):
"""compute the gradient of the output functional w.r.t. the parameters
Parameters
----------
solution
Internal model state for the given |Parameter value|
mu
|Parameter value| for which to compute the gradient
return_array
if `True`, return the output gradient as a |NumPy array|.
Otherwise, return a dict of gradients for each |Parameter|.
use_adjoint
if `None` use standard approach, if `True`, use
the adjoint solution for a more efficient way of computing the gradient.
See Section 1.6.2 in [HPUU09]_ for more details.
So far, the adjoint approach is only valid for linear models.
Returns
-------
The gradient as a |NumPy array| or a dict of |NumPy arrays|.
"""
if use_adjoint is None:
use_adjoint = self.output_functional.linear and self.operator.linear
if not use_adjoint:
return super()._compute_output_d_mu(solution, mu, return_array)
else:
assert self.output_functional is not None
assert self.operator.linear
assert self.output_functional.linear
dual_solutions = self.operator.range.empty()
for d in range(self.output_functional.range.dim):
dual_problem = self.with_(operator=self.operator.H, rhs=self.output_functional.H.as_range_array(mu)[d])
dual_solutions.append(dual_problem.solve(mu))
gradients = [] if return_array else {}
for (parameter, size) in self.parameters.items():
array = np.empty(shape=(size,self.output_functional.range.dim))
for index in range(size):
output_partial_dmu = self.output_functional.d_mu(parameter, index).apply(solution,
mu=mu).to_numpy()[0]
lhs_d_mu = self.operator.d_mu(parameter, index).apply2(dual_solutions, solution, mu=mu)[:,0]
rhs_d_mu = self.rhs.d_mu(parameter, index).apply_adjoint(dual_solutions, mu=mu).to_numpy()[:,0]
array[index] = output_partial_dmu + rhs_d_mu - lhs_d_mu
if return_array:
gradients.extend(array)
else:
gradients[parameter] = array
if return_array:
return np.array(gradients)
else:
return gradients
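As a usage sketch (the model `fom` and parameter values `mu` are assumed): for a model with linear `operator` and `output_functional`, the adjoint-based gradient should coincide with the one computed from the solution sensitivities, which is also what the accompanying test below verifies.

grad_adjoint = fom.output_d_mu(mu, return_array=True, use_adjoint=True)
grad_sensitivities = fom.output_d_mu(mu, return_array=True, use_adjoint=False)
assert np.allclose(grad_adjoint, grad_sensitivities)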
class InstationaryModel(Model):
"""Generic class for models of instationary problems.
......
......@@ -48,9 +48,9 @@ class Model(CacheableObject, ParametricObject):
self.__auto_init(locals())
def _compute(self, solution=False, output=False, solution_d_mu=False, output_d_mu=False,
solution_error_estimate=False, output_error_estimate=False,
output_d_mu_return_array=False, mu=None, **kwargs):
return {}
def _compute_solution(self, mu=None, **kwargs):
......@@ -109,6 +109,86 @@ class Model(CacheableObject, ParametricObject):
else:
return self.output_functional.apply(solution, mu=mu).to_numpy()
def _compute_solution_d_mu_single_direction(self, parameter, index, solution, mu=None, **kwargs):
"""Compute the partial derivative of the solution w.r.t. a parameter index
Parameters
----------
parameter
parameter for which to compute the sensitivity
index
parameter index for which to compute the sensitivity
solution
Internal model state for the given |Parameter value|.
mu
|Parameter value| for which to solve
Returns
-------
The sensitivity of the solution as a |VectorArray|.
"""
raise NotImplementedError
def _compute_solution_d_mu(self, solution, mu=None, **kwargs):
"""Compute all partial derivative of the solution w.r.t. a parameter index
Parameters
----------
solution
Internal model state for the given |Parameter value|.
mu
|Parameter value| for which to solve
Returns
-------
A dict of all partial sensitivities of the solution.
"""
sensitivities = {}
for (parameter, size) in self.parameters.items():
sens_for_param = self.solution_space.empty()
for l in range(size):
sens_for_param.append(self._compute_solution_d_mu_single_direction(
parameter, l, solution, mu))
sensitivities[parameter] = sens_for_param
return sensitivities
def _compute_output_d_mu(self, solution, mu=None, return_array=False, **kwargs):
"""compute the gradient w.r.t. the parameter of the output functional
Parameters
----------
solution
Internal model state for the given |Parameter value|.
mu
|Parameter value| for which to compute the gradient
return_array
if `True`, return the output gradient as a |NumPy array|.
Otherwise, return a dict of gradients for each |Parameter|.
Returns
-------
The gradient as a |NumPy array| or a dict of |NumPy arrays|.
"""
assert self.output_functional is not None
U_d_mus = self._compute_solution_d_mu(solution, mu)
gradients = [] if return_array else {}
for (parameter, size) in self.parameters.items():
array = np.empty(shape=(size,self.output_functional.range.dim))
for index in range(size):
output_partial_dmu = self.output_functional.d_mu(parameter, index).apply(
solution, mu=mu).to_numpy()[0]
U_d_mu = U_d_mus[parameter][index]
array[index] = output_partial_dmu + self.output_functional.jacobian(
solution, mu).apply(U_d_mu, mu).to_numpy()[0]
if return_array:
gradients.extend(array)
else:
gradients[parameter] = array
if return_array:
return np.array(gradients)
else:
return gradients
def _compute_solution_error_estimate(self, solution, mu=None, **kwargs):
"""Compute an error estimate for the computed internal state.
......@@ -179,9 +259,9 @@ class Model(CacheableObject, ParametricObject):
_compute_allowed_kwargs = frozenset()
def compute(self, solution=False, output=False, solution_d_mu=False, output_d_mu=False,
solution_error_estimate=False, output_error_estimate=False,
output_d_mu_return_array=False, *, mu=None, **kwargs):
"""Compute the solution of the model and associated quantities.
This method computes the output of the model, its internal state
......@@ -205,10 +285,19 @@ class Model(CacheableObject, ParametricObject):
If `True`, return the model's internal state.
output
If `True`, return the model output.
solution_d_mu
If not `False`, either `True` to return the derivative of the model's
internal state w.r.t. all parameter components or a tuple `(parameter, index)`
to return the derivative of a single parameter component.
output_d_mu
If `True`, return the gradient of the model output w.r.t. the |Parameter|.
solution_error_estimate
If `True`, return an error estimate for the computed internal state.
output_error_estimate
If `True`, return an error estimate for the computed output.
output_d_mu_return_array
If `True`, return the output gradient as a |NumPy array|.
Otherwise, return a dict of gradients for each |Parameter|.
mu
|Parameter values| for which to compute the values.
kwargs
......@@ -235,12 +324,14 @@ class Model(CacheableObject, ParametricObject):
# first call _compute to give subclasses more control
data = self._compute(solution=solution, output=output,
solution_d_mu=solution_d_mu, output_d_mu=output_d_mu,
solution_error_estimate=solution_error_estimate,
output_error_estimate=output_error_estimate,
mu=mu, **kwargs)
if (solution or output or solution_error_estimate or
output_error_estimate or solution_d_mu or output_d_mu) \
and 'solution' not in data:
retval = self.cached_method_call(self._compute_solution, mu=mu, **kwargs)
if isinstance(retval, dict):
assert 'solution' in retval
......@@ -257,6 +348,29 @@ class Model(CacheableObject, ParametricObject):
else:
data['output'] = retval
if solution_d_mu and 'solution_d_mu' not in data:
if isinstance(solution_d_mu, tuple):
retval = self._compute_solution_d_mu_single_direction(
solution_d_mu[0], solution_d_mu[1], data['solution'], mu=mu, **kwargs)
else:
retval = self._compute_solution_d_mu(data['solution'], mu=mu, **kwargs)
# retval is always a dict
if isinstance(retval, dict) and 'solution_d_mu' in retval:
data.update(retval)
else:
data['solution_d_mu'] = retval
if output_d_mu and 'output_d_mu' not in data:
# TODO use caching here (requires skipping args in key generation)
retval = self._compute_output_d_mu(data['solution'], mu=mu,
return_array=output_d_mu_return_array,
**kwargs)
# retval is always a dict
if isinstance(retval, dict) and 'output_d_mu' in retval:
data.update(retval)
else:
data['output_d_mu'] = retval
if solution_error_estimate and 'solution_error_estimate' not in data:
# TODO use caching here (requires skipping args in key generation)
retval = self._compute_solution_error_estimate(data['solution'], mu=mu, **kwargs)
......@@ -349,6 +463,52 @@ class Model(CacheableObject, ParametricObject):
else:
return data['output']
def solve_d_mu(self, parameter, index, mu=None, **kwargs):
"""Solve for the partial derivative of the solution w.r.t. a parameter index
Parameters
----------
parameter
parameter for which to compute the sensitivity
index
parameter index for which to compute the sensitivity
mu
|Parameter value| for which to solve
Returns
-------
The sensitivity of the solution as a |VectorArray|.
"""
data = self.compute(
solution_d_mu=(parameter, index),
mu=mu,
**kwargs
)
return data['solution_d_mu']
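A minimal usage sketch, assuming a model `m` whose `'diffusion'` parameter has two components: the sensitivity of a single component can be obtained either directly or via `compute`.

mu = m.parameters.parse([0.25, 0.5])
u_d_mu = m.solve_d_mu('diffusion', 1, mu=mu)
# equivalent, returning all sensitivities as a dict of |VectorArrays|
u_d_mu_ = m.compute(solution_d_mu=True, mu=mu)['solution_d_mu']['diffusion'][1]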
def output_d_mu(self, mu=None, return_array=False, **kwargs):
"""compute the gradient w.r.t. the parameter of the output functional
Parameters
----------
mu
|Parameter value| for which to compute the gradient
return_array
if `True`, return the output gradient as a |NumPy array|.
Otherwise, return a dict of gradients for each |Parameter|.
Returns
-------
The gradient as a |NumPy array| or a dict of |NumPy arrays|.
"""
data = self.compute(
output_d_mu=True,
mu=mu,
output_d_mu_return_array=return_array,
**kwargs
)
return data['output_d_mu']
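A small usage sketch (model `m` and parameter values `mu` assumed): by default a dict keyed by parameter name is returned, each entry holding an array of shape `(parameter size, output dimension)`; with `return_array=True` the gradients are stacked into a single |NumPy array|.

grads = m.output_d_mu(mu)                          # e.g. {'diffusion': array of shape (2, 1)}
grad_array = m.output_d_mu(mu, return_array=True)  # all gradients as one array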
def estimate_error(self, mu=None, **kwargs):
"""Estimate the error for the computed internal state.
......
......@@ -119,6 +119,12 @@ class BlockOperatorBase(Operator):
blocks = [process_col(col, space) for col, space in zip(self.blocks.T, subspaces)]
return self.source.make_array(blocks) if self.blocked_source else blocks[0]
def d_mu(self, parameter, index=0):
blocks = np.empty(self.blocks.shape, dtype=object)
for (i, j) in np.ndindex(self.blocks.shape):
blocks[i, j] = self.blocks[i, j].d_mu(parameter, index)
return self.with_(blocks=blocks)
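A minimal sketch of the new block-wise derivative (all names below are illustrative; `None` entries stand for zero blocks as usual):

import numpy as np
from pymor.operators.numpy import NumpyMatrixOperator
from pymor.operators.constructions import LincombOperator
from pymor.parameters.functionals import ProjectionParameterFunctional

theta = ProjectionParameterFunctional('diffusion', 1, 0)
A = LincombOperator([NumpyMatrixOperator(np.eye(3))], [theta])   # parametric block
B = NumpyMatrixOperator(2. * np.eye(3))                          # non-parametric block
block_op = BlockOperator([[A, None], [None, B]])
block_op_d_mu = block_op.d_mu('diffusion', 0)  # derivative is taken block by block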
class BlockOperator(BlockOperatorBase):
"""A matrix of arbitrary |Operators|.
......
#!/usr/bin/env python
# This file is part of the pyMOR project (http://www.pymor.org).
# Copyright 2013-2020 pyMOR developers and contributors. All rights reserved.
# License: BSD 2-Clause License (http://opensource.org/licenses/BSD-2-Clause)
import numpy as np
from typer import Argument, run
from pymor.basic import *
def main(
grid_intervals: int = Argument(..., help='Grid interval count.'),
training_samples: int = Argument(..., help='Number of samples used for training the reduced basis.')
):
"""Example script for solving linear PDE-constrained parameter optimization problems"""
fom, mu_bar = create_fom(grid_intervals)
parameter_space = fom.parameters.space(0, np.pi)
ranges = parameter_space.ranges['diffusion']
initial_guess = fom.parameters.parse([0.25, 0.5])
def fom_objective_functional(mu):
return fom.output(mu)
def fom_gradient_of_functional(mu):
return fom.output_d_mu(fom.parameters.parse(mu), return_array=True, use_adjoint=True)
from functools import partial
from scipy.optimize import minimize
from time import perf_counter
opt_fom_minimization_data = {'num_evals': 0,
'evaluations' : [],
'evaluation_points': [],
'time': np.inf}
tic = perf_counter()
opt_fom_result = minimize(partial(record_results, fom_objective_functional,
fom.parameters.parse, opt_fom_minimization_data),
initial_guess.to_numpy(),
method='L-BFGS-B',
jac=fom_gradient_of_functional,
bounds=(ranges, ranges),
options={'ftol': 1e-15})
opt_fom_minimization_data['time'] = perf_counter()-tic
reference_mu = opt_fom_result.x
from pymor.algorithms.greedy import rb_greedy
from pymor.reductors.coercive import CoerciveRBReductor
from pymor.parameters.functionals import MinThetaParameterFunctional
coercivity_estimator = MinThetaParameterFunctional(fom.operator.coefficients, mu_bar)
training_set = parameter_space.sample_uniformly(training_samples)
training_set_simple = [mu['diffusion'] for mu in training_set]
RB_reductor = CoerciveRBReductor(fom, product=fom.energy_product, coercivity_estimator=coercivity_estimator)
RB_greedy_data = rb_greedy(fom, RB_reductor, training_set, atol=1e-2)
rom = RB_greedy_data['rom']
def rom_objective_functional(mu):
return rom.output(mu)
def rom_gradient_of_functional(mu):
return rom.output_d_mu(fom.parameters.parse(mu), return_array=True, use_adjoint=True)
opt_rom_minimization_data = {'num_evals': 0,
'evaluations' : [],
'evaluation_points': [],
'time': np.inf,
'offline_time': RB_greedy_data['time']}
tic = perf_counter()
opt_rom_result = minimize(partial(record_results, rom_objective_functional, fom.parameters.parse,
opt_rom_minimization_data),
initial_guess.to_numpy(),
method='L-BFGS-B',
jac=rom_gradient_of_functional,
bounds=(ranges, ranges),
options={'ftol': 1e-15})
opt_rom_minimization_data['time'] = perf_counter()-tic
print("\nResult of optimization with FOM model and adjoint gradient")
report(opt_fom_result, fom.parameters.parse, opt_fom_minimization_data, reference_mu)
print("Result of optimization with ROM model and adjoint gradient")
report(opt_rom_result, fom.parameters.parse, opt_rom_minimization_data, reference_mu)
def create_fom(grid_intervals, vector_valued_output=False):
domain = RectDomain(([-1,-1], [1,1]))
indicator_domain = ExpressionFunction(
'(-2/3. <= x[..., 0]) * (x[..., 0] <= -1/3.) * (-2/3. <= x[..., 1]) * (x[..., 1] <= -1/3.) * 1. \
+ (-2/3. <= x[..., 0]) * (x[..., 0] <= -1/3.) * (1/3. <= x[..., 1]) * (x[..., 1] <= 2/3.) * 1.',
dim_domain=2, shape_range=())
rest_of_domain = ConstantFunction(1, 2) - indicator_domain
f = ExpressionFunction('0.5*pi*pi*cos(0.5*pi*x[..., 0])*cos(0.5*pi*x[..., 1])', dim_domain=2, shape_range=())
parameters = {'diffusion': 2}
thetas = [ExpressionParameterFunctional('1.1 + sin(diffusion[0])*diffusion[1]', parameters,
derivative_expressions={'diffusion': ['cos(diffusion[0])*diffusion[1]',
'sin(diffusion[0])']}),
ExpressionParameterFunctional('1.1 + sin(diffusion[1])', parameters,
derivative_expressions={'diffusion': ['0',
'cos(diffusion[1])']}),
]
diffusion = LincombFunction([rest_of_domain, indicator_domain], thetas)
theta_J = ExpressionParameterFunctional('1 + 1/5 * diffusion[0] + 1/5 * diffusion[1]', parameters,
derivative_expressions={'diffusion': ['1/5','1/5']})
if vector_valued_output:
problem = StationaryProblem(domain, f, diffusion, outputs=[('l2', f * theta_J), ('l2', f *0.5* theta_J)])
else:
problem = StationaryProblem(domain, f, diffusion, outputs=[('l2', f * theta_J)])
print('Discretize ...')
mu_bar = problem.parameters.parse([np.pi/2,np.pi/2])
fom, _ = discretize_stationary_cg(problem, diameter=1. / grid_intervals, mu_energy_product=mu_bar)
return fom, mu_bar
def record_results(function, parse, data, mu):
QoI = function(mu)
data['num_evals'] += 1
data['evaluation_points'].append(parse(mu).to_numpy())
data['evaluations'].append(QoI[0])
print('.', end='')
return QoI
def report(result, parse, data, reference_mu):
if (result.status != 0):
print('\n failed!')
else:
print('\n succeeded!')
print(' mu_min: {}'.format(parse(result.x)))
print(' J(mu_min): {}'.format(result.fun[0]))
print(' absolute error w.r.t. reference solution: {:.2e}'.format(np.linalg.norm(result.x-reference_mu)))
print(' num iterations: {}'.format(result.nit))
print(' num function calls: {}'.format(data['num_evals']))
print(' time: {:.5f} seconds'.format(data['time']))
if 'offline_time' in data:
print(' offline time: {:.5f} seconds'.format(data['offline_time']))
print('')
if __name__ == '__main__':
run(main)
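For reference, the demo can also be run directly from Python instead of the command line; a hypothetical invocation matching the arguments used in the demo tests below would be:

from pymordemos.linear_optimization import main
main(grid_intervals=40, training_samples=20)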
......@@ -44,6 +44,7 @@ DISCRETIZATION_ARGS = (
('burgers', ['--num-flux=lax_friedrichs', '0.1']),
('burgers', ['--num-flux=engquist_osher', '0.1']),
('burgers', ['--num-flux=simplified_engquist_osher', '0.1']),
('linear_optimization', [40, 20]),
('parabolic', ['heat', 1]),
('parabolic', ['heat', '--rect', 1]),
('parabolic', ['heat', '--fv', 1]),
......
......@@ -277,3 +277,49 @@ def test_d_mu_of_LincombOperator():
assert hes_nu_mu_1 == [0., 0., 0]
assert hes_nu_mu_2 == [0., 0., 0]
assert hes_nu_nu == [0., 0., -0]
def test_output_d_mu():
from pymordemos.linear_optimization import create_fom
grid_intervals = 10
training_samples = 3
fom, mu_bar = create_fom(grid_intervals, vector_valued_output=True)
easy_fom, _ = create_fom(grid_intervals, vector_valued_output=False)
parameter_space = fom.parameters.space(0, np.pi)
training_set = parameter_space.sample_uniformly(training_samples)
# verify that the adjoint and sensitivity gradients are the same and that solve_d_mu works
for mu in training_set:
gradient_with_adjoint_approach = fom.output_d_mu(mu, return_array=True, use_adjoint=True)
gradient_with_sensitivities = fom.output_d_mu(mu, return_array=True, use_adjoint=False)
assert np.allclose(gradient_with_adjoint_approach, gradient_with_sensitivities)
u_d_mu = fom.solve_d_mu('diffusion', 1, mu=mu).to_numpy()
u_d_mu_ = fom.compute(solution_d_mu=True, mu=mu)['solution_d_mu']['diffusion'][1].to_numpy()
assert np.allclose(u_d_mu, u_d_mu_)
# test the complex case
complex_fom = easy_fom.with_(operator=easy_fom.operator.with_(
operators=[op* (1+2j) for op in easy_fom.operator.operators]))
complex_gradient_adjoint = complex_fom.output_d_mu(mu, return_array=True, use_adjoint=True)
complex_gradient = complex_fom.output_d_mu(mu, return_array=True, use_adjoint=False)
assert np.allclose(complex_gradient_adjoint, complex_gradient)
complex_fom = easy_fom.with_(output_functional=easy_fom.output_functional.with_(
operators=[op* (1+2j) for op in easy_fom.output_functional.operators]))
complex_gradient_adjoint = complex_fom.output_d_mu(mu, return_array=True, use_adjoint=True)
complex_gradient = complex_fom.output_d_mu(mu, return_array=True, use_adjoint=False)
assert np.allclose(complex_gradient_adjoint, complex_gradient)
# another fom to test the 3d case
ops, coefs = fom.operator.operators, fom.operator.coefficients
ops += (fom.operator.operators[1],)
coefs += (ProjectionParameterFunctional('nu', 1, 0),)
fom_ = fom.with_(operator=LincombOperator(ops, coefs))
parameter_space = fom_.parameters.space(0, np.pi)
training_set = parameter_space.sample_uniformly(training_samples)
for mu in training_set:
gradient_with_adjoint_approach = fom_.output_d_mu(mu, return_array=True, use_adjoint=True)
gradient_with_sensitivities = fom_.output_d_mu(mu, return_array=True, use_adjoint=False)
assert np.allclose(gradient_with_adjoint_approach, gradient_with_sensitivities)