Source code for scenic.syntax.veneer

"""Python implementations of Scenic language constructs.

This module is automatically imported by all Scenic programs. In addition to
defining the built-in functions, operators, specifiers, etc., it also stores
global state such as the list of all created Scenic objects.

.. highlight:: scenic-grammar
"""

__all__ = (
	# Primitive statements and functions
	'ego', 'workspace',
	'require', 'resample', 'param', 'globalParameters', 'mutate', 'verbosePrint',
	'localPath', 'model', 'simulator', 'simulation', 'require_always', 'require_eventually',
	'terminate_when', 'terminate_simulation_when', 'terminate_after', 'in_initial_scenario',
	'override',
	'record', 'record_initial', 'record_final',
	'sin', 'cos', 'hypot', 'max', 'min',
	'filter', 'str',
	# Prefix operators
	'Visible', 'NotVisible',
	'Front', 'Back', 'Left', 'Right',
	'FrontLeft', 'FrontRight', 'BackLeft', 'BackRight',
	'RelativeHeading', 'ApparentHeading', 'RelativePosition',
	'DistanceFrom', 'DistancePast', 'AngleTo', 'AngleFrom', 'Follow',
	# Infix operators
	'FieldAt', 'RelativeTo', 'OffsetAlong', 'CanSee',
	# Primitive types
	'Vector', 'VectorField', 'PolygonalVectorField',
	'Region', 'PointSetRegion', 'RectangularRegion', 'CircularRegion', 'SectorRegion',
	'PolygonalRegion', 'PolylineRegion',
	'Workspace', 'Mutator',
	'Range', 'DiscreteRange', 'Options', 'Uniform', 'Discrete', 'Normal',
	'TruncatedNormal',
	'VerifaiParameter', 'VerifaiRange', 'VerifaiDiscreteRange', 'VerifaiOptions',
	# Constructible types
	'Point', 'OrientedPoint', 'Object',
	# Specifiers
	'With',
	'At', 'In', 'Beyond', 'VisibleFrom', 'VisibleSpec', 'NotVisibleSpec',
	'OffsetBy', 'OffsetAlongSpec',
	'Facing', 'FacingToward', 'ApparentlyFacing',
	'LeftSpec', 'RightSpec', 'Ahead', 'Behind',
	'Following',
	# Constants
	'everywhere', 'nowhere',
	# Exceptions
	'GuardViolation', 'PreconditionViolation', 'InvariantViolation',
	# Internal APIs 	# TODO remove?
	'PropertyDefault', 'Behavior', 'Monitor', 'makeTerminationAction',
	'BlockConclusion', 'runTryInterrupt', 'wrapStarredValue', 'callWithStarArgs',
	'Modifier', 'DynamicScenario'
)

# various Python types and functions used in the language but defined elsewhere
from scenic.core.geometry import sin, cos, hypot, max, min
from scenic.core.vectors import Vector, VectorField, PolygonalVectorField
from scenic.core.regions import (Region, PointSetRegion, RectangularRegion,
	CircularRegion, SectorRegion, PolygonalRegion, PolylineRegion,
	everywhere, nowhere)
from scenic.core.workspaces import Workspace
from scenic.core.distributions import (Range, DiscreteRange, Options, Uniform, Normal,
	TruncatedNormal, RandomControlFlowError)
Discrete = Options
from scenic.core.external_params import (VerifaiParameter, VerifaiRange, VerifaiDiscreteRange,
										 VerifaiOptions)
from scenic.core.object_types import Mutator, Point, OrientedPoint, Object
from scenic.core.specifiers import PropertyDefault	# TODO remove
from scenic.core.dynamics import (Behavior, Monitor, DynamicScenario, BlockConclusion,
                                  GuardViolation, PreconditionViolation, InvariantViolation,
                                  makeTerminationAction, runTryInterrupt)

# everything that should not be directly accessible from the language is imported here:
import builtins
import collections.abc
from contextlib import contextmanager
import importlib
import sys
import os.path
import traceback
import typing
from scenic.core.distributions import (RejectionException, Distribution,
									   TupleDistribution, StarredDistribution, toDistribution,
									   needsSampling, canUnpackDistributions, distributionFunction)
from scenic.core.type_support import (isA, toType, toTypes, toScalar, toHeading, toVector,
									  evaluateRequiringEqualTypes, underlyingType,
									  canCoerce, coerce)
from scenic.core.geometry import normalizeAngle, apparentHeadingAtPoint
from scenic.core.object_types import Constructible
from scenic.core.specifiers import Specifier
from scenic.core.lazy_eval import DelayedArgument, needsLazyEvaluation
import scenic.core.errors as errors
from scenic.core.errors import RuntimeParseError, InvalidScenarioError
from scenic.core.vectors import OrientedVector
from scenic.core.external_params import ExternalParameter
import scenic.core.requirements as requirements
from scenic.core.simulators import RejectSimulationException

