Source code for scenic.core.scenarios

"""Scenario and scene objects."""

import random
import time
import sys

from scenic.core.distributions import (Samplable, ConstantSamplable, RejectionException,
                                       needsSampling)
from scenic.core.lazy_eval import needsLazyEvaluation
from scenic.core.external_params import ExternalSampler
from scenic.core.regions import EmptyRegion
from scenic.core.vectors import Vector
from scenic.core.errors import InvalidScenarioError, optionallyDebugRejection
from scenic.core.dynamics import Behavior
from scenic.core.requirements import BoundRequirement
from scenic.core.serialization import dumpAsScenicCode

# Pickling support

class _ScenarioPickleMixin:
	def __getstate__(self):
		# Start the pickle with an object storing our global parameters and activating
		# the veneer with them; this will ensure they are available during import of
		# any needed Scenic modules (which might have been purged earlier, and in any
		# case won't already exist during unpickling). Similarly, tack a dummy object
		# on the end of the pickle which will deactivate the veneer and clean up.
		oldModules = []
		return (_Activator(self.params, oldModules), self.__dict__, _Deactivator(oldModules))

	def __setstate__(self, state):
		self.__dict__.update(state[1])

class _Activator:
	def __init__(self, params, oldModules):
		self.params = params
		# Save all modules already imported prior to pickling
		oldModules.extend(sys.modules.keys())

	def activate(self):
		import scenic.syntax.veneer as veneer
		assert not veneer.isActive()
		veneer.activate(paramOverrides=self.params)

	def __getstate__(self):
		# Step 1 (during pickling)
		self.activate()
		return self.__dict__

	def __setstate__(self, state):
		# Step 3 (during unpickling)
		self.__dict__.update(state)
		self.activate()

class _Deactivator:
	def __init__(self, oldModules):
		self.oldModules = oldModules

	def deactivate(self):
		import scenic.syntax.veneer as veneer
		veneer.deactivate()
		assert not veneer.isActive(), 'nested pickle of Scene/Scenario'
		# Purge Scenic modules imported during pickling
		from scenic.syntax.translator import purgeModulesUnsafeToCache
		purgeModulesUnsafeToCache(self.oldModules)

	def __getstate__(self):
		# Step 2 (during pickling)
		self.deactivate()
		return self.__dict__

	def __setstate__(self, state):
		# Step 4 (during unpickling)
		self.__dict__.update(state)
		self.deactivate()

# Scenes and scenarios

