"""Simulator interface for CARLA."""
try:
import carla
except ImportError as e:
raise ModuleNotFoundError('CARLA scenarios require the "carla" Python package') from e
import math
import os
import warnings
import scenic.core.errors as errors
if errors.verbosityLevel == 0: # suppress pygame advertisement at zero verbosity
import os
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = 'hide'
import pygame
from scenic.domains.driving.simulators import DrivingSimulator, DrivingSimulation
from scenic.core.simulators import SimulationCreationError
from scenic.syntax.veneer import verbosePrint
from scenic.simulators.carla.blueprints import oldBlueprintNames
import scenic.simulators.carla.utils.utils as utils
import scenic.simulators.carla.utils.visuals as visuals
[docs]class CarlaSimulator(DrivingSimulator):
"""Implementation of `Simulator` for CARLA."""
def __init__(self, carla_map, map_path, address='127.0.0.1', port=2000, timeout=10,
render=True, record='', timestep=0.1, traffic_manager_port=None):
super().__init__()
verbosePrint(f'Connecting to CARLA on port {port}')
self.client = carla.Client(address, port)
self.client.set_timeout(timeout) # limits networking operations (seconds)
if carla_map is not None:
self.world = self.client.load_world(carla_map)
else:
if map_path.endswith('.xodr'):
with open(map_path) as odr_file:
self.world = self.client.generate_opendrive_world(odr_file.read())
else:
raise RuntimeError('CARLA only supports OpenDrive maps')
self.timestep = timestep
if traffic_manager_port is None:
traffic_manager_port = port + 6000
self.tm = self.client.get_trafficmanager(traffic_manager_port)
self.tm.set_synchronous_mode(True)
# Set to synchronous with fixed timestep
settings = self.world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = timestep # NOTE: Should not exceed 0.1
self.world.apply_settings(settings)
verbosePrint('Map loaded in simulator.')
self.render = render # visualization mode ON/OFF
self.record = record # whether to use the carla recorder
self.scenario_number = 0 # Number of the scenario executed
def createSimulation(self, scene, verbosity=0):
self.scenario_number += 1
return CarlaSimulation(scene, self.client, self.tm, self.timestep,
render=self.render, record=self.record,
scenario_number=self.scenario_number, verbosity=verbosity)
def destroy(self):
settings = self.world.get_settings()
settings.synchronous_mode = False
settings.fixed_delta_seconds = None
self.world.apply_settings(settings)
self.tm.set_synchronous_mode(False)
super().destroy()
class CarlaSimulation(DrivingSimulation):
def __init__(self, scene, client, tm, timestep, render, record, scenario_number, verbosity=0):
super().__init__(scene, timestep=timestep, verbosity=verbosity)
self.client = client
self.world = self.client.get_world()
self.map = self.world.get_map()
self.blueprintLib = self.world.get_blueprint_library()
self.tm = tm
weather = scene.params.get("weather")
if weather is not None:
if isinstance(weather, str):
self.world.set_weather(getattr(carla.WeatherParameters, weather))
elif isinstance(weather, dict):
self.world.set_weather(carla.WeatherParameters(**weather))
# Reloads current world: destroys all actors, except traffic manager instances
# self.client.reload_world()
# Setup HUD
self.render = render
self.record = record
self.scenario_number = scenario_number
if self.render:
self.displayDim = (1280, 720)
self.displayClock = pygame.time.Clock()
self.camTransform = 0
pygame.init()
pygame.font.init()
self.hud = visuals.HUD(*self.displayDim)
self.display = pygame.display.set_mode(
self.displayDim,
pygame.HWSURFACE | pygame.DOUBLEBUF
)
self.cameraManager = None
if self.record:
if not os.path.exists(self.record):
os.mkdir(self.record)
name = "{}/scenario{}.log".format(self.record, self.scenario_number)
self.client.start_recorder(name)
# Create Carla actors corresponding to Scenic objects
self.ego = None
for obj in self.objects:
carlaActor = self.createObjectInSimulator(obj)
# Check if ego (from carla_scenic_taks.py)
if obj is self.objects[0]:
self.ego = obj
# Set up camera manager and collision sensor for ego
if self.render:
camIndex = 0
camPosIndex = 0
self.cameraManager = visuals.CameraManager(self.world, carlaActor, self.hud)
self.cameraManager._transform_index = camPosIndex
self.cameraManager.set_sensor(camIndex)
self.cameraManager.set_transform(self.camTransform)
self.world.tick() ## allowing manualgearshift to take effect # TODO still need this?
for obj in self.objects:
if isinstance(obj.carlaActor, carla.Vehicle):
obj.carlaActor.apply_control(carla.VehicleControl(manual_gear_shift=False))
self.world.tick()
# Set Carla actor's initial speed (if specified)
for obj in self.objects:
if obj.speed is not None:
equivVel = utils.scenicSpeedToCarlaVelocity(obj.speed, obj.heading)
if hasattr(obj.carlaActor, 'set_target_velocity'):
obj.carlaActor.set_target_velocity(equivVel)
else:
obj.carlaActor.set_velocity(equivVel)
def createObjectInSimulator(self, obj):
# Extract blueprint
try:
blueprint = self.blueprintLib.find(obj.blueprint)
except IndexError as e:
found = False
if obj.blueprint in oldBlueprintNames:
for oldName in oldBlueprintNames[obj.blueprint]:
try:
blueprint = self.blueprintLib.find(oldName)
found = True
warnings.warn(f'CARLA blueprint {obj.blueprint} not found; '
f'using older version {oldName}')
obj.blueprint = oldName
break
except IndexError:
continue
if not found:
raise SimulationCreationError(f'Unable to find blueprint {obj.blueprint}'
f' for object {obj}') from e
if obj.rolename is not None:
blueprint.set_attribute('role_name', obj.rolename)
# set walker as not invincible
if blueprint.has_attribute('is_invincible'):
blueprint.set_attribute('is_invincible', 'False')
# Set up transform
loc = utils.scenicToCarlaLocation(obj.position, world=self.world, blueprint=obj.blueprint)
rot = utils.scenicToCarlaRotation(obj.heading)
transform = carla.Transform(loc, rot)
# Color, cannot be set for Pedestrians
if blueprint.has_attribute('color') and obj.color is not None:
c = obj.color
c_str = f'{int(c.r*255)},{int(c.g*255)},{int(c.b*255)}'
blueprint.set_attribute('color', c_str)
# Create Carla actor
carlaActor = self.world.try_spawn_actor(blueprint, transform)
if carlaActor is None:
self.destroy()
raise SimulationCreationError(f'Unable to spawn object {obj}')
obj.carlaActor = carlaActor
carlaActor.set_simulate_physics(obj.physics)
if isinstance(carlaActor, carla.Vehicle):
# TODO should get dimensions at compile time, not simulation time
obj.width = carlaActor.bounding_box.extent.y * 2
obj.length = carlaActor.bounding_box.extent.x * 2
carlaActor.apply_control(carla.VehicleControl(manual_gear_shift=True, gear=1))
elif isinstance(carlaActor, carla.Walker):
carlaActor.apply_control(carla.WalkerControl())
# spawn walker controller
controller_bp = self.blueprintLib.find('controller.ai.walker')
controller = self.world.try_spawn_actor(controller_bp, carla.Transform(), carlaActor)
if controller is None:
self.destroy()
raise SimulationCreationError(f'Unable to spawn carla controller for object {obj}')
obj.carlaController = controller
return carlaActor
def executeActions(self, allActions):
super().executeActions(allActions)
# Apply control updates which were accumulated while executing the actions
for obj in self.agents:
ctrl = obj._control
if ctrl is not None:
obj.carlaActor.apply_control(ctrl)
obj._control = None
def step(self):
# Run simulation for one timestep
self.world.tick()
# Render simulation
if self.render:
# self.hud.tick(self.world, self.ego, self.displayClock)
self.cameraManager.render(self.display)
# self.hud.render(self.display)
pygame.display.flip()
def getProperties(self, obj, properties):
# Extract Carla properties
carlaActor = obj.carlaActor
currTransform = carlaActor.get_transform()
currLoc = currTransform.location
currRot = currTransform.rotation
currVel = carlaActor.get_velocity()
currAngVel = carlaActor.get_angular_velocity()
# Prepare Scenic object properties
velocity = utils.carlaToScenicPosition(currVel)
speed = math.hypot(*velocity)
values = dict(
position=utils.carlaToScenicPosition(currLoc),
elevation=utils.carlaToScenicElevation(currLoc),
heading=utils.carlaToScenicHeading(currRot),
velocity=velocity,
speed=speed,
angularSpeed=utils.carlaToScenicAngularSpeed(currAngVel),
)
return values
def destroy(self):
for obj in self.objects:
if obj.carlaActor is not None:
if isinstance(obj.carlaActor, carla.Vehicle):
obj.carlaActor.set_autopilot(False, self.tm.get_port())
if isinstance(obj.carlaActor, carla.Walker):
obj.carlaController.stop()
obj.carlaController.destroy()
obj.carlaActor.destroy()
if self.render and self.cameraManager:
self.cameraManager.destroy_sensor()
self.client.stop_recorder()
self.world.tick()
super().destroy()