### Internals

activity = 0
currentScenario = None
scenarioStack = []
scenarios = []
evaluatingRequirement = False
_globalParameters = {}
lockedParameters = set()
lockedModel = None
loadingModel = False
currentSimulation = None
inInitialScenario = True
runningScenarios = set()
currentBehavior = None
simulatorFactory = None
evaluatingGuard = False

## APIs used internally by the rest of Scenic

# Scenic compilation

def isActive():
	"""Are we in the middle of compiling a Scenic module?

	The 'activity' global can be >1 when Scenic modules in turn import other
	Scenic modules.
	"""
	return activity > 0

def activate(paramOverrides={}, modelOverride=None, filename=None, namespace=None):
	"""Activate the veneer when beginning to compile a Scenic module."""
	global activity, _globalParameters, lockedParameters, lockedModel, currentScenario
	if paramOverrides or modelOverride:
		assert activity == 0
		_globalParameters.update(paramOverrides)
		lockedParameters = set(paramOverrides)
		lockedModel = modelOverride

	activity += 1
	assert not evaluatingRequirement
	assert not evaluatingGuard
	assert currentSimulation is None
	# placeholder scenario for top-level code
	newScenario = DynamicScenario._dummy(filename, namespace)
	scenarioStack.append(newScenario)
	currentScenario = newScenario

def deactivate():
	"""Deactivate the veneer after compiling a Scenic module."""
	global activity, _globalParameters, lockedParameters, lockedModel
	global currentScenario, scenarios, scenarioStack, simulatorFactory
	activity -= 1
	assert activity >= 0
	assert not evaluatingRequirement
	assert not evaluatingGuard
	assert currentSimulation is None
	scenarioStack.pop()
	assert len(scenarioStack) == activity
	scenarios = []

	if activity == 0:
		lockedParameters = set()
		lockedModel = None
		currentScenario = None
		simulatorFactory = None
		_globalParameters = {}
	else:
		currentScenario = scenarioStack[-1]

# Object creation

def registerObject(obj):
	"""Add a Scenic object to the global list of created objects.

	This is called by the Object constructor.
	"""
	if evaluatingRequirement:
		raise RuntimeParseError('tried to create an object inside a requirement')
	elif currentBehavior is not None:
		raise RuntimeParseError('tried to create an object inside a behavior')
	elif activity > 0 or currentScenario:
		assert not evaluatingRequirement
		assert isinstance(obj, Constructible)
		currentScenario._registerObject(obj)
		if currentSimulation:
			currentSimulation.createObject(obj)

# External parameter creation

def registerExternalParameter(value):
	"""Register a parameter whose value is given by an external sampler."""
	if activity > 0:
		assert isinstance(value, ExternalParameter)
		currentScenario._externalParameters.append(value)

# Function call support

def wrapStarredValue(value, lineno):
	if isinstance(value, TupleDistribution) or not needsSampling(value):
		return value
	elif isinstance(value, Distribution):
		return [StarredDistribution(value, lineno)]
	else:
		raise RuntimeParseError(f'iterable unpacking cannot be applied to {value}')

def callWithStarArgs(_func_to_call, *args, **kwargs):
	if not canUnpackDistributions(_func_to_call):
		# wrap function to delay evaluation until starred distributions are sampled
		_func_to_call = distributionFunction(_func_to_call)
	return _func_to_call(*args, **kwargs)

# Simulations

def instantiateSimulator(factory, params):
	global _globalParameters
	assert not _globalParameters		# TODO improve hack?
	_globalParameters = dict(params)
	try:
		return factory()
	finally:
		_globalParameters = {}

def beginSimulation(sim):
	global currentSimulation, currentScenario, inInitialScenario, runningScenarios
	global _globalParameters
	if isActive():
		raise RuntimeError('tried to start simulation during Scenic compilation!')
	assert currentSimulation is None
	assert currentScenario is None
	assert not scenarioStack
	currentSimulation = sim
	currentScenario = sim.scene.dynamicScenario
	runningScenarios = {currentScenario}
	inInitialScenario = currentScenario._setup is None
	currentScenario._bindTo(sim.scene)
	_globalParameters = dict(sim.scene.params)

	# rebind globals that could be referenced by behaviors to their sampled values
	for modName, (namespace, sampledNS, originalNS) in sim.scene.behaviorNamespaces.items():
		namespace.clear()
		namespace.update(sampledNS)

def endSimulation(sim):
	global currentSimulation, currentScenario, currentBehavior, runningScenarios
	global _globalParameters
	currentSimulation = None
	currentScenario = None
	runningScenarios = set()
	currentBehavior = None
	_globalParameters = {}

	for modName, (namespace, sampledNS, originalNS) in sim.scene.behaviorNamespaces.items():
		namespace.clear()
		namespace.update(originalNS)

