"""Simulator interface for CARLA."""
try:
import carla
except ImportError as e:
raise ModuleNotFoundError('CARLA scenarios require the "carla" Python package') from e
import math
import os
import warnings
import scenic.core.errors as errors
if errors.verbosityLevel == 0: # suppress pygame advertisement at zero verbosity
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "hide"
import pygame
from scenic.core.simulators import SimulationCreationError
from scenic.domains.driving.simulators import DrivingSimulation, DrivingSimulator
from scenic.simulators.carla.blueprints import oldBlueprintNames
import scenic.simulators.carla.utils.utils as utils
import scenic.simulators.carla.utils.visuals as visuals
from scenic.syntax.veneer import verbosePrint
[docs]class CarlaSimulator(DrivingSimulator):
"""Implementation of `Simulator` for CARLA."""
def __init__(
self,
carla_map,
map_path,
address="127.0.0.1",
port=2000,
timeout=10,
render=True,
record="",
timestep=0.1,
traffic_manager_port=None,
):
super().__init__()
verbosePrint(f"Connecting to CARLA on port {port}")
self.client = carla.Client(address, port)
self.client.set_timeout(timeout) # limits networking operations (seconds)
if carla_map is not None:
try:
self.world = self.client.load_world(carla_map)
except Exception as e:
raise RuntimeError(f"CARLA could not load world '{carla_map}'") from e
else:
if str(map_path).endswith(".xodr"):
with open(map_path) as odr_file:
self.world = self.client.generate_opendrive_world(odr_file.read())
else:
raise RuntimeError("CARLA only supports OpenDrive maps")
self.timestep = timestep
if traffic_manager_port is None:
traffic_manager_port = port + 6000
self.tm = self.client.get_trafficmanager(traffic_manager_port)
self.tm.set_synchronous_mode(True)
# Set to synchronous with fixed timestep
settings = self.world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = timestep # NOTE: Should not exceed 0.1
self.world.apply_settings(settings)
verbosePrint("Map loaded in simulator.")
self.render = render # visualization mode ON/OFF
self.record = record # whether to use the carla recorder
self.scenario_number = 0 # Number of the scenario executed
def createSimulation(self, scene, *, timestep, **kwargs):
if timestep is not None and timestep != self.timestep:
raise RuntimeError(
"cannot customize timestep for individual CARLA simulations; "
"set timestep when creating the CarlaSimulator instead"
)
self.scenario_number += 1
return CarlaSimulation(
scene,
self.client,
self.tm,
self.render,
self.record,
self.scenario_number,
timestep=self.timestep,
**kwargs,
)
def destroy(self):
super().destroy()
settings = self.world.get_settings()
settings.synchronous_mode = False
settings.fixed_delta_seconds = None
self.world.apply_settings(settings)
self.tm.set_synchronous_mode(False)
class CarlaSimulation(DrivingSimulation):
def __init__(self, scene, client, tm, render, record, scenario_number, **kwargs):
self.client = client
self.world = self.client.get_world()
self.map = self.world.get_map()
self.blueprintLib = self.world.get_blueprint_library()
self.tm = tm
self.render = render
self.record = record
self.scenario_number = scenario_number
self.cameraManager = None
super().__init__(scene, **kwargs)
def setup(self):
weather = self.scene.params.get("weather")
if weather is not None:
if isinstance(weather, str):
self.world.set_weather(getattr(carla.WeatherParameters, weather))
elif isinstance(weather, dict):
self.world.set_weather(carla.WeatherParameters(**weather))
# Setup HUD
if self.render:
self.displayDim = (1280, 720)
self.displayClock = pygame.time.Clock()
self.camTransform = 0
pygame.init()
pygame.font.init()
self.hud = visuals.HUD(*self.displayDim)
self.display = pygame.display.set_mode(
self.displayDim, pygame.HWSURFACE | pygame.DOUBLEBUF
)
self.cameraManager = None
if self.record:
if not os.path.exists(self.record):
os.mkdir(self.record)
name = "{}/scenario{}.log".format(self.record, self.scenario_number)
# Carla is looking for an absolute path, so convert it if necessary.
name = os.path.abspath(name)
self.client.start_recorder(name)
# Create objects.
super().setup()
# Set up camera manager and collision sensor for ego
if self.render:
camIndex = 0
camPosIndex = 0
egoActor = self.objects[0].carlaActor
self.cameraManager = visuals.CameraManager(self.world, egoActor, self.hud)
self.cameraManager._transform_index = camPosIndex
self.cameraManager.set_sensor(camIndex)
self.cameraManager.set_transform(self.camTransform)
self.world.tick() ## allowing manualgearshift to take effect # TODO still need this?
for obj in self.objects:
if isinstance(obj.carlaActor, carla.Vehicle):
obj.carlaActor.apply_control(
carla.VehicleControl(manual_gear_shift=False)
)
self.world.tick()
for obj in self.objects:
if obj.speed is not None and obj.speed != 0:
raise RuntimeError(
f"object {obj} cannot have a nonzero initial speed "
"(this is not yet possible in CARLA)"
)
def createObjectInSimulator(self, obj):
# Extract blueprint
try:
blueprint = self.blueprintLib.find(obj.blueprint)
except IndexError as e:
found = False
if obj.blueprint in oldBlueprintNames:
for oldName in oldBlueprintNames[obj.blueprint]:
try:
blueprint = self.blueprintLib.find(oldName)
found = True
warnings.warn(
f"CARLA blueprint {obj.blueprint} not found; "
f"using older version {oldName}"
)
obj.blueprint = oldName
break
except IndexError:
continue
if not found:
raise SimulationCreationError(
f"Unable to find blueprint {obj.blueprint}" f" for object {obj}"
) from e
if obj.rolename is not None:
blueprint.set_attribute("role_name", obj.rolename)
# set walker as not invincible
if blueprint.has_attribute("is_invincible"):
blueprint.set_attribute("is_invincible", "False")
# Set up transform
loc = utils.scenicToCarlaLocation(
obj.position,
world=self.world,
blueprint=obj.blueprint,
snapToGround=obj.snapToGround,
)
rot = utils.scenicToCarlaRotation(obj.orientation)
transform = carla.Transform(loc, rot)
# Color, cannot be set for Pedestrians
if blueprint.has_attribute("color") and obj.color is not None:
c = obj.color
c_str = f"{int(c.r*255)},{int(c.g*255)},{int(c.b*255)}"
blueprint.set_attribute("color", c_str)
# Create Carla actor
carlaActor = self.world.try_spawn_actor(blueprint, transform)
if carlaActor is None:
raise SimulationCreationError(f"Unable to spawn object {obj}")
obj.carlaActor = carlaActor
carlaActor.set_simulate_physics(obj.physics)
if isinstance(carlaActor, carla.Vehicle):
# TODO should get dimensions at compile time, not simulation time
extent = carlaActor.bounding_box.extent
ex, ey, ez = extent.x, extent.y, extent.z
# Ensure each extent is positive to work around CARLA issue #5841
obj.width = ey * 2 if ey > 0 else obj.width
obj.length = ex * 2 if ex > 0 else obj.length
obj.height = ez * 2 if ez > 0 else obj.height
carlaActor.apply_control(carla.VehicleControl(manual_gear_shift=True, gear=1))
elif isinstance(carlaActor, carla.Walker):
carlaActor.apply_control(carla.WalkerControl())
# spawn walker controller
controller_bp = self.blueprintLib.find("controller.ai.walker")
controller = self.world.try_spawn_actor(
controller_bp, carla.Transform(), carlaActor
)
if controller is None:
raise SimulationCreationError(
f"Unable to spawn carla controller for object {obj}"
)
obj.carlaController = controller
return carlaActor
def executeActions(self, allActions):
super().executeActions(allActions)
# Apply control updates which were accumulated while executing the actions
for obj in self.agents:
ctrl = obj._control
if ctrl is not None:
obj.carlaActor.apply_control(ctrl)
obj._control = None
def step(self):
# Run simulation for one timestep
self.world.tick()
# Render simulation
if self.render:
self.cameraManager.render(self.display)
pygame.display.flip()
def getProperties(self, obj, properties):
# Extract Carla properties
carlaActor = obj.carlaActor
currTransform = carlaActor.get_transform()
currLoc = currTransform.location
currRot = currTransform.rotation
currVel = carlaActor.get_velocity()
currAngVel = carlaActor.get_angular_velocity()
# Prepare Scenic object properties
position = utils.carlaToScenicPosition(currLoc)
velocity = utils.carlaToScenicPosition(currVel)
speed = math.hypot(*velocity)
angularSpeed = utils.carlaToScenicAngularSpeed(currAngVel)
angularVelocity = utils.carlaToScenicAngularVel(currAngVel)
globalOrientation = utils.carlaToScenicOrientation(currRot)
yaw, pitch, roll = obj.parentOrientation.localAnglesFor(globalOrientation)
elevation = utils.carlaToScenicElevation(currLoc)
values = dict(
position=position,
velocity=velocity,
speed=speed,
angularSpeed=angularSpeed,
angularVelocity=angularVelocity,
yaw=yaw,
pitch=pitch,
roll=roll,
elevation=elevation,
)
return values
def destroy(self):
for obj in self.objects:
if obj.carlaActor is not None:
if isinstance(obj.carlaActor, carla.Vehicle):
obj.carlaActor.set_autopilot(False, self.tm.get_port())
if isinstance(obj.carlaActor, carla.Walker):
obj.carlaController.stop()
obj.carlaController.destroy()
obj.carlaActor.destroy()
if self.render and self.cameraManager:
self.cameraManager.destroy_sensor()
self.client.stop_recorder()
self.world.tick()
super().destroy()