-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
797 additions
and
660 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from .base import (BaseRobustTransformation, GeneratorTransformation, | ||
NominalTransformation, UnknownTransformation) | ||
from .ellipsoidal import EllipsoidalTransformation | ||
from .polyhedral import PolyhedralTransformation | ||
from .gp import GPTransformation | ||
from .warpedgp import WGPTransformation |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
from pyomo.environ import (Constraint, | ||
Var, | ||
Objective, | ||
Block, | ||
maximize, | ||
minimize) | ||
from pyomo.core import Transformation, TransformationFactory | ||
from pyomo.repn import generate_standard_repn | ||
from romodel.visitor import _expression_is_uncertain | ||
from romodel.generator import RobustConstraint | ||
from itertools import chain | ||
from romodel.util import collect_uncparam | ||
from pyomo.core.expr.visitor import replace_expressions | ||
|
||
|
||
class BaseRobustTransformation(Transformation): | ||
def __init__(self): | ||
self._fixed_unc_params = [] | ||
self._fixed_components = {} | ||
|
||
def fix_component(self, instance, component=Var): | ||
fixed = [] | ||
for c in instance.component_data_objects(component, active=True): | ||
if not c.is_fixed(): | ||
c.fix() | ||
fixed.append(c) | ||
self._fixed_components[component] = fixed | ||
|
||
def unfix_component(self, component=Var): | ||
for c in self._fixed_components[component]: | ||
c.unfix() | ||
self._fixed_components[component] = None | ||
|
||
def get_uncertain_components(self, instance, component=Constraint): | ||
""" Return all uncertain components of type `component`. """ | ||
comp_list = instance.component_data_objects(component, active=True) | ||
for c in comp_list: | ||
if hasattr(c, 'body'): | ||
# Constraints | ||
expr = c.body | ||
else: | ||
# Objective | ||
expr = c.expr | ||
|
||
if _expression_is_uncertain(expr): | ||
yield c | ||
|
||
def generate_repn_param(self, cdata): | ||
# Get instance | ||
instance = self._instance | ||
# Fix variables | ||
self.fix_component(instance, component=Var) | ||
if hasattr(cdata, 'body'): | ||
expr = cdata.body | ||
else: | ||
expr = cdata.expr | ||
repn = generate_standard_repn(expr, compute_values=False) | ||
self.unfix_component(component=Var) | ||
return repn | ||
|
||
def _apply_to(self, instance, **kwargs): | ||
self._instance = instance | ||
for c in chain(self.get_uncertain_components(instance), | ||
self.get_uncertain_components(instance, | ||
component=Objective)): | ||
# Collect unc. parameter and unc. set | ||
param = collect_uncparam(c) | ||
uncset = param._uncset | ||
assert uncset is not None, ("No uncertainty set provided for " | ||
"uncertain parameter {}." | ||
.format(param.name)) | ||
|
||
# Check if uncertainty set is empty | ||
assert not uncset.is_empty(), ("{} does not have any " | ||
"constraints.".format(uncset.name)) | ||
# Check if reformulation is applicable to constraint & UncSet | ||
if self._check_applicability(uncset): | ||
# Check constraint | ||
if c.ctype is Constraint: | ||
self._check_constraint(c) | ||
else: | ||
self._check_objective(c) | ||
|
||
counterpart = Block() | ||
setattr(instance, c.name + '_counterpart', counterpart) | ||
self._reformulate(c, param, uncset, counterpart, **kwargs) | ||
|
||
c.deactivate() | ||
|
||
def _reformulate(self, c, param, uncset, counterpart, **kwargs): | ||
raise NotImplementedError | ||
|
||
def _check_applicability(self, c): | ||
raise NotImplementedError | ||
|
||
def _check_constraint(self, c): | ||
return True | ||
|
||
def _check_objective(self, c): | ||
return True | ||
|
||
|
||
@TransformationFactory.register('romodel.generators', | ||
doc=("Replace uncertain constraints by" | ||
" cutting plane generators")) | ||
class GeneratorTransformation(BaseRobustTransformation): | ||
""" Replace all uncertain constraints by RobustConstraint objects. """ | ||
def _apply_to(self, instance): | ||
self._instance = instance | ||
cons = self.get_uncertain_components(instance) | ||
objs = self.get_uncertain_components(instance, component=Objective) | ||
|
||
tdata = instance._transformation_data['romodel.generators'] | ||
tdata.generators = [] | ||
|
||
for c in cons: | ||
generator = RobustConstraint() | ||
setattr(instance, c.name + '_generator', generator) | ||
|
||
generator.build(c.lower, c.body, c.upper) | ||
tdata.generators.append(generator) | ||
|
||
c.deactivate() | ||
|
||
for o in objs: | ||
generator = RobustConstraint() | ||
setattr(instance, o.name + '_epigraph', Var()) | ||
epigraph = getattr(instance, o.name + '_epigraph') | ||
setattr(instance, o.name + '_generator', generator) | ||
|
||
if o.is_minimizing(): | ||
generator.build(None, o.expr - epigraph, 0) | ||
setattr(instance, | ||
o.name + '_new', | ||
Objective(expr=epigraph, sense=minimize)) | ||
else: | ||
generator.build(0, o.expr - epigraph, None) | ||
setattr(instance, | ||
o.name + '_new', | ||
Objective(expr=epigraph, sense=maximize)) | ||
|
||
tdata.generators.append(generator) | ||
|
||
o.deactivate() | ||
|
||
def get_generator(self, c): | ||
pass | ||
|
||
def get_uncset(self, c): | ||
pass | ||
|
||
|
||
@TransformationFactory.register('romodel.nominal', | ||
doc="Transform robust to nominal model.") | ||
class NominalTransformation(BaseRobustTransformation): | ||
def _apply_to(self, instance): | ||
cons = self.get_uncertain_components(instance) | ||
objs = self.get_uncertain_components(instance, component=Objective) | ||
|
||
smap = {} | ||
|
||
for c in cons: | ||
param = collect_uncparam(c) | ||
for i in param: | ||
smap[id(param[i])] = param[i].nominal | ||
body_nominal = replace_expressions(c.body, smap) | ||
c.set_value((c.lower, body_nominal, c.upper)) | ||
|
||
for o in objs: | ||
param = collect_uncparam(o) | ||
for i in param: | ||
smap[id(param[i])] = param[i].nominal | ||
expr_nominal = replace_expressions(o.expr, smap) | ||
o.expr = expr_nominal | ||
|
||
|
||
@TransformationFactory.register('romodel.unknown', | ||
doc="Check for unknown uncertainty sets.") | ||
class UnknownTransformation(BaseRobustTransformation): | ||
def _apply_to(self, instance): | ||
for c in chain(self.get_uncertain_components(instance), | ||
self.get_uncertain_components(instance, | ||
component=Objective)): | ||
# Collect UncParam and UncSet | ||
param = collect_uncparam(c) | ||
uncset = param.uncset | ||
raise RuntimeError("Cannot reformulate UncSet with unknown " | ||
"geometry: {}".format(uncset.name)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
from pyomo.environ import Constraint, Var, quicksum, sqrt, Objective, Block | ||
from romodel.reformulate import BaseRobustTransformation | ||
from pyomo.core import TransformationFactory | ||
from pyomo.repn import generate_standard_repn | ||
from romodel.uncset import UncSet, EllipsoidalSet | ||
import numpy as np | ||
|
||
|
||
@TransformationFactory.register('romodel.ellipsoidal', | ||
doc="Ellipsoidal Counterpart") | ||
class EllipsoidalTransformation(BaseRobustTransformation): | ||
def _check_applicability(self, uncset): | ||
""" | ||
Returns `True` if the reformulation is applicable to `uncset` | ||
uncset: UncSet | ||
""" | ||
# Check for library set | ||
if uncset.__class__ == EllipsoidalSet: | ||
return True | ||
# Check generic set | ||
elif uncset.__class__ == UncSet: | ||
first_constraint = True | ||
is_ellipsoidal = False | ||
for c in uncset.component_data_objects(Constraint, active=True): | ||
# make sure set has only one constraint | ||
if first_constraint: | ||
first_constraint = False | ||
else: | ||
return False | ||
# Check if constraint is ellipsoidal | ||
repn = generate_standard_repn(c.body) | ||
if not repn.is_quadratic(): | ||
return False | ||
# TODO: assumes implicitly that there is one UncParam per UncSet | ||
param = repn.quadratic_vars[0][0].parent_component() | ||
# Collect covariance matrix and mean | ||
quadratic_coefs = {(id(x[0]), id(x[1])): c for x, c in | ||
zip(repn.quadratic_vars, repn.quadratic_coefs)} | ||
invcov = [[quadratic_coefs.get((id(param[i]), id(param[j])), 0) | ||
for i in param] for j in param] | ||
invcov = np.array(invcov) | ||
invcov = 1/2*(invcov + invcov.T) | ||
eig, _ = np.linalg.eig(invcov) | ||
cov = np.linalg.inv(invcov) | ||
mean = -1/2*np.matmul(cov, np.array(repn.linear_coefs)) | ||
uncset.mean = {x: mean[i] for i, x in enumerate(param)} | ||
uncset.cov = cov.tolist() | ||
uncset.invcov = invcov | ||
# TODO: need to check repn.constant == mean^T * cov * mean? | ||
|
||
is_ellipsoidal = ((c.has_ub() and np.all(eig > 0)) | ||
or (c.has_lb() and np.all(eig < 0))) | ||
|
||
return is_ellipsoidal | ||
|
||
def _check_constraint(self, c): | ||
""" | ||
Raise an error if the constraint is inappropriate for this | ||
reformulation | ||
c: Constraint | ||
""" | ||
assert not c.equality, ( | ||
"Currently can't handle equality constraints yet.") | ||
|
||
def _check_objective(self, o): | ||
""" | ||
Raise an error if the objective is inappropriate for this | ||
reformulation | ||
c: Objective | ||
""" | ||
pass | ||
|
||
def _reformulate(self, c, param, uncset, counterpart, root=False): | ||
""" | ||
Reformulate an uncertain constraint or objective | ||
c: Constraint or Objective | ||
param: UncParam | ||
uncset: UncSet | ||
counterpart: Block | ||
""" | ||
|
||
# Check constraint/objective | ||
repn = self.generate_repn_param(c) | ||
assert repn.is_linear(), ( | ||
"Constraint {} should be linear in " | ||
"unc. parameters".format(c.name)) | ||
|
||
# Generate robust counterpart | ||
det = quicksum(x[0]*x[1].nominal for x in zip(repn.linear_coefs, | ||
repn.linear_vars)) | ||
det += repn.constant | ||
param_var_dict = {id(param): var | ||
for param, var | ||
in zip(repn.linear_vars, repn.linear_coefs)} | ||
# padding = sqrt( var^T * cov^-1 * var ) | ||
padding = quicksum(param_var_dict[id(param[ind_i])] | ||
* uncset.cov[i][j] | ||
* param_var_dict[id(param[ind_j])] | ||
for i, ind_i in enumerate(param) | ||
for j, ind_j in enumerate(param)) | ||
if c.ctype is Constraint: | ||
# For upper bound: det + padding <= b | ||
if c.has_ub(): | ||
counterpart.upper = Block() | ||
if root: | ||
expr = det + sqrt(padding) <= c.upper | ||
robust = Constraint(expr=expr) | ||
else: | ||
counterpart.upper.padding = Var(bounds=(0, float('inf'))) | ||
pvar = counterpart.upper.padding | ||
robust = Constraint(expr=det + pvar <= c.upper()) | ||
deterministic = Constraint(expr=padding <= pvar**2) | ||
counterpart.upper.det = deterministic | ||
counterpart.upper.rob = robust | ||
# For lower bound: det - padding >= b | ||
if c.has_lb(): | ||
counterpart.lower = Block() | ||
if root: | ||
expr = det - sqrt(padding) >= c.lower | ||
robust = Constraint(expr=expr) | ||
else: | ||
counterpart.lower.padding = Var(bounds=(0, float('inf'))) | ||
pvar = counterpart.lower.padding | ||
robust = Constraint(expr=c.lower() <= det - pvar) | ||
deterministic = Constraint(expr=padding <= pvar**2) | ||
counterpart.lower.det = deterministic | ||
counterpart.lower.rob = robust | ||
else: | ||
# For minimization: min det + padding | ||
# For maximization: max det - padding | ||
sense = c.sense | ||
if root: | ||
expr = det + c.sense*sqrt(padding) | ||
robust = Objective(expr=expr, sense=sense) | ||
else: | ||
counterpart.padding = Var(bounds=(0, float('inf'))) | ||
pvar = counterpart.padding | ||
robust = Objective(expr=det + sense*pvar, sense=sense) | ||
deterministic = Constraint(expr=padding <= pvar**2) | ||
counterpart.det = deterministic | ||
counterpart.rob = robust | ||
|
Oops, something went wrong.