def simulationInProgress():
	return currentSimulation is not None

# Requirements

@contextmanager
def executeInRequirement(scenario, boundEgo):
	global evaluatingRequirement, currentScenario
	assert activity == 0
	assert not evaluatingRequirement
	evaluatingRequirement = True
	if currentScenario is None:
		currentScenario = scenario
		clearScenario = True
	else:
		assert currentScenario is scenario
		clearScenario = False
	oldEgo = currentScenario._ego
	if boundEgo:
		currentScenario._ego = boundEgo
	try:
		yield
	except RandomControlFlowError as e:
		# Such errors should not be possible inside a requirement, since all values
		# should have already been sampled: something's gone wrong with our rebinding.
		raise RuntimeError('internal error: requirement dependency not sampled') from e
	finally:
		evaluatingRequirement = False
		currentScenario._ego = oldEgo
		if clearScenario:
			currentScenario = None

# Dynamic scenarios

def registerDynamicScenarioClass(cls):
	scenarios.append(cls)

@contextmanager
def executeInScenario(scenario, inheritEgo=False):
	global currentScenario
	oldScenario = currentScenario
	if inheritEgo and oldScenario is not None:
		scenario._ego = oldScenario._ego 	# inherit ego from parent
	currentScenario = scenario
	try:
		yield
	except AttributeError as e:
		# Convert confusing AttributeErrors from trying to access nonexistent scenario
		# variables into NameErrors, which is what the user would expect. The information
		# needed to do this was made available in Python 3.10, but unfortunately could be
		# wrong until 3.10.3: see bpo-46940.
		if sys.version_info >= (3, 10, 3) and isinstance(e.obj, DynamicScenario):
			newExc = NameError(f"name '{e.name}' is not defined", name=e.name)
			raise newExc.with_traceback(e.__traceback__)
		else:
			raise
	finally:
		currentScenario = oldScenario

def prepareScenario(scenario):
	if currentSimulation:
		verbosePrint(f'Starting scenario {scenario}', level=3)

def finishScenarioSetup(scenario):
	global inInitialScenario
	inInitialScenario = False

def startScenario(scenario):
	runningScenarios.add(scenario)

def endScenario(scenario, reason, quiet=False):
	runningScenarios.remove(scenario)
	if not quiet:
		verbosePrint(f'Stopping scenario {scenario} because: {reason}', level=3)

# Dynamic behaviors

@contextmanager
def executeInBehavior(behavior):
	global currentBehavior
	oldBehavior = currentBehavior
	currentBehavior = behavior
	try:
		yield
	except AttributeError as e:
		# See comment for corresponding code in executeInScenario
		if sys.version_info >= (3, 10, 3) and isinstance(e.obj, Behavior):
			newExc = NameError(f"name '{e.name}' is not defined", name=e.name)
			raise newExc.with_traceback(e.__traceback__)
		else:
			raise
	finally:
		currentBehavior = oldBehavior

@contextmanager
def executeInGuard():
	global evaluatingGuard
	assert not evaluatingGuard
	evaluatingGuard = True
	try:
		yield
	finally:
		evaluatingGuard = False

### Parsing support