[docs]class Scene(_ScenarioPickleMixin): """Scene() A scene generated from a Scenic scenario. To run a dynamic simulation from a scene, create an instance of `Simulator` for the simulator you want to use, and pass the scene to its `simulate` method. Attributes: objects (tuple of :obj:`~scenic.core.object_types.Object`): All objects in the scene. The ``ego`` object is first. egoObject (:obj:`~scenic.core.object_types.Object`): The ``ego`` object. params (dict): Dictionary mapping the name of each global parameter to its value. workspace (:obj:`~scenic.core.workspaces.Workspace`): Workspace for the scenario. """ def __init__(self, workspace, objects, egoObject, params, alwaysReqs=(), eventuallyReqs=(), terminationConds=(), termSimulationConds=(), recordedExprs=(), recordedInitialExprs=(), recordedFinalExprs=(), monitors=(), behaviorNamespaces={}, dynamicScenario=None): self.workspace = workspace self.objects = tuple(objects) self.egoObject = egoObject self.params = params self.alwaysRequirements = tuple(alwaysReqs) self.eventuallyRequirements = tuple(eventuallyReqs) self.terminationConditions = tuple(terminationConds) self.terminateSimulationConditions = tuple(termSimulationConds) self.recordedExprs = tuple(recordedExprs) self.recordedInitialExprs = tuple(recordedInitialExprs) self.recordedFinalExprs = tuple(recordedFinalExprs) self.monitors = tuple(monitors) self.behaviorNamespaces = behaviorNamespaces self.dynamicScenario = dynamicScenario
[docs] def dumpAsScenicCode(self, stream=sys.stdout): """Dump Scenic code reproducing this scene to the given stream. .. note:: This function does not currently reproduce parts of the original Scenic program defining behaviors, functions, etc. used in the scene. Also, if the scene involves any user-defined types, they must provide a suitable :obj:`~object.__repr__` for this function to print them properly. Args: stream (:term:`text file`): Where to print the code (default `sys.stdout`). """ for name, value in self.params.items(): stream.write(f'param "{name}" = ') dumpAsScenicCode(value, stream) stream.write('\n') stream.write('ego = ') for obj in self.objects: dumpAsScenicCode(obj, stream) stream.write('\n')
[docs] def show(self, zoom=None, block=True): """Render a schematic of the scene for debugging.""" import matplotlib.pyplot as plt plt.gca().set_aspect('equal') # display map self.workspace.show(plt) # draw objects for obj in self.objects: obj.show(self.workspace, plt, highlight=(obj is self.egoObject)) # zoom in if requested if zoom: self.workspace.zoomAround(plt, self.objects, expansion=zoom) plt.show(block=block)
[docs]class Scenario(_ScenarioPickleMixin): """Scenario() A compiled Scenic scenario, from which scenes can be sampled. """ def __init__(self, workspace, simulator, objects, egoObject, params, externalParams, requirements, requirementDeps, monitors, behaviorNamespaces, dynamicScenario): self.workspace = workspace self.simulator = simulator # simulator for dynamic scenarios # make ego the first object, while otherwise preserving order ordered = [] for obj in objects: if obj is not egoObject: ordered.append(obj) self.objects = (egoObject,) + tuple(ordered) if egoObject else tuple(ordered) self.egoObject = egoObject self.params = dict(params) self.externalParams = tuple(externalParams) self.externalSampler = ExternalSampler.forParameters(self.externalParams, self.params) self.monitors = tuple(monitors) self.behaviorNamespaces = behaviorNamespaces self.dynamicScenario = dynamicScenario staticReqs, alwaysReqs, terminationConds = [], [], [] self.requirements = tuple(dynamicScenario._requirements) # TODO clean up self.alwaysRequirements = tuple(dynamicScenario._alwaysRequirements) self.eventuallyRequirements = tuple(dynamicScenario._eventuallyRequirements) self.terminationConditions = tuple(dynamicScenario._terminationConditions) self.terminateSimulationConditions = tuple(dynamicScenario._terminateSimulationConditions) self.initialRequirements = self.requirements + self.alwaysRequirements assert all(req.constrainsSampling for req in self.initialRequirements) self.recordedExprs = tuple(dynamicScenario._recordedExprs) self.recordedInitialExprs = tuple(dynamicScenario._recordedInitialExprs) self.recordedFinalExprs = tuple(dynamicScenario._recordedFinalExprs) # dependencies must use fixed order for reproducibility paramDeps = tuple(p for p in self.params.values() if isinstance(p, Samplable)) behaviorDeps = [] for namespace in self.behaviorNamespaces.values(): for value in namespace.values(): if isinstance(value, Samplable): behaviorDeps.append(value) self.dependencies = self.objects + paramDeps + tuple(requirementDeps) + tuple(behaviorDeps) self.validate() def containerOfObject(self, obj): if hasattr(obj, 'regionContainedIn') and obj.regionContainedIn is not None: return obj.regionContainedIn else: return self.workspace.region def validate(self): """Make some simple static checks for inconsistent built-in requirements. :meta private: """ objects = self.objects staticVisibility = self.egoObject and not needsSampling(self.egoObject.visibleRegion) staticBounds = [self.hasStaticBounds(obj) for obj in objects] for i in range(len(objects)): oi = objects[i] container = self.containerOfObject(oi) # Trivial case where container is empty if isinstance(container, EmptyRegion): raise InvalidScenarioError(f'Container region of {oi} is empty') # skip objects with unknown positions or bounding boxes if not staticBounds[i]: continue # Require object to be contained in the workspace/valid region if not needsSampling(container) and not container.containsObject(oi): raise InvalidScenarioError(f'Object at {oi.position} does not fit in container') # Require object to be visible from the ego object if staticVisibility and oi.requireVisible is True and oi is not self.egoObject: if not self.egoObject.canSee(oi): raise InvalidScenarioError(f'Object at {oi.position} is not visible from ego') if not oi.allowCollisions: # Require object to not intersect another object for j in range(i): oj = objects[j] if oj.allowCollisions or not staticBounds[j]: continue if oi.intersects(oj): raise InvalidScenarioError(f'Object at {oi.position} intersects' f' object at {oj.position}') def hasStaticBounds(self, obj): if needsSampling(obj.position): return False if any(needsSampling(corner) for corner in obj.corners): return False return True
[docs] def generate(self, maxIterations=2000, verbosity=0, feedback=None): """Sample a `Scene` from this scenario. For a description of how scene generation is done, see `scene generation`. Args: maxIterations (int): Maximum number of rejection sampling iterations. verbosity (int): Verbosity level. feedback (float): Feedback to pass to external samplers doing active sampling. See :mod:`scenic.core.external_params`. Returns: A pair with the sampled `Scene` and the number of iterations used. Raises: `RejectionException`: if no valid sample is found in **maxIterations** iterations. """ objects = self.objects # choose which custom requirements will be enforced for this sample activeReqs = [req for req in self.initialRequirements if random.random() <= req.prob] # do rejection sampling until requirements are satisfied rejection = True iterations = 0 while rejection is not None: if iterations > 0: # rejected the last sample if verbosity >= 2: print(f' Rejected sample {iterations} because of: {rejection}') if self.externalSampler is not None: feedback = self.externalSampler.rejectionFeedback if iterations >= maxIterations: raise RejectionException(f'failed to generate scenario in {iterations} iterations') iterations += 1 try: if self.externalSampler is not None: self.externalSampler.sample(feedback) sample = Samplable.sampleAll(self.dependencies) except RejectionException as e: optionallyDebugRejection(e) rejection = e continue rejection = None ego = sample[self.egoObject] # Normalize types of some built-in properties for obj in objects: sampledObj = sample[obj] assert not needsSampling(sampledObj) # position, heading assert isinstance(sampledObj.position, Vector) sampledObj.heading = float(sampledObj.heading) # behavior behavior = sampledObj.behavior if behavior is not None and not isinstance(behavior, Behavior): raise InvalidScenarioError( f'behavior {behavior} of Object {obj} is not a behavior') # Check built-in requirements for i in range(len(objects)): vi = sample[objects[i]] # Require object to be contained in the workspace/valid region container = self.containerOfObject(vi) if not container.containsObject(vi): rejection = 'object containment' break # Require object to be visible from the ego object if vi.requireVisible and vi is not ego and not ego.canSee(vi): rejection = 'object visibility' break # Require object to not intersect another object if not vi.allowCollisions: for j in range(i): vj = sample[objects[j]] if not vj.allowCollisions and vi.intersects(vj): rejection = 'object intersection' break if rejection is not None: break if rejection is not None: optionallyDebugRejection() continue # Check user-specified requirements for req in activeReqs: if not req.satisfiedBy(sample): rejection = str(req) optionallyDebugRejection() break # obtained a valid sample; assemble a scene from it sampledObjects = tuple(sample[obj] for obj in objects) sampledParams = {} for param, value in self.params.items(): sampledValue = sample[value] assert not needsLazyEvaluation(sampledValue) sampledParams[param] = sampledValue sampledNamespaces = {} for modName, namespace in self.behaviorNamespaces.items(): sampledNamespace = { name: sample[value] for name, value in namespace.items() } sampledNamespaces[modName] = (namespace, sampledNamespace, namespace.copy()) alwaysReqs = (BoundRequirement(req, sample) for req in self.alwaysRequirements) eventuallyReqs = (BoundRequirement(req, sample) for req in self.eventuallyRequirements) terminationConds = (BoundRequirement(req, sample) for req in self.terminationConditions) termSimulationConds = (BoundRequirement(req, sample) for req in self.terminateSimulationConditions) recordedExprs = (BoundRequirement(req, sample) for req in self.recordedExprs) recordedInitialExprs = (BoundRequirement(req, sample) for req in self.recordedInitialExprs) recordedFinalExprs = (BoundRequirement(req, sample) for req in self.recordedFinalExprs) scene = Scene(self.workspace, sampledObjects, ego, sampledParams, alwaysReqs, eventuallyReqs, terminationConds, termSimulationConds, recordedExprs, recordedInitialExprs,recordedFinalExprs, self.monitors, sampledNamespaces, self.dynamicScenario) return scene, iterations
[docs] def resetExternalSampler(self): """Reset the scenario's external sampler, if any. If the Python random seed is reset before calling this function, this should cause the sequence of generated scenes to be deterministic. """ self.externalSampler = ExternalSampler.forParameters(self.externalParams, self.params)
[docs] def conditionOn(self, scene=None, objects=(), params={}): """Condition the scenario on particular values for some objects or parameters. This method changes the distribution of the scenario and should be used with care: it does not attempt to check that the new distribution is equivalent to the old one or that it has nonzero probability of satisfying the scenario's requirements. For example, to sample object #5 in the scenario once and then leave it fixed in all subsequent samples:: sceneA, _ = scenario.generate() scenario.conditionOn(scene=sceneA, objects=(5,)) sceneB, _ = scenario.generate() # will have the same object 5 as sceneA Args: scene (Scene): Scene from which to take values for the given **objects**, if any. objects: Sequence of indices specifying which objects in this scenario should be conditioned on the corresponding objects in **scene** (i.e. those with the same index in the list of objects). params (dict): Dictionary of global parameters to condition and their new values (which may be constants or distributions). """ assert objects or params assert bool(scene) == bool(objects) if scene: assert len(self.objects) == len(scene.objects) for i in objects: assert i < len(self.objects) self.objects[i].conditionTo(scene.objects[i]) for param, newVal in params.items(): curVal = self.params[param] if isinstance(curVal, Samplable): if not isinstance(newVal, Samplable): newVal = ConstantSamplable(newVal) curVal.conditionTo(newVal) else: self.params[param] = newVal
def getSimulator(self): if self.simulator is None: raise RuntimeError('scenario does not specify a simulator') import scenic.syntax.veneer as veneer return veneer.instantiateSimulator(self.simulator, self.params)