Source code for scenic.simulators.webots.road.interface

"""Python library supporting the main Scenic module."""

import itertools
import math
import sys
import time

import numpy as np
import shapely.geometry
import shapely.ops

from scenic.core.geometry import (
    cleanChain,
    normalizeAngle,
    plotPolygon,
    polygonUnion,
    rotateVector,
)
from scenic.core.regions import PolygonalRegion, PolylineRegion, nowhere
from scenic.core.vectors import PolygonalVectorField
from scenic.core.workspaces import Workspace
import scenic.simulators.webots.world_parser as world_parser
from scenic.syntax.veneer import verbosePrint


def polygonWithPoints(points):
    polygon = shapely.geometry.Polygon(points)
    if not polygon.is_valid:  # TODO improve hack?
        verbosePrint(f"WARNING: simplifying invalid polygon with points {points}")
        polygon = polygon.simplify(0.5)
        if not polygon.is_valid:
            raise RuntimeError(f"unable to simplify polygon {polygon}")
    return polygon


def regionWithPolygons(polygons, orientation=None):
    if polygons.is_empty:
        return nowhere
    else:
        return PolygonalRegion(polygon=polygons, orientation=orientation)


## Classes for WBT nodes we are interested in


[docs]class OSMObject: """Objects with OSM id tags""" def __init__(self, attrs): self.attrs = attrs self.osmID = attrs["id"]
[docs]class Road(OSMObject): """OSM roads""" def __init__(self, attrs, driveOnLeft=False): super().__init__(attrs) self.driveOnLeft = driveOnLeft self.translation = attrs["translation"] pts = [ np.array(webotsToScenicPosition(p + self.translation)) for p in attrs["wayPoints"] ] self.waypoints = tuple(cleanChain(pts, 0.05)) assert len(self.waypoints) > 1, pts self.width = float(attrs.get("width", 7)) self.lanes = int(attrs.get("numberOfLanes", 2)) if self.lanes < 1: raise RuntimeError(f"Road {self.osmID} has fewer than 1 lane!") self.forwardLanes = int(attrs.get("numberOfForwardLanes", 1)) # if self.forwardLanes < 1: # raise RuntimeError(f'Road {self.osmID} has fewer than 1 forward lane!') self.backwardLanes = self.lanes - self.forwardLanes self.hasLeftSidewalk = attrs.get("leftBorder", True) self.hasRightSidewalk = attrs.get("rightBorder", True) self.sidewalkWidths = list(attrs.get("roadBorderWidth", [0.8])) if (self.hasLeftSidewalk or self.hasRightSidewalk) and len( self.sidewalkWidths ) < 1: raise RuntimeError(f"Road {self.osmID} has sidewalk with empty width!") self.startCrossroad = attrs.get("startJunction") self.endCrossroad = attrs.get("endJunction") def computeGeometry(self, crossroads, snapTolerance=0.05): ## Approximate bounding polygon and sidewalks # TODO improve this!!! lefts, rights = [], [] leftSidewalk, rightSidewalk = [], [] headings = [] sidewalkWidths = itertools.chain( self.sidewalkWidths, itertools.repeat(self.sidewalkWidths[-1]) ) segments = zip(self.waypoints, sidewalkWidths) for i, segment in enumerate(segments): point, sidewalkWidth = segment if i + 1 < len(self.waypoints): nextPt = self.waypoints[i + 1] dx, dy = nextPt - point heading = normalizeAngle(math.atan2(dy, dx) - (math.pi / 2)) headings.append(heading) perp = np.array([-dy, dx]) perp /= np.linalg.norm(perp) else: pass # use perp from last segment toEdge = perp * (self.width / 2) left = point + toEdge right = point - toEdge lefts.append(left) rights.append(right) toEdge = perp * sidewalkWidth leftSidewalk.append(left + toEdge) rightSidewalk.append(right - toEdge) # Snap to adjacent crossroads if possible if snapTolerance > 0: sc = self.startCrossroad if sc is not None: if sc not in crossroads: raise RuntimeError( f"Road {self.osmID} begins at invalid crossroad {sc}" ) crossroad = crossroads[sc] if crossroad.region is not None: pt = shapely.geometry.Point(lefts[0]) pt = shapely.ops.snap(pt, crossroad.region.polygons, snapTolerance) lefts[0] = np.array([pt.x, pt.y]) pt = shapely.geometry.Point(rights[0]) pt = shapely.ops.snap(pt, crossroad.region.polygons, snapTolerance) rights[0] = np.array([pt.x, pt.y]) perp = lefts[0] - rights[0] toEdge = perp * (self.sidewalkWidths[0] / np.linalg.norm(perp)) leftSidewalk[0] = lefts[0] + toEdge rightSidewalk[0] = rights[0] - toEdge ec = self.endCrossroad if ec is not None: if ec not in crossroads: raise RuntimeError( f"Road {self.osmID} ends at invalid crossroad {ec}" ) crossroad = crossroads[ec] if crossroad.region is not None: pt = shapely.geometry.Point(lefts[-1]) pt = shapely.ops.snap(pt, crossroad.region.polygons, snapTolerance) lefts[-1] = np.array([pt.x, pt.y]) pt = shapely.geometry.Point(rights[-1]) pt = shapely.ops.snap(pt, crossroad.region.polygons, snapTolerance) rights[-1] = np.array([pt.x, pt.y]) perp = lefts[-1] - rights[-1] toEdge = perp * (self.sidewalkWidths[-1] / np.linalg.norm(perp)) leftSidewalk[-1] = lefts[-1] + toEdge rightSidewalk[-1] = rights[-1] - toEdge roadPoints = lefts + list(reversed(rights)) self.leftCurb = PolylineRegion(reversed(lefts)) self.rightCurb = PolylineRegion(rights) self.leftSidewalk = self.rightSidewalk = None if self.hasLeftSidewalk: points = lefts + list(reversed(leftSidewalk)) polygon = polygonWithPoints(points) assert polygon.is_valid, self.waypoints self.leftSidewalk = PolygonalRegion(polygon=polygon) if self.hasRightSidewalk: points = rights + list(reversed(rightSidewalk)) polygon = polygonWithPoints(points) assert polygon.is_valid, self.waypoints self.rightSidewalk = PolygonalRegion(polygon=polygon) ## Compute lanes and traffic directions cells = [] la, ra = lefts[0], rights[0] gapA = (ra - la) / self.lanes markerA = ra laneMarkers = [[] for lane in range(self.lanes)] for lb, rb, heading in zip(lefts[1:], rights[1:], headings): # Compute lanes for this segment of road gapB = (rb - lb) / self.lanes markerB = rb for lane, markers in enumerate(laneMarkers): forward = lane < self.forwardLanes if self.driveOnLeft: forward = not forward nextMarkerA = markerA - gapA nextMarkerB = markerB - gapB markers.append(nextMarkerA) cell = shapely.geometry.Polygon( (markerA, markerB, nextMarkerB, nextMarkerA) ) heading = heading if forward else normalizeAngle(heading + math.pi) cells.append((cell, heading)) markerA = nextMarkerA markerB = nextMarkerB gapA = gapB markerA = rb self.lanes = [] markerB = rb rightEdge = rights for lane, markers in enumerate(laneMarkers): markerB = markerB - gapB markers.append(markerB) self.lanes.append(PolygonalRegion(rightEdge + list(reversed(markers)))) rightEdge = markers self.laneMarkers = laneMarkers[:-1] self.cells = cells self.direction = PolygonalVectorField(f"Road{self.osmID}Direction", cells) roadPolygon = polygonWithPoints(roadPoints) self.region = PolygonalRegion(polygon=roadPolygon, orientation=self.direction) def show(self, plt): if self.hasLeftSidewalk: x, y = zip(*[p[:2] for p in self.leftSidewalk.boundary.points]) plt.fill(x, y, "#A0A0FF") if self.hasRightSidewalk: x, y = zip(*[p[:2] for p in self.rightSidewalk.boundary.points]) plt.fill(x, y, "#A0A0FF") self.region.show(plt, style="r:") x, y = zip(*[p[:2] for p in self.lanes[0].boundary.points]) plt.fill(x, y, color=(0.8, 1.0, 0.8)) for lane, markers in enumerate(self.laneMarkers): x, y = zip(*markers) color = (0.8, 0.8, 0) if lane == self.backwardLanes - 1 else (0.3, 0.3, 0.3) plt.plot(x, y, "--", color=color)
[docs]class Crossroad(OSMObject): """OSM crossroads""" def __init__(self, attrs): super().__init__(attrs) self.translation = attrs["translation"] points = list( np.array(webotsToScenicPosition(p + self.translation)) for p in attrs["shape"] ) if len(points) > 0: self.points = points self.region = PolygonalRegion(self.points) else: verbosePrint(f"WARNING: Crossroad {self.osmID} has empty shape field!") self.region = None def show(self, plt): if self.region is not None: x, y = zip(*self.points) plt.fill(x, y, color=(1, 0.9, 0.9)) plt.plot(x, y, ":", color=(1, 0.5, 0))
[docs]class PedestrianCrossing: """PedestrianCrossing nodes""" def __init__(self, attrs): self.translation = attrs.get("translation", np.array((0, 0, 0))) pos = np.array(webotsToScenicPosition(self.translation)) name = attrs.get("name", "") self.angle = webotsToScenicRotation(attrs.get("rotation", (0, 1, 0, 0))) if self.angle is None: raise RuntimeError(f'PedestrianCrossing "{name}" is not 2D!') size = attrs.get("size", (20, 8)) self.length, self.width = float(size[0]), float(size[1]) self.length += 0.2 # pad length to intersect sidewalks better # TODO improve? hl, hw = self.length / 2, self.width / 2 self.corners = tuple( pos + rotateVector(vec, -self.angle) for vec in ((hl, hw), (-hl, hw), (-hl, -hw), (hl, -hw)) ) self.region = PolygonalRegion(self.corners) def show(self, plt): x, y = zip(*self.corners) plt.fill(x, y, color="#A0A0FF")
## Workspace derived from a WBT world class WebotsWorkspace(Workspace): def __init__(self, world): # Find roads, crossroads, and pedestrian crossings nodeClasses = { "Road": Road, "Crossroad": Crossroad, "PedestrianCrossing": PedestrianCrossing, } self.roads, self.crossroads, self.crossings = world_parser.findNodeTypesIn( ("Road", "Crossroad", "PedestrianCrossing"), world, nodeClasses ) # Compute road geometry crossroadsByID = {crossroad.osmID: crossroad for crossroad in self.crossroads} for road in self.roads: road.computeGeometry(crossroadsByID) # Construct regions allCells = [] drivableAreas = [] for road in self.roads: assert road.region.polygons.is_valid, ( road.waypoints, road.region.boundary.points, ) allCells.extend(road.cells) for crossroad in self.crossroads: if crossroad.region is not None: for poly in crossroad.region.polygons.geoms: allCells.append((poly, None)) if not allCells: raise RuntimeError("Webots world has no drivable geometry!") self.roadDirection = PolygonalVectorField( "roadDirection", allCells, headingFunction=lambda pos: 0, defaultHeading=0, # TODO fix ) if not self.roads: roadPoly = None self.roadsRegion = nowhere self.curbsRegion = nowhere else: roadPoly = polygonUnion(road.region.polygons for road in self.roads) self.roadsRegion = PolygonalRegion( polygon=roadPoly, orientation=self.roadDirection ) drivableAreas.append(roadPoly) curbs = [road.leftCurb.lineString for road in self.roads] curbs.extend(road.rightCurb.lineString for road in self.roads) curbLine = shapely.ops.unary_union(curbs) self.curbsRegion = PolylineRegion(polyline=curbLine) if not self.crossroads: crossroadPoly = None self.crossroadsRegion = nowhere else: crossroadPoly = polygonUnion( cr.region.polygons for cr in self.crossroads if cr.region is not None ) self.crossroadsRegion = PolygonalRegion( polygon=crossroadPoly, orientation=self.roadDirection ) drivableAreas.append(crossroadPoly) sidewalks = [] walkableAreas = [] for road in self.roads: if road.hasLeftSidewalk: sidewalks.append(road.leftSidewalk.polygons) if road.hasRightSidewalk: sidewalks.append(road.rightSidewalk.polygons) if not sidewalks: sidewalksPoly = None self.sidewalksRegion = nowhere else: sidewalksPoly = polygonUnion(sidewalks) self.sidewalksRegion = regionWithPolygons(sidewalksPoly) walkableAreas.append(sidewalksPoly) if not self.crossings: crossingsPoly = None self.crossingsRegion = nowhere else: crossingsPoly = polygonUnion( crossing.region.polygons for crossing in self.crossings ) self.crossingsRegion = regionWithPolygons(crossingsPoly) walkableAreas.append(crossingsPoly) if not walkableAreas: self.walkableRegion = nowhere else: walkablePoly = polygonUnion(walkableAreas) self.walkableRegion = regionWithPolygons(walkablePoly) drivablePoly = polygonUnion(drivableAreas) self.drivableRegion = PolygonalRegion( polygon=drivablePoly, orientation=self.roadDirection ) workspacePoly = polygonUnion(drivableAreas + walkableAreas) self.workspaceRegion = PolygonalRegion(polygon=workspacePoly) # Identify various roads and lanes of interest slowCurbs = [] for road in self.roads: if road.forwardLanes > 1: slowCurbs.append(road.rightCurb) if road.backwardLanes > 1: slowCurbs.append(road.leftCurb) self.slowCurbs = slowCurbs super().__init__(self.workspaceRegion) def show2D(self, plt): self.drivableRegion.show(plt) for road in self.roads: road.show(plt) for crossroad in self.crossroads: crossroad.show(plt) for crossing in self.crossings: crossing.show(plt) for curb in self.slowCurbs: curb.show(plt, style="b-") @property def minimumZoomSize(self): return 30 # Old functions kept for backwards-compatibility
[docs]def webotsToScenicPosition(pos): """Convert a Webots position to a Scenic position. Drops the Webots Y coordinate. .. deprecated:: 2.1.0 Use ``WebotsCoordinateSystem`` instead. """ x, y, z = pos return (x, z)
[docs]def scenicToWebotsPosition(pos, y=0, coordinateSystem="ENU"): """Convert a Scenic position to a Webots position. .. deprecated:: 2.1.0 Use ``WebotsCoordinateSystem`` instead. """ x, z = pos return [x, y, z]
[docs]def webotsToScenicRotation(rot, tolerance2D=None): """Convert a Webots rotation vector to a Scenic heading. Assumes the object lies in the Webots X-Z plane, with a rotation axis close to the Y axis. If ``tolerance2D`` is given, returns ``None`` if the orientation of the object is not sufficiently close to being 2D. .. deprecated:: 2.1.0 Use ``WebotsCoordinateSystem`` instead. """ axis = np.array(rot[:3]) angle = rot[3] if tolerance2D is not None and np.linalg.norm(axis - (0, 1, 0)) > tolerance2D: return None return normalizeAngle(angle)
[docs]def scenicToWebotsRotation(heading): """Convert a Scenic heading to a Webots rotation vector. .. deprecated:: 2.1.0 Use ``WebotsCoordinateSystem`` instead. """ return [0, 1, 0, heading]