class Modifier(typing.NamedTuple):
[docs] name: str value: typing.Any terminator: typing.Optional[str] = None
### Primitive statements and functions def ego(obj=None):
[docs] """Function implementing loads and stores to the 'ego' pseudo-variable. The translator calls this with no arguments for loads, and with the source value for stores. """ egoObject = currentScenario._ego if obj is None: if egoObject is None: raise RuntimeParseError('referred to ego object not yet assigned') elif not isinstance(obj, Object): raise RuntimeParseError('tried to make non-object the ego object') else: currentScenario._ego = obj for scenario in runningScenarios: if scenario._ego is None: scenario._ego = obj return egoObject
def workspace(workspace=None):
[docs] """Function implementing loads and stores to the 'workspace' pseudo-variable. See `ego`. """ if workspace is None: if currentScenario._workspace is None: raise RuntimeParseError('referred to workspace not yet assigned') elif not isinstance(workspace, Workspace): raise RuntimeParseError(f'workspace {workspace} is not a Workspace') elif needsSampling(workspace): raise InvalidScenarioError('workspace must be a fixed region') elif needsLazyEvaluation(workspace): raise InvalidScenarioError('workspace uses value undefined ' 'outside of object definition') else: currentScenario._workspace = workspace return currentScenario._workspace
def require(reqID, req, line, name, prob=1):
[docs] """Function implementing the require statement.""" if not name: name = f'requirement on line {line}' if evaluatingRequirement: raise RuntimeParseError('tried to create a requirement inside a requirement') if currentSimulation is not None: # requirement being evaluated at runtime if prob >= 1 or Range(0, 1) <= prob: # use Range so value can be recorded result = req() assert not needsSampling(result) if needsLazyEvaluation(result): raise RuntimeParseError(f'requirement on line {line} uses value' ' undefined outside of object definition') if not result: raise RejectSimulationException(name) else: # requirement being defined at compile time currentScenario._addRequirement(requirements.RequirementType.require, reqID, req, line, name, prob)
def record(reqID, value, line, name): if not name: name = f'record{line}' makeRequirement(requirements.RequirementType.record, reqID, value, line, name) def record_initial(reqID, value, line, name): if not name: name = f'record{line}' makeRequirement(requirements.RequirementType.recordInitial, reqID, value, line, name) def record_final(reqID, value, line, name): if not name: name = f'record{line}' makeRequirement(requirements.RequirementType.recordFinal, reqID, value, line, name) def require_always(reqID, req, line, name):
[docs] """Function implementing the 'require always' statement.""" if not name: name = f'requirement on line {line}' makeRequirement(requirements.RequirementType.requireAlways, reqID, req, line, name)
def require_eventually(reqID, req, line, name):
[docs] """Function implementing the 'require eventually' statement.""" if not name: name = f'requirement on line {line}' makeRequirement(requirements.RequirementType.requireEventually, reqID, req, line, name)
def terminate_when(reqID, req, line, name):
[docs] """Function implementing the 'terminate when' statement.""" if not name: name = f'termination condition on line {line}' makeRequirement(requirements.RequirementType.terminateWhen, reqID, req, line, name)
def terminate_simulation_when(reqID, req, line, name):
[docs] """Function implementing the 'terminate simulation when' statement.""" if not name: name = f'termination condition on line {line}' makeRequirement(requirements.RequirementType.terminateSimulationWhen, reqID, req, line, name)
def makeRequirement(ty, reqID, req, line, name): if evaluatingRequirement: raise RuntimeParseError(f'tried to use "{ty.value}" inside a requirement') elif currentBehavior is not None: raise RuntimeParseError(f'"{ty.value}" inside a behavior on line {line}') elif currentSimulation is not None: currentScenario._addDynamicRequirement(ty, req, line, name) else: # requirement being defined at compile time currentScenario._addRequirement(ty, reqID, req, line, name, 1) def terminate_after(timeLimit, terminator=None): if not isinstance(timeLimit, (float, int)): raise RuntimeParseError('"terminate after N" with N not a number') assert terminator in (None, 'seconds', 'steps') inSeconds = (terminator != 'steps') currentScenario._setTimeLimit(timeLimit, inSeconds=inSeconds) def resample(dist):
[docs] """The built-in resample function.""" if not isinstance(dist, Distribution): return dist try: return dist.clone() except NotImplementedError: raise RuntimeParseError('cannot resample non-primitive distribution') from None
def verbosePrint(*objects, level=1, indent=True,
[docs] sep=' ', end='\n', file=sys.stdout, flush=False): """Built-in function printing a message only in verbose mode. Scenic's verbosity may be set using the :option:`-v` command-line option. The simplest way to use this function is with code like :scenic:`verbosePrint('hello world!')` or :scenic:`verbosePrint('details here', level=3)`; the other keyword arguments are probably only useful when replacing more complex uses of the Python `print` function. Args: objects: Object(s) to print (`str` will be called to make them strings). level (int): Minimum verbosity level at which to print. Default is 1. indent (bool): Whether to indent the message to align with messages generated by Scenic (default true). sep, end, file, flush: As in `print`. """ if errors.verbosityLevel >= level: if indent: if currentSimulation: indent = ' ' if errors.verbosityLevel >= 3 else ' ' else: indent = ' ' * activity if errors.verbosityLevel >= 2 else ' ' print(indent, end='', file=file) print(*objects, sep=sep, end=end, file=file, flush=flush)
def localPath(relpath):
[docs] """Convert a path relative to the calling Scenic file into an absolute path. For example, :scenic:`localPath('resource.dat')` evaluates to the absolute path of a file called ``resource.dat`` located in the same directory as the Scenic file where this expression appears. """ filename = traceback.extract_stack(limit=2)[0].filename base = os.path.dirname(filename) return os.path.join(base, relpath)
def simulation():
[docs] """Get the currently-running `Simulation`. May only be called from code that runs at simulation time, e.g. inside :term:`dynamic behaviors` and :keyword:`compose` blocks of scenarios. """ if isActive(): raise RuntimeParseError('used simulation() outside a behavior') assert currentSimulation is not None return currentSimulation
def simulator(sim): global simulatorFactory simulatorFactory = sim def in_initial_scenario(): return inInitialScenario def override(*args): if len(args) < 1: raise RuntimeParseError('"override" missing an object') elif len(args) < 2: raise RuntimeParseError('"override" missing a list of specifiers') obj = args[0] if not isinstance(obj, Object): raise RuntimeParseError(f'"override" passed non-Object {obj}') specs = args[1:] for spec in specs: assert isinstance(spec, Specifier), spec currentScenario._override(obj, specs) def model(namespace, modelName): global loadingModel if loadingModel: raise RuntimeParseError('Scenic world model itself uses the "model" statement') if lockedModel is not None: modelName = lockedModel try: loadingModel = True module = importlib.import_module(modelName) except ModuleNotFoundError as e: if e.name == modelName: raise InvalidScenarioError(f'could not import world model {modelName}') from None else: raise finally: loadingModel = False names = module.__dict__.get('__all__', None) if names is not None: for name in names: namespace[name] = getattr(module, name) else: for name, value in module.__dict__.items(): if not name.startswith('_'): namespace[name] = value def param(*quotedParams, **params):
[docs] """Function implementing the param statement.""" global loadingModel if evaluatingRequirement: raise RuntimeParseError('tried to create a global parameter inside a requirement') elif currentSimulation is not None: raise RuntimeParseError('tried to create a global parameter during a simulation') for name, value in params.items(): if name not in lockedParameters and (not loadingModel or name not in _globalParameters): _globalParameters[name] = toDistribution(value) assert len(quotedParams) % 2 == 0, quotedParams it = iter(quotedParams) for name, value in zip(it, it): if name not in lockedParameters: _globalParameters[name] = toDistribution(value)
class ParameterTableProxy(collections.abc.Mapping): def __init__(self, map): object.__setattr__(self, '_internal_map', map) def __getitem__(self, name): return self._internal_map[name] def __iter__(self): return iter(self._internal_map) def __len__(self): return len(self._internal_map) def __getattr__(self, name): return self.__getitem__(name) # allow namedtuple-like access def __setattr__(self, name, value): raise RuntimeParseError('cannot modify globalParameters (use "param" statement)') def _clone_table(self): return ParameterTableProxy(self._internal_map.copy()) def globalParameters(): return ParameterTableProxy(_globalParameters) def mutate(*objects): # TODO update syntax
[docs] """Function implementing the mutate statement.""" if evaluatingRequirement: raise RuntimeParseError('used mutate statement inside a requirement') scale = 1 if objects and isinstance(objects[-1], (float, int)): scale = objects[-1] objects = objects[:-1] if len(objects) == 0: objects = currentScenario._objects for obj in objects: if not isinstance(obj, Object): raise RuntimeParseError('"mutate X" with X not an object') obj.mutationScale = scale
### Prefix operators def Visible(region):
[docs] """The :grammar:`visible <region>` operator.""" region = toType(region, Region, '"visible X" with X not a Region') return region.intersect(ego().visibleRegion)
def NotVisible(region):
[docs] """The :grammar:`not visible <region>` operator.""" region = toType(region, Region, '"not visible X" with X not a Region') return region.difference(ego().visibleRegion)
# front of <object>, etc. ops = ( 'front', 'back', 'left', 'right', 'front left', 'front right', 'back left', 'back right' ) template = '''\ def {function}(X): """The :grammar:`{syntax} of <object>` operator.""" if not isinstance(X, Object): raise RuntimeParseError('"{syntax} of X" with X not an Object') return X.{property} ''' for op in ops: func = ''.join(word.capitalize() for word in op.split(' ')) prop = func[0].lower() + func[1:] definition = template.format(function=func, syntax=op, property=prop) exec(definition) ### Infix operators def FieldAt(X, Y):
[docs] """The :grammar:`<VectorField> at <vector>` operator.""" if not isinstance(X, VectorField): raise RuntimeParseError('"X at Y" with X not a vector field') Y = toVector(Y, '"X at Y" with Y not a vector') return X[Y]
def RelativeTo(X, Y):
[docs] """The :scenic:`X relative to Y` polymorphic operator. Allowed forms:: <value> relative to <value> # with at least one a field, the other a field or heading <vector> relative to <oriented point> # and vice versa <vector> relative to <vector> <heading> relative to <heading> """ xf, yf = isA(X, VectorField), isA(Y, VectorField) if xf or yf: if xf and yf and X.valueType != Y.valueType: raise RuntimeParseError('"X relative to Y" with X, Y fields of different types') fieldType = X.valueType if xf else Y.valueType error = '"X relative to Y" with field and value of different types' def helper(context): pos = context.position.toVector() xp = X[pos] if xf else toType(X, fieldType, error) yp = Y[pos] if yf else toType(Y, fieldType, error) return xp + yp return DelayedArgument({'position'}, helper) else: if isinstance(X, OrientedPoint): # TODO too strict? if isinstance(Y, OrientedPoint): raise RuntimeParseError('"X relative to Y" with X, Y both oriented points') Y = toVector(Y, '"X relative to Y" with X an oriented point but Y not a vector') return X.relativize(Y) elif isinstance(Y, OrientedPoint): X = toVector(X, '"X relative to Y" with Y an oriented point but X not a vector') return Y.relativize(X) else: X = toTypes(X, (Vector, float), '"X relative to Y" with X neither a vector nor scalar') Y = toTypes(Y, (Vector, float), '"X relative to Y" with Y neither a vector nor scalar') return evaluateRequiringEqualTypes(lambda: X + Y, X, Y, '"X relative to Y" with vector and scalar')
def OffsetAlong(X, H, Y):
[docs] """The :scenic:`X offset along H by Y` polymorphic operator. Allowed forms:: <vector> offset along <heading> by <vector> <vector> offset along <field> by <vector> """ X = toVector(X, '"X offset along H by Y" with X not a vector') Y = toVector(Y, '"X offset along H by Y" with Y not a vector') if isinstance(H, VectorField): H = H[X] H = toHeading(H, '"X offset along H by Y" with H not a heading or vector field') return X.offsetRotated(H, Y)
def RelativePosition(X, Y=None):
[docs] """The :grammar:`relative position of <vector> [from <vector>]` operator. If the :grammar:`from <vector>` is omitted, the position of ego is used. """ X = toVector(X, '"relative position of X from Y" with X not a vector') if Y is None: Y = ego() Y = toVector(Y, '"relative position of X from Y" with Y not a vector') return X - Y
def RelativeHeading(X, Y=None):
[docs] """The :grammar:`relative heading of <heading> [from <heading>]` operator. If the :grammar:`from <heading>` is omitted, the heading of ego is used. """ X = toHeading(X, '"relative heading of X from Y" with X not a heading') if Y is None: Y = ego().heading else: Y = toHeading(Y, '"relative heading of X from Y" with Y not a heading') return normalizeAngle(X - Y)
def ApparentHeading(X, Y=None):
[docs] """The :grammar:`apparent heading of <oriented point> [from <vector>]` operator. If the :grammar:`from <vector>` is omitted, the position of ego is used. """ if not isinstance(X, OrientedPoint): raise RuntimeParseError('"apparent heading of X from Y" with X not an OrientedPoint') if Y is None: Y = ego() Y = toVector(Y, '"relative heading of X from Y" with Y not a vector') return apparentHeadingAtPoint(X.position, X.heading, Y)
def DistanceFrom(X, Y=None):
[docs] """The :scenic:`distance from {X} to {Y}` polymorphic operator. Allowed forms:: distance from <vector> [to <vector>] distance from <region> [to <vector>] distance from <vector> to <region> If the :grammar:`to <vector>` is omitted, the position of ego is used. """ X = toTypes(X, (Vector, Region), '"distance from X to Y" with X neither a vector nor region') if Y is None: Y = ego() Y = toTypes(Y, (Vector, Region), '"distance from X to Y" with Y neither a vector nor region') return X.distanceTo(Y)
def DistancePast(X, Y=None):
[docs] """The :grammar:`distance past <vector> of <oriented point>` operator. If the :grammar:`of {oriented point}` is omitted, the ego object is used. """ X = toVector(X, '"distance past X" with X not a vector') if Y is None: Y = ego() Y = toType(Y, OrientedPoint, '"distance past X of Y" with Y not an OrientedPoint') return Y.distancePast(X)
def AngleTo(X):
[docs] """The :grammar:`angle to <vector>` operator (using the position of ego as the reference).""" X = toVector(X, '"angle to X" with X not a vector') return ego().angleTo(X)
def AngleFrom(X, Y):
[docs] """The :grammar:`angle from <vector> to <vector>` operator.""" X = toVector(X, '"angle from X to Y" with X not a vector') Y = toVector(Y, '"angle from X to Y" with Y not a vector') return X.angleTo(Y)
def Follow(F, X, D):
[docs] """The :grammar:`follow <field> from <vector> for <number>` operator.""" if not isinstance(F, VectorField): raise RuntimeParseError('"follow F from X for D" with F not a vector field') X = toVector(X, '"follow F from X for D" with X not a vector') D = toScalar(D, '"follow F from X for D" with D not a number') pos = F.followFrom(X, D) heading = F[pos] return OrientedPoint(position=pos, heading=heading)
def CanSee(X, Y):
[docs] """The :scenic:`{X} can see {Y}` polymorphic operator. Allowed forms:: <point> can see <object> <point> can see <vector> """ if not isinstance(X, Point): raise RuntimeParseError('"X can see Y" with X not a Point') if isinstance(Y, Point): return X.canSee(Y) else: Y = toVector(Y, '"X can see Y" with Y not a vector') return X.visibleRegion.containsPoint(Y)
### Specifiers def With(prop, val):
[docs] """The :grammar:`with <property> <value>` specifier. Specifies the given property, with no dependencies. """ return Specifier(prop, val)
def At(pos):
[docs] """The :grammar:`at <vector>` specifier. Specifies :prop:`position`, with no dependencies.""" pos = toVector(pos, 'specifier "at X" with X not a vector') return Specifier('position', pos)
def In(region):
[docs] """The :grammar:`in/on <region>` specifier. Specifies :prop:`position`, with no dependencies. Optionally specifies :prop:`heading` if the given `Region` has a :term:`preferred orientation`. """ region = toType(region, Region, 'specifier "in/on R" with R not a Region') extras = {'heading'} if alwaysProvidesOrientation(region) else {} return Specifier('position', Region.uniformPointIn(region), optionals=extras)
def alwaysProvidesOrientation(region): """Whether a Region or distribution over Regions always provides an orientation.""" if isinstance(region, Region): return region.orientation is not None elif (isinstance(region, Options) and all(alwaysProvidesOrientation(opt) for opt in region.options)): return True else: # TODO improve somehow! try: sample = region.sample() return sample.orientation is not None or sample is nowhere except RejectionException: return False def Beyond(pos, offset, fromPt=None):
[docs] """The :specifier:`beyond {X} by {Y} from {Z}` polymorphic specifier. Specifies :prop:`position`, with no dependencies. Allowed forms:: beyond <vector> by <number> [from <vector>] beyond <vector> by <vector> [from <vector>] If the :grammar:`from <vector>` is omitted, the position of ego is used. """ pos = toVector(pos, 'specifier "beyond X by Y" with X not a vector') offset = toTypes(offset, (Vector, float), 'specifier "beyond X by Y" with Y not a number or vector') dType = underlyingType(offset) if dType is float: offset = Vector(0, offset) elif dType is not Vector: raise RuntimeParseError('specifier "beyond X by Y" with Y not a number or vector') if fromPt is None: fromPt = ego() fromPt = toVector(fromPt, 'specifier "beyond X by Y from Z" with Z not a vector') lineOfSight = fromPt.angleTo(pos) return Specifier('position', pos.offsetRotated(lineOfSight, offset))
def VisibleFrom(base):
[docs] """The :grammar:`visible from <Point>` specifier. Specifies :prop:`position`, with no dependencies. This uses the given object's :prop:`visibleRegion` property, and so correctly handles the view regions of Points, OrientedPoints, and Objects. """ if not isinstance(base, Point): raise RuntimeParseError('specifier "visible from O" with O not a Point') return Specifier('position', Region.uniformPointIn(base.visibleRegion))
def VisibleSpec():
[docs] """The :specifier:`visible` specifier (equivalent to :specifier:`visible from ego`). Specifies :prop:`position`, with no dependencies. """ return VisibleFrom(ego())
def NotVisibleFrom(base): """The :grammar:`not visible from <Point>` specifier. Specifies :prop:`position`, depending on :prop:`regionContainedIn`. See `VisibleFrom`. """ if not isinstance(base, Point): raise RuntimeParseError('specifier "not visible from O" with O not a Point') def helper(self): region = self.regionContainedIn if region is None: if currentScenario._workspace is None: raise RuntimeParseError('"not visible" specifier with no workspace defined') region = currentScenario._workspace.region return Region.uniformPointIn(region.difference(base.visibleRegion)) return Specifier('position', DelayedArgument({'regionContainedIn'}, helper)) def NotVisibleSpec():
[docs] """The :specifier:`not visible` specifier (equivalent to :specifier:`not visible from ego`). Specifies :prop:`position`, depending on :prop:`regionContainedIn`. """ return NotVisibleFrom(ego())
def OffsetBy(offset):
[docs] """The :grammar:`offset by <vector>` specifier. Specifies :prop:`position`, with no dependencies. """ offset = toVector(offset, 'specifier "offset by X" with X not a vector') pos = RelativeTo(offset, ego()).toVector() return Specifier('position', pos)
def OffsetAlongSpec(direction, offset):
[docs] """The :specifier:`offset along {X} by {Y}` polymorphic specifier. Specifies :prop:`position`, with no dependencies. Allowed forms:: offset along <heading> by <vector> offset along <field> by <vector> """ return Specifier('position', OffsetAlong(ego(), direction, offset))
def Facing(heading):
[docs] """The :specifier:`facing {X}` polymorphic specifier. Specifies :prop:`heading`, with dependencies depending on the form:: facing <number> # no dependencies; facing <field> # depends on 'position' """ if isinstance(heading, VectorField): return Specifier('heading', DelayedArgument({'position'}, lambda self: heading[self.position])) else: heading = toHeading(heading, 'specifier "facing X" with X not a heading or vector field') return Specifier('heading', heading)
def FacingToward(pos):
[docs] """The :grammar:`facing toward <vector>` specifier. Specifies :prop:`heading`, depending on :prop:`position`. """ pos = toVector(pos, 'specifier "facing toward X" with X not a vector') return Specifier('heading', DelayedArgument({'position'}, lambda self: self.position.angleTo(pos)))
def ApparentlyFacing(heading, fromPt=None):
[docs] """The :grammar:`apparently facing <heading> [from <vector>]` specifier. Specifies :prop:`heading`, depending on :prop:`position`. If the :grammar:`from <vector>` is omitted, the position of ego is used. """ heading = toHeading(heading, 'specifier "apparently facing X" with X not a heading') if fromPt is None: fromPt = ego() fromPt = toVector(fromPt, 'specifier "apparently facing X from Y" with Y not a vector') value = lambda self: fromPt.angleTo(self.position) + heading return Specifier('heading', DelayedArgument({'position'}, value))
def LeftSpec(pos, dist=0):
[docs] """The :specifier:`left of {X} by {Y}` polymorphic specifier. Specifies :prop:`position`, depending on :prop:`width`. See other dependencies below. Allowed forms:: left of <oriented point> [by <scalar/vector>] # optionally specifies 'heading'; left of <vector> [by <scalar/vector>] # depends on 'heading'. If the :grammar:`by <scalar/vector>` is omitted, zero is used. """ return leftSpecHelper('left of', pos, dist, 'width', lambda dist: (dist, 0), lambda self, dx, dy: Vector(-self.width / 2 - dx, dy))
def RightSpec(pos, dist=0):
[docs] """The :specifier:`right of {X} by {Y}` polymorphic specifier. Specifies :prop:`position`, depending on :prop:`width`. See other dependencies below. Allowed forms:: right of <oriented point> [by <scalar/vector>] # optionally specifies 'heading'; right of <vector> [by <scalar/vector>] # depends on 'heading'. If the :grammar:`by <scalar/vector>` is omitted, zero is used. """ return leftSpecHelper('right of', pos, dist, 'width', lambda dist: (dist, 0), lambda self, dx, dy: Vector(self.width / 2 + dx, dy))
def Ahead(pos, dist=0):
[docs] """The :specifier:`ahead of {X} by {Y}` polymorphic specifier. Specifies :prop:`position`, depending on :prop:`length`. See other dependencies below. Allowed forms:: ahead of <oriented point> [by <scalar/vector>] # optionally specifies 'heading'; ahead of <vector> [by <scalar/vector>] # depends on 'heading'. If the :grammar:`by <scalar/vector>` is omitted, zero is used. """ return leftSpecHelper('ahead of', pos, dist, 'length', lambda dist: (0, dist), lambda self, dx, dy: Vector(dx, self.length / 2 + dy))
def Behind(pos, dist=0):
[docs] """The :specifier:`behind {X} by {Y}` polymorphic specifier. Specifies :prop:`position`, depending on :prop:`length`. See other dependencies below. Allowed forms:: behind <oriented point> [by <scalar/vector>] # optionally specifies 'heading'; behind <vector> [by <scalar/vector>] # depends on 'heading'. If the :grammar:`by <scalar/vector>` is omitted, zero is used. """ return leftSpecHelper('behind', pos, dist, 'length', lambda dist: (0, dist), lambda self, dx, dy: Vector(dx, -self.length / 2 - dy))
def leftSpecHelper(syntax, pos, dist, axis, toComponents, makeOffset): extras = set() if canCoerce(dist, float): dx, dy = toComponents(coerce(dist, float)) elif canCoerce(dist, Vector): dx, dy = coerce(dist, Vector) else: raise RuntimeParseError(f'"{syntax} X by D" with D not a number or vector') if isinstance(pos, OrientedPoint): # TODO too strict? val = lambda self: pos.relativize(makeOffset(self, dx, dy)) new = DelayedArgument({axis}, val) extras.add('heading') else: pos = toVector(pos, f'specifier "{syntax} X" with X not a vector') val = lambda self: pos.offsetRotated(self.heading, makeOffset(self, dx, dy)) new = DelayedArgument({axis, 'heading'}, val) return Specifier('position', new, optionals=extras) def Following(field, dist, fromPt=None):
[docs] """The :specifier:`following {F} from {X} for {D}` specifier. Specifies :prop:`position`, and optionally :prop:`heading`, with no dependencies. Allowed forms:: following <field> [from <vector>] for <number> If the :grammar:`from <vector>` is omitted, the position of ego is used. """ if fromPt is None: fromPt = ego() else: dist, fromPt = fromPt, dist if not isinstance(field, VectorField): raise RuntimeParseError('"following F" specifier with F not a vector field') fromPt = toVector(fromPt, '"following F from X for D" with X not a vector') dist = toScalar(dist, '"following F for D" with D not a number') pos = field.followFrom(fromPt, dist) heading = field[pos] val = OrientedVector.make(pos, heading) return Specifier('position', val, optionals={'heading'})
### Primitive functions overriding Python builtins @distributionFunction def filter(function, iterable): return list(builtins.filter(function, iterable)) @distributionFunction def str(*args, **kwargs): return builtins.str(*args, **kwargs)