Source code for scenic.core.requirements

"""Support for hard and soft requirements."""

import enum
import inspect

from scenic.core.distributions import Samplable, needsSampling
from scenic.core.errors import InvalidScenarioError
from scenic.core.lazy_eval import needsLazyEvaluation
import scenic.syntax.relations as relations

[docs]@enum.unique class RequirementType(enum.Enum): # requirements which must hold during initial sampling require = 'require' requireAlways = 'require always' requireEventually = 'require eventually' # requirements used only during simulation terminateWhen = 'terminate when' terminateSimulationWhen = 'terminate simulation when' # recorded values, which aren't requirements but are handled similarly record = 'record' recordInitial = 'record initial' recordFinal = 'record final' @property def constrainsSampling(self): return self in (self.require, self.requireAlways)
class PendingRequirement: def __init__(self, ty, condition, line, prob, name, ego): self.ty = ty self.condition = condition self.line = line self.prob = prob self.name = name # the translator wrapped the requirement in a lambda to prevent evaluation, # so we need to save the current values of all referenced names; we save # the ego object too since it can be referred to implicitly self.bindings = getAllGlobals(condition) self.egoObject = ego def compile(self, namespace, scenario, syntax=None): """Create a closure testing the requirement in the correct runtime state. While we're at it, determine whether the requirement implies any relations we can use for pruning, and gather all of its dependencies. """ bindings, ego, line = self.bindings, self.egoObject, self.line condition, ty = self.condition, self.ty # Check whether requirement implies any relations used for pruning if ty.constrainsSampling and syntax: relations.inferRelationsFrom(syntax, bindings, ego, line) # Gather dependencies of the requirement deps = set() for value in bindings.values(): if needsSampling(value): deps.add(value) if needsLazyEvaluation(value): raise InvalidScenarioError(f'{ty} on line {line} uses value {value}' ' undefined outside of object definition') if ego is not None: assert isinstance(ego, Samplable) deps.add(ego) # Construct closure def closure(values): # rebind any names referring to sampled objects namespace = condition.__globals__ for name, value in bindings.items(): if value in values: namespace[name] = values[value] # rebind ego object, which can be referred to implicitly boundEgo = None if ego is None else values[ego] # evaluate requirement condition, reporting errors on the correct line import scenic.syntax.veneer as veneer with veneer.executeInRequirement(scenario, boundEgo): result = condition() assert not needsSampling(result) if needsLazyEvaluation(result): raise InvalidScenarioError(f'{ty} on line {line} uses value' ' undefined outside of object definition') return result return CompiledRequirement(self, closure, deps)
[docs]def getAllGlobals(req, restrictTo=None): """Find all names the given lambda depends on, along with their current bindings.""" namespace = req.__globals__ if restrictTo is not None and restrictTo is not namespace: return {} externals = inspect.getclosurevars(req) assert not externals.nonlocals # TODO handle these globs = dict(externals.builtins) for name, value in externals.globals.items(): globs[name] = value if inspect.isfunction(value): subglobs = getAllGlobals(value, restrictTo=namespace) for name, value in subglobs.items(): if name in globs: assert value is globs[name] else: globs[name] = value return globs
class CompiledRequirement: def __init__(self, pendingReq, closure, dependencies): self.ty = pendingReq.ty self.closure = closure self.line = pendingReq.line self.name = pendingReq.name self.prob = pendingReq.prob self.dependencies = dependencies @property def constrainsSampling(self): return self.ty.constrainsSampling def satisfiedBy(self, sample): return self.closure(sample) def __str__(self): if self.name: return self.name else: return f'"{self.ty.value}" on line {self.line}' class BoundRequirement: def __init__(self, compiledReq, sample): self.ty = compiledReq.ty self.closure = compiledReq.closure self.line = compiledReq.line self.name = compiledReq.name assert compiledReq.prob == 1 self.sample = sample def isTrue(self): return self.value() def value(self): return self.closure(self.sample) def __str__(self): if self.name: return self.name else: return f'"{self.ty.value}" on line {self.line}' class DynamicRequirement: def __init__(self, ty, condition, line, name=None): self.ty = ty self.line = line self.name = name import scenic.syntax.veneer as veneer scenario = veneer.currentScenario def closure(): with veneer.executeInScenario(scenario): return condition() self.closure = closure def isTrue(self): return self.value() def value(self): return self.closure() def __str__(self): if self.name: return self.name else: return f'"{self.ty.value}" on line {self.line}'