Source code for scenic.core.dynamics

"""Support for dynamic behaviors and modular scenarios."""

from collections import defaultdict
import enum
import inspect
import itertools
import sys
import types
import warnings

from scenic.core.distributions import Samplable, Options, toDistribution, needsSampling
from scenic.core.errors import RuntimeParseError, InvalidScenarioError
from scenic.core.lazy_eval import DelayedArgument, needsLazyEvaluation
from scenic.core.requirements import (RequirementType, PendingRequirement,
                                      DynamicRequirement)
from scenic.core.simulators import (RejectSimulationException, EndSimulationAction,
                                    EndScenarioAction)
from scenic.core.utils import argsToString, alarm
from scenic.core.workspaces import Workspace

# Utilities

[docs]class StuckBehaviorWarning(UserWarning): """Warning issued when a behavior/scenario may have gotten stuck. When a behavior or compose block of a modular scenario executes for a long time without yielding control, there is no way to tell whether it has entered an infinite loop with no take/wait statements, or is actually doing some long computation. But since forgetting a wait statement in a wait loop is an easy mistake, we raise this warning after a behavior/scenario has run for `stuckBehaviorWarningTimeout` seconds without yielding. """ pass
#: Timeout in seconds after which a `StuckBehaviorWarning` will be raised. stuckBehaviorWarningTimeout = 10 # Scenarios
[docs]class Invocable: """Abstract class with common code for behaviors and modular scenarios. Both of these types of objects can be called like functions, can have guards, and can suspend their own execution to invoke sub-behaviors/scenarios. """ def __init__(self, *args, **kwargs): if veneer.evaluatingGuard: raise RuntimeParseError( 'tried to invoke behavior/scenario from inside guard or interrupt condition') self._args = args self._kwargs = kwargs self._agent = None self._runningIterator = None self._isRunning = False def _start(self): assert not self._isRunning self._isRunning = True self._finalizeArguments() def _step(self): assert self._isRunning def _stop(self, reason=None): assert self._isRunning self._isRunning = False def _finalizeArguments(self): # Evaluate any lazy arguments whose evaluation was deferred until just before # this invocable starts running. args = [] for arg in self._args: if needsLazyEvaluation(arg): assert isinstance(arg, DelayedArgument) args.append(arg.evaluateInner(None)) else: args.append(arg) self._args = tuple(args) for name, arg in self._kwargs.items(): if needsLazyEvaluation(arg): assert isinstance(arg, DelayedArgument) self._kwargs[name] = arg.evaluateInner(None) def _invokeSubBehavior(self, agent, subs, modifier=None, schedule=None): def pickEnabledInvocable(opts): enabled = {} if isinstance(opts, dict): for sub, weight in opts.items(): if sub._isEnabledForAgent(agent): enabled[sub] = weight else: for sub in opts: if sub._isEnabledForAgent(agent): enabled[sub] = 1 if not enabled: raise RejectSimulationException('deadlock in "do choose/shuffle"') if len(enabled) == 1: choice = list(enabled)[0] else: choice = Options(enabled) return choice scheduler = None if schedule == 'choose': if len(subs) == 1 and isinstance(subs[0], dict): subs = subs[0] subs = (pickEnabledInvocable(subs),) elif schedule == 'shuffle': if len(subs) == 1 and isinstance(subs[0], dict): subs = subs[0] else: subs = {item: 1 for item in subs} def scheduler(): while subs: choice = pickEnabledInvocable(subs) subs.pop(choice) yield from self._invokeInner(agent, (choice,)) else: assert schedule is None if not scheduler: def scheduler(): yield from self._invokeInner(agent, subs) if modifier: if modifier.name == 'for': # do X for Y [seconds | steps] timeLimit = modifier.value if not isinstance(timeLimit, (float, int)): raise RuntimeParseError('"do X for Y" with Y not a number') assert modifier.terminator in (None, 'seconds', 'steps') if modifier.terminator != 'steps': timeLimit /= veneer.currentSimulation.timestep startTime = veneer.currentSimulation.currentTime condition = lambda: veneer.currentSimulation.currentTime - startTime >= timeLimit elif modifier.name == 'until': # do X until Y condition = modifier.value else: raise RuntimeError(f'internal parsing error: impossible modifier {modifier}') def body(behavior, agent): yield from scheduler() def handler(behavior, agent): for sub in subs: if sub._isRunning: sub._stop(f'"{modifier.name}" condition met') return BlockConclusion.ABORT yield from runTryInterrupt(self, agent, body, [condition], [handler]) else: yield from scheduler()
[docs] def _invokeInner(self, agent, subs): """Run the given sub-behavior/scenario(s) in parallel. Implemented by subclasses. """ raise NotImplementedError
def _checkAllPreconditions(self): self.checkPreconditions(self._agent, *self._args, **self._kwargs) self.checkInvariants(self._agent, *self._args, **self._kwargs) def _isEnabledForAgent(self, agent): assert not self._isRunning assert self._agent is None try: self._agent = agent # in case `self` is used in a precondition self._checkAllPreconditions() return True except GuardViolation: return False finally: self._agent = None
[docs]class DynamicScenario(Invocable): """Internal class for scenarios which can execute during dynamic simulations. Provides additional information complementing `Scenario`, which originally only supported static scenarios. The two classes should probably eventually be merged. """ def __init_subclass__(cls, *args, **kwargs): veneer.registerDynamicScenarioClass(cls) _requirementSyntax = None # overridden by subclasses _simulatorFactory = None _globalParameters = None _locals = () def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._ego = None self._workspace = None self._objects = [] # ordered for reproducibility self._externalParameters = [] self._pendingRequirements = defaultdict(list) self._requirements = [] self._requirementDeps = set() # things needing to be sampled to evaluate the requirements self._agents = [] self._monitors = [] self._behaviors = [] self._alwaysRequirements = [] self._eventuallyRequirements = [] self._terminationConditions = [] self._terminateSimulationConditions = [] self._recordedExprs = [] self._recordedInitialExprs = [] self._recordedFinalExprs = [] self._subScenarios = [] self._endWithBehaviors = False self._timeLimit = None self._timeLimitIsInSeconds = False self._prepared = False self._delayingPreconditionCheck = False self._dummyNamespace = None self._timeLimitInSteps = None # computed at simulation time self._elapsedTime = 0 self._eventuallySatisfied = None self._overrides = {} @classmethod def _dummy(cls, filename, namespace): scenario = cls() scenario._setup = None scenario._compose = None scenario._filename = filename # for debugging scenario._prepared = True scenario._dummyNamespace = namespace return scenario
[docs] @classmethod def _requiresArguments(cls): """Whether this scenario cannot be instantiated without arguments.""" if cls._setup: func = cls._setup elif cls._compose: func = cls._compose else: return True sig = inspect.signature(func) try: sig.bind(None, None) # first two arguments are added internally by Scenic return False except TypeError: return True
@property def ego(self): if self._ego is None: return DelayedArgument((), lambda context: self._ego, _internal=True) return self._ego @property def objects(self): return tuple(self._objects)
[docs] def _bindTo(self, scene): """Bind this scenario to a sampled scene when starting a new simulation.""" self._ego = scene.egoObject self._workspace = scene.workspace self._objects = list(scene.objects) self._agents = [obj for obj in scene.objects if obj.behavior is not None] self._alwaysRequirements = scene.alwaysRequirements self._eventuallyRequirements = scene.eventuallyRequirements self._terminationConditions = scene.terminationConditions self._terminateSimulationConditions = scene.terminateSimulationConditions self._recordedExprs = scene.recordedExprs self._recordedInitialExprs = scene.recordedInitialExprs self._recordedFinalExprs = scene.recordedFinalExprs
[docs] def _prepare(self, delayPreconditionCheck=False): """Prepare the scenario for execution, executing its setup block.""" assert not self._prepared self._prepared = True self._finalizeArguments() # TODO generalize _prepare for Invocable? veneer.prepareScenario(self) with veneer.executeInScenario(self, inheritEgo=True): # Check preconditions and invariants if delayPreconditionCheck: self._delayingPreconditionCheck = True else: self._checkAllPreconditions() # Execute setup block if self._setup is not None: assert not any(needsLazyEvaluation(arg) for arg in self._args) assert not any(needsLazyEvaluation(arg) for arg in self._kwargs.values()) self._setup(None, *self._args, **self._kwargs) veneer.finishScenarioSetup(self) # Extract requirements, scan for relations used for pruning, and create closures self._compileRequirements()
@classmethod def _bindGlobals(cls, globs): cls._globalParameters = globs
[docs] def _start(self): """Start the scenario, starting its compose block, behaviors, and monitors.""" super()._start() assert self._prepared # Check preconditions if they could not be checked earlier if self._delayingPreconditionCheck: self._checkAllPreconditions() # Compute time limit now that we know the simulation timestep self._elapsedTime = 0 self._timeLimitInSteps = self._timeLimit if self._timeLimitIsInSeconds: self._timeLimitInSteps /= veneer.currentSimulation.timestep # Keep track of which 'require eventually' conditions have been satisfied self._eventuallySatisfied = { req: False for req in self._eventuallyRequirements } veneer.startScenario(self) with veneer.executeInScenario(self): # Start compose block if self._compose is not None: if not inspect.isgeneratorfunction(self._compose): from scenic.syntax.translator import composeBlock raise RuntimeParseError(f'"{composeBlock}" does not invoke any scenarios') self._runningIterator = self._compose(None, *self._args, **self._kwargs) # Initialize behavior coroutines of agents for agent in self._agents: assert isinstance(agent.behavior, Behavior), agent.behavior agent.behavior._start(agent) # Initialize monitor coroutines for monitor in self._monitors: monitor._start()
[docs] def _step(self): """Execute the (already-started) scenario for one time step. Returns: `None` if the scenario will continue executing; otherwise a string describing why it has terminated. """ super()._step() # Check 'require always' and 'require eventually' conditions for req in self._alwaysRequirements: if not req.isTrue(): # always requirements should never be violated at time 0, since # they are enforced during scene sampling assert veneer.currentSimulation.currentTime > 0 raise RejectSimulationException(str(req)) for req in self._eventuallyRequirements: if not self._eventuallySatisfied[req] and req.isTrue(): self._eventuallySatisfied[req] = True # Check if we have reached the time limit, if any if self._timeLimitInSteps is not None and self._elapsedTime >= self._timeLimitInSteps: return self._stop('reached time limit') self._elapsedTime += 1 # Execute compose block, if any composeDone = False if self._runningIterator is None: composeDone = True # compose block ended in an earlier step else: def alarmHandler(signum, frame): if sys.gettrace(): return # skip the warning if we're in the debugger warnings.warn(f'the compose block of scenario {self} is taking a long time; ' 'maybe you have an infinite loop with no "wait" statement?', StuckBehaviorWarning) with veneer.executeInScenario(self), alarm(stuckBehaviorWarningTimeout, alarmHandler): try: result = self._runningIterator.send(None) if isinstance(result, (EndSimulationAction, EndScenarioAction)): return self._stop(result) except StopIteration: self._runningIterator = None composeDone = True # If there is a compose block and it has finished, we're done if self._compose is not None and composeDone: return self._stop('finished compose block') # Optionally end when all our agents' behaviors have ended if self._endWithBehaviors: if all(agent.behavior._isFinished for agent in self._agents): return self._stop('all behaviors finished') # Check if any termination conditions apply for req in self._terminationConditions: if req.isTrue(): return self._stop(req) # Scenario will not terminate yet return None
[docs] def _stop(self, reason, quiet=False): """Stop the scenario's execution, for the given reason.""" if not quiet: # Reject if we never satisfied a 'require eventually' for req in self._eventuallyRequirements: if not self._eventuallySatisfied[req] and not req.isTrue(): raise RejectSimulationException(str(req)) super()._stop(reason) veneer.endScenario(self, reason, quiet=quiet) for obj, oldVals in self._overrides.items(): obj._revert(oldVals) self._runningIterator = None return reason
def _invokeInner(self, agent, subs): for sub in subs: if not isinstance(sub, DynamicScenario): raise RuntimeParseError(f'expected a scenario, got {sub}') sub._prepare() sub._start() self._subScenarios = list(subs) while True: newSubs = [] for sub in self._subScenarios: terminationReason = sub._step() if isinstance(terminationReason, EndSimulationAction): yield terminationReason elif terminationReason is None: newSubs.append(sub) self._subScenarios = newSubs if not newSubs: return yield None def _evaluateRecordedExprs(self, ty): if ty is RequirementType.record: place = '_recordedExprs' elif ty is RequirementType.recordInitial: place = '_recordedInitialExprs' elif ty is RequirementType.recordFinal: place = '_recordedFinalExprs' else: assert False, 'invalid record type requested' return self._evaluateRecordedExprsAt(place) def _evaluateRecordedExprsAt(self, place): values = {} for rec in getattr(self, place): values[rec.name] = rec.value() for sub in self._subScenarios: subvals = sub._evaluateRecordedExprsAt(place) values.update(subvals) return values def _runMonitors(self): terminationReason = None for monitor in self._monitors: action = monitor._step() if isinstance(action, EndSimulationAction): terminationReason = action # do not exit early, since subsequent monitors could reject the simulation for sub in self._subScenarios: subreason = sub._runMonitors() if subreason is not None: terminationReason = subreason return terminationReason def _checkSimulationTerminationConditions(self): for req in self._terminateSimulationConditions: if req.isTrue(): return req return None @property def _allAgents(self): agents = list(self._agents) for sub in self._subScenarios: agents.extend(sub._allAgents) return agents def _inherit(self, other): if not self._workspace: self._workspace = other._workspace self._objects.extend(other._objects) self._agents.extend(other._agents) self._globalParameters.update(other._globalParameters) self._externalParameters.extend(other._externalParameters) self._requirements.extend(other._requirements) self._behaviors.extend(other._behaviors) def _registerObject(self, obj): self._objects.append(obj) if getattr(obj, 'behavior', None) is not None: self._agents.append(obj)
[docs] def _addRequirement(self, ty, reqID, req, line, name, prob): """Save a requirement defined at compile-time for later processing.""" assert reqID not in self._pendingRequirements preq = PendingRequirement(ty, req, line, prob, name, self._ego) self._pendingRequirements[reqID] = preq
[docs] def _addDynamicRequirement(self, ty, req, line, name): """Add a requirement defined during a dynamic simulation.""" assert ty is not RequirementType.require dreq = DynamicRequirement(ty, req, line, name) self._registerCompiledRequirement(dreq)
def _compileRequirements(self): namespace = self._dummyNamespace if self._dummyNamespace else self.__dict__ requirementSyntax = self._requirementSyntax assert requirementSyntax is not None for reqID, requirement in self._pendingRequirements.items(): syntax = requirementSyntax[reqID] if requirementSyntax else None compiledReq = requirement.compile(namespace, self, syntax) self._registerCompiledRequirement(compiledReq) self._requirementDeps.update(compiledReq.dependencies) def _registerCompiledRequirement(self, req): if req.ty is RequirementType.require: place = self._requirements elif req.ty is RequirementType.requireAlways: place = self._alwaysRequirements elif req.ty is RequirementType.requireEventually: place = self._eventuallyRequirements elif req.ty is RequirementType.terminateWhen: place = self._terminationConditions elif req.ty is RequirementType.terminateSimulationWhen: place = self._terminateSimulationConditions elif req.ty is RequirementType.record: place = self._recordedExprs elif req.ty is RequirementType.recordInitial: place = self._recordedInitialExprs elif req.ty is RequirementType.recordFinal: place = self._recordedFinalExprs else: raise RuntimeError(f'internal error: requirement {req} has unknown type!') place.append(req) def _setTimeLimit(self, timeLimit, inSeconds=True): self._timeLimit = timeLimit self._timeLimitIsInSeconds = inSeconds def _override(self, obj, specifiers): oldVals = obj._override(specifiers) if obj not in self._overrides: self._overrides[obj] = oldVals def _toScenario(self, namespace): assert self._prepared if self._ego is None and self._compose is None: msg = 'did not specify ego object' modScenarios = namespace['_scenarios'] if self._dummyNamespace and len(modScenarios) == 1: if modScenarios[0]._requiresArguments(): msg += ('\n(Note: this Scenic file contains a modular scenario, but it\n' 'cannot be used as the top-level scenario since it requires\n' 'arguments; so the whole file is being used as a top-level\n' 'scenario and needs an ego object.)') else: msg += ('\n(Note: this Scenic file contains a modular scenario, but also\n' 'other code; so the whole file is being used as a top-level\n' 'scenario and needs an ego object.)') raise InvalidScenarioError(msg) if not self._workspace: self._workspace = Workspace() # default empty workspace from scenic.core.scenarios import Scenario scenario = Scenario(self._workspace, self._simulatorFactory, self._objects, self._ego, self._globalParameters, self._externalParameters, self._requirements, self._requirementDeps, self._monitors, self._behaviorNamespaces, self) # TODO unify these! return scenario def __getattr__(self, name): if name in self._locals: return DelayedArgument((), lambda context: getattr(self, name), _internal=True) return object.__getattribute__(self, name) def __str__(self): if self._dummyNamespace: return 'top-level scenario' else: args = argsToString(self._args, self._kwargs) return f'{self.__class__.__name__}({args})'
# Behaviors
[docs]class Behavior(Invocable, Samplable): """Dynamic behaviors of agents. Behavior statements are translated into definitions of subclasses of this class. """ def __init_subclass__(cls): if cls.__module__ is not __name__ and veneer.currentScenario: veneer.currentScenario._behaviors.append(cls) def __init__(self, *args, **kwargs): args = tuple(toDistribution(arg) for arg in args) kwargs = { name: toDistribution(arg) for name, arg in kwargs.items() } # Validate arguments to the behavior sig = inspect.signature(self.makeGenerator) try: sig.bind(None, *args, **kwargs) except TypeError as e: raise RuntimeParseError(str(e)) from e Samplable.__init__(self, itertools.chain(args, kwargs.values())) Invocable.__init__(self, *args, **kwargs) if not inspect.isgeneratorfunction(self.makeGenerator): raise RuntimeParseError(f'{self} does not take any actions' ' (perhaps you forgot to use "take" or "do"?)') def sampleGiven(self, value): args = (value[arg] for arg in self._args) kwargs = { name: value[val] for name, val in self._kwargs.items() } return type(self)(*args, **kwargs) def _start(self, agent): super()._start() self._agent = agent self._runningIterator = self.makeGenerator(agent, *self._args, **self._kwargs) self._checkAllPreconditions() def _step(self): super()._step() assert self._runningIterator def alarmHandler(signum, frame): if sys.gettrace(): return # skip the warning if we're in the debugger warnings.warn(f'the behavior {self} is taking a long time to take an action; ' 'maybe you have an infinite loop with no take/wait statements?', StuckBehaviorWarning) with veneer.executeInBehavior(self), alarm(stuckBehaviorWarningTimeout, alarmHandler): try: actions = self._runningIterator.send(None) except StopIteration: actions = () # behavior ended early return actions def _stop(self, reason=None): super()._stop(reason) self._agent = None self._runningIterator = None @property def _isFinished(self): return self._runningIterator is None def _invokeInner(self, agent, subs): assert len(subs) == 1 sub = subs[0] if not isinstance(sub, Behavior): raise RuntimeParseError(f'expected a behavior, got {sub}') sub._start(agent) with veneer.executeInBehavior(sub): try: yield from sub._runningIterator finally: if sub._isRunning: sub._stop() def __repr__(self): items = itertools.chain( (repr(arg) for arg in self._args), (f'{key}={repr(val)}' for key, val in self._kwargs.items()) ) allArgs = ', '.join(items) return f'{self.__class__.__name__}({allArgs})'
def makeTerminationAction(line): assert not veneer.isActive() return EndSimulationAction(line) # Monitors
[docs]class Monitor(Behavior): """Monitors for dynamic simulations. Monitor statements are translated into definitions of subclasses of this class. """ def __init_subclass__(cls): super().__init_subclass__() veneer.currentScenario._monitors.append(cls()) def _start(self): return super()._start(None)
monitorPrefix = '_Scenic_monitor_' def functionForMonitor(name): return monitorPrefix + name def isAMonitorName(name): return name.startswith(monitorPrefix) def monitorName(name): return name[len(monitorPrefix):] # Guards
[docs]class GuardViolation(Exception): """Abstract exception raised when a guard of a behavior is violated. This will never be raised directly; either of the subclasses `PreconditionViolation` or `InvariantViolation` will be used, as appropriate. """ violationType = 'guard' def __init__(self, behavior, lineno): self.behaviorName = behavior.__class__.__name__ self.lineno = lineno def __str__(self): return f'violated {self.violationType} of {self.behaviorName} on line {self.lineno}'
[docs]class PreconditionViolation(GuardViolation): """Raised when a precondition is violated when invoking a behavior.""" violationType = 'precondition'
[docs]class InvariantViolation(GuardViolation): """Raised when an invariant is violated when invoking/resuming a behavior.""" violationType = 'invariant'
# Try-interrupt blocks def runTryInterrupt(behavior, agent, body, conditions, handlers): body = InterruptBlock(None, body) interrupts = [InterruptBlock(c, h) for c, h in zip(conditions, handlers)] while True: # find next block to run, if any block = body for interrupt in interrupts: if interrupt.isEnabled or interrupt.isRunning: block = interrupt break result, concluded = block.step(behavior, agent) if concluded: if result is BlockConclusion.FINISHED and block is not body: continue # interrupt handler finished else: return result # entire try-interrupt statement will terminate else: yield result behavior.checkInvariants(None, *behavior._args, **behavior._kwargs)
[docs]@enum.unique class BlockConclusion(enum.Enum): FINISHED = enum.auto() ABORT = enum.auto() RETURN = enum.auto() BREAK = enum.auto() CONTINUE = enum.auto() def __call__(self, value): self.return_value = value return self
class InterruptBlock: def __init__(self, condition, body): self.condition = condition self.body = body self.runningIterator = None @property def isEnabled(self): with veneer.executeInGuard(): result = self.condition() if isinstance(result, DelayedArgument): # Condition cannot yet be evaluated because it depends on a scenario # local not yet initialized; we consider it to be false. return False return bool(result) @property def isRunning(self): return self.runningIterator is not None def step(self, behavior, agent): if not self.runningIterator: it = self.body(behavior, agent) if not isinstance(it, types.GeneratorType): return (it, True) self.runningIterator = it try: result = self.runningIterator.send(None) return (result, False) except StopIteration as e: self.runningIterator = None return (e.value, True) import scenic.syntax.veneer as veneer