Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft for making semantic costmaps more flexible. #195

Merged
merged 8 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions requirements-resolver.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
-r requirements.txt
probabilistic_model>=5.0.3
random_events>=3.0.4

3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ pynput~=1.7.7
playsound~=1.3.0
pydub~=0.25.1
gTTS~=2.5.3
probabilistic_model>=5.1.0
random_events>=3.0.7
sympy
174 changes: 169 additions & 5 deletions src/pycram/costmaps.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
# used for delayed evaluation of typing until python 3.11 becomes mainstream
from __future__ import annotations

from typing_extensions import Tuple, List, Optional

import matplotlib.pyplot as plt
from dataclasses import dataclass

import matplotlib.pyplot as plt
import numpy as np
import psutil
import random_events
import rospy
import tf
from matplotlib import colors
from nav_msgs.msg import OccupancyGrid, MapMetaData

from probabilistic_model.probabilistic_circuit.nx.distributions import UniformDistribution
from probabilistic_model.probabilistic_circuit.nx.probabilistic_circuit import ProbabilisticCircuit, ProductUnit
from random_events.interval import Interval, reals, closed_open, closed
from random_events.product_algebra import Event, SimpleEvent
from random_events.variable import Continuous
from typing_extensions import Tuple, List, Optional, Iterator

from .datastructures.dataclasses import AxisAlignedBoundingBox
from .datastructures.pose import Pose
from .datastructures.world import UseProspectionWorld
from .world_concepts.world_object import Object
from .datastructures.world import World
from .description import Link
from .local_transformer import LocalTransformer
from .world_concepts.world_object import Object

from .datastructures.pose import Pose, Transform
from .datastructures.world import World
from .datastructures.dataclasses import AxisAlignedBoundingBox, BoxVisualShape, Color
Expand Down Expand Up @@ -802,6 +812,160 @@ def get_aabb_for_link(self) -> AxisAlignedBoundingBox:
return self.link.get_axis_aligned_bounding_box()


class AlgebraicSemanticCostmap(SemanticCostmap):
"""
Class for a semantic costmap that is based on an algebraic set-description of the valid area.
"""
x: Continuous = Continuous("x")
"""
The variable for height.
"""

y: Continuous = Continuous("y")
"""
The variable for width.
"""

original_valid_area: Optional[SimpleEvent]
"""
The original rectangle of the valid area.
"""

valid_area: Optional[Event]
"""
A description of the valid positions as set.
"""

number_of_samples: int
"""
The number of samples to generate for the iter.
"""

def __init__(self, object, urdf_link_name, world=None, number_of_samples=1000):
super().__init__(object, urdf_link_name, world=world)
self.number_of_samples = number_of_samples

def check_valid_area_exists(self):
assert self.valid_area is not None, ("The map has to be created before semantics can be applied. "
"Call 'generate_map first'")

def left(self, margin = 0.) -> Event:
"""
Create an event left of the origins Y-Coordinate.
:param margin: The margin of the events left bound.
:return: The left event.
"""
self.check_valid_area_exists()
y_origin = self.origin.position.y
left = self.original_valid_area[self.y].simple_sets[0].lower
left += margin
event = SimpleEvent(
{self.x: reals(), self.y: random_events.interval.open(left, y_origin)}).as_composite_set()
return event

def right(self, margin = 0.) -> Event:
"""
Create an event right of the origins Y-Coordinate.
:param margin: The margin of the events right bound.
:return: The right event.
"""
self.check_valid_area_exists()
y_origin = self.origin.position.y
right = self.original_valid_area[self.y].simple_sets[0].upper
right -= margin
event = SimpleEvent({self.x: reals(), self.y: closed_open(y_origin, right)}).as_composite_set()
return event

def top(self, margin = 0.) -> Event:
"""
Create an event above the origins X-Coordinate.
:param margin: The margin of the events upper bound.
:return: The top event.
"""
self.check_valid_area_exists()
x_origin = self.origin.position.x
top = self.original_valid_area[self.x].simple_sets[0].upper
top -= margin
event = SimpleEvent(
{self.x: random_events.interval.closed_open(x_origin, top), self.y: reals()}).as_composite_set()
return event

def bottom(self, margin = 0.) -> Event:
"""
Create an event below the origins X-Coordinate.
:param margin: The margin of the events lower bound.
:return: The bottom event.
"""
self.check_valid_area_exists()
x_origin = self.origin.position.x
lower = self.original_valid_area[self.x].simple_sets[0].lower
lower += margin
event = SimpleEvent(
{self.x: random_events.interval.open(lower, x_origin), self.y: reals()}).as_composite_set()
return event

def inner(self, margin=0.2):
min_x = self.original_valid_area[self.x].simple_sets[0].lower
max_x = self.original_valid_area[self.x].simple_sets[0].upper
min_y = self.original_valid_area[self.y].simple_sets[0].lower
max_y = self.original_valid_area[self.y].simple_sets[0].upper

min_x += margin
max_x -= margin
min_y += margin
max_y -= margin

inner_event = SimpleEvent({self.x: closed(min_x, max_x),
self.y: closed(min_y, max_y)}).as_composite_set()
return inner_event

def border(self, margin=0.2):
return ~self.inner(margin)

def generate_map(self) -> None:
super().generate_map()
valid_area = Event()
for rectangle in self.partitioning_rectangles():
# rectangle.scale(1/self.resolution, 1/self.resolution)
rectangle.translate(self.origin.position.x, self.origin.position.y)
valid_area.simple_sets.add(SimpleEvent({self.x: closed(rectangle.x_lower, rectangle.x_upper),
self.y: closed(rectangle.y_lower, rectangle.y_upper)}))

assert len(valid_area.simple_sets) == 1, ("The map at the basis of a Semantic costmap must be an axis aligned"
"bounding box")
self.valid_area = valid_area
self.original_valid_area = self.valid_area.simple_sets[0]

def as_distribution(self) -> ProbabilisticCircuit:
p_xy = ProductUnit()
u_x = UniformDistribution(self.x, self.original_valid_area[self.x].simple_sets[0])
u_y = UniformDistribution(self.y, self.original_valid_area[self.y].simple_sets[0])
p_xy.add_subcircuit(u_x)
p_xy.add_subcircuit(u_y)

conditional, _ = p_xy.conditional(self.valid_area)
return conditional.probabilistic_circuit

def sample_to_pose(self, sample: np.ndarray) -> Pose:
"""
Convert a sample from the costmap to a pose.
:param sample: The sample to convert
:return: The pose corresponding to the sample
"""
x = sample[0]
y = sample[1]
position = [x, y, self.origin.position.z]
angle = np.arctan2(position[1] - self.origin.position.y, position[0] - self.origin.position.x) + np.pi
orientation = list(tf.transformations.quaternion_from_euler(0, 0, angle, axes="sxyz"))
return Pose(position, orientation, self.origin.frame)

def __iter__(self) -> Iterator[Pose]:
model = self.as_distribution()
samples = model.sample(self.number_of_samples)
for sample in samples:
yield self.sample_to_pose(sample)


cmap = colors.ListedColormap(['white', 'black', 'green', 'red', 'blue'])


Expand Down
16 changes: 8 additions & 8 deletions src/pycram/datastructures/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ class ExecutionType(Enum):
SIMULATED = auto()
SEMI_REAL = auto()

class Arms(Enum):
class Arms(int, Enum):
"""Enum for Arms."""
LEFT = auto()
RIGHT = auto()
BOTH = auto()
LEFT = 0
RIGHT = 1
BOTH = 2


class TaskStatus(Enum):
class TaskStatus(int, Enum):
"""
Enum for readable descriptions of a tasks' status.
"""
Expand All @@ -39,7 +39,7 @@ class JointType(Enum):
FLOATING = 7


class Grasp(Enum):
class Grasp(int, Enum):
"""
Enum for Grasp orientations.
"""
Expand All @@ -49,7 +49,7 @@ class Grasp(Enum):
TOP = 3


class ObjectType(Enum):
class ObjectType(int, Enum):
"""
Enum for Object types to easier identify different objects
"""
Expand All @@ -66,7 +66,7 @@ class ObjectType(Enum):
HUMAN = auto()


class State(Enum):
class State(int, Enum):
"""
Enumeration which describes the result of a language expression.
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import numpy as np
import tqdm
from probabilistic_model.probabilistic_circuit.distributions import GaussianDistribution, SymbolicDistribution
from probabilistic_model.probabilistic_circuit.probabilistic_circuit import ProbabilisticCircuit, \
DecomposableProductUnit
from probabilistic_model.probabilistic_circuit.nx.distributions import GaussianDistribution, SymbolicDistribution
from probabilistic_model.probabilistic_circuit.nx.probabilistic_circuit import ProbabilisticCircuit, \
ProductUnit
from probabilistic_model.utils import MissingDict
from random_events.interval import *
from random_events.product_algebra import Event, SimpleEvent
Expand Down Expand Up @@ -124,7 +124,7 @@ def create_model_with_center(self) -> ProbabilisticCircuit:
"""
Create a fully factorized gaussian at the center of the map.
"""
centered_model = DecomposableProductUnit()
centered_model = ProductUnit()
centered_model.add_subcircuit(GaussianDistribution(self.relative_x, 0., np.sqrt(self.variance)))
centered_model.add_subcircuit(GaussianDistribution(self.relative_y, 0., np.sqrt(self.variance)))

Expand Down Expand Up @@ -300,8 +300,6 @@ def query_for_database():
def batch_rollout(self):
"""
Try the policy without conditioning on visibility and occupancy and count the successful tries.

:amount: The amount of tries
"""

# initialize statistics
Expand Down
22 changes: 10 additions & 12 deletions src/pycram/pose_generator_and_validator.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import tf
import numpy as np
import tf
from typing_extensions import Tuple, List, Union, Dict, Iterable

from .datastructures.pose import Pose, Transform
from .datastructures.world import World
from .external_interfaces.ik import request_ik
from .local_transformer import LocalTransformer
from .plan_failures import IKError
from .robot_description import RobotDescription
from .world_concepts.world_object import Object
from .world_reasoning import contact
from .costmaps import Costmap
from .local_transformer import LocalTransformer
from .datastructures.pose import Pose, Transform
from .robot_description import RobotDescription
from .external_interfaces.ik import request_ik
from .plan_failures import IKError
from .utils import _apply_ik
from typing_extensions import Tuple, List, Union, Dict, Iterable


class PoseGenerator:
Expand Down Expand Up @@ -186,7 +185,8 @@ def reachability_validator(pose: Pose,
res = False
arms = []
for description in manipulator_descs:
retract_target_pose = LocalTransformer().transform_pose(target, robot.get_link_tf_frame(description.end_effector.tool_frame))
retract_target_pose = LocalTransformer().transform_pose(target, robot.get_link_tf_frame(
description.end_effector.tool_frame))
retract_target_pose.position.x -= 0.07 # Care hard coded value copied from PlaceAction class

# retract_pose needs to be in world frame?
Expand Down Expand Up @@ -245,8 +245,6 @@ def collision_check(robot: Object, allowed_collision: Dict[Object, List]):
for obj in World.current_world.objects:
if obj.name == "floor":
continue
in_contact= _in_contact(robot, obj, allowed_collision, allowed_robot_links)
in_contact = _in_contact(robot, obj, allowed_collision, allowed_robot_links)

return in_contact


39 changes: 37 additions & 2 deletions test/test_costmaps.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import unittest

import numpy as np
from random_events.variable import Continuous
# import plotly.graph_objects as go
from random_events.product_algebra import Event, SimpleEvent
from random_events.interval import *

from bullet_world_testcase import BulletWorldTestCase
from pycram.costmaps import OccupancyCostmap
from pycram.costmaps import OccupancyCostmap, AlgebraicSemanticCostmap
from pycram.datastructures.pose import Pose
import plotly.graph_objects as go


class TestCostmapsCase(BulletWorldTestCase):
class CostmapTestCase(BulletWorldTestCase):

def test_raytest_bug(self):
for i in range(30):
Expand Down Expand Up @@ -55,3 +58,35 @@ def test_visualize(self):
o = OccupancyCostmap(0.2, from_ros=False, size=200, resolution=0.02,
origin=Pose([0, 0, 0], [0, 0, 0, 1]))
o.visualize()


class SemanticCostmapTestCase(BulletWorldTestCase):

def test_generate_map(self):
costmap = AlgebraicSemanticCostmap(self.kitchen, "kitchen_island_surface")
costmap.valid_area &= costmap.left()
costmap.valid_area &= costmap.top()
costmap.valid_area &= costmap.border(0.2)
self.assertEqual(len(costmap.valid_area.simple_sets), 2)

def test_as_distribution(self):
costmap = AlgebraicSemanticCostmap(self.kitchen, "kitchen_island_surface")
costmap.valid_area &= costmap.right() & costmap.bottom() & costmap.border(0.2)
model = costmap.as_distribution()
self.assertEqual(len(model.nodes), 7)
# fig = go.Figure(model.plot(), model.plotly_layout())
# fig.show()
# supp = model.support
# fig = go.Figure(supp.plot(), supp.plotly_layout())
# fig.show()

def test_iterate(self):
costmap = AlgebraicSemanticCostmap(self.kitchen, "kitchen_island_surface")
costmap.valid_area &= costmap.left() & costmap.top() & costmap.border(0.2)
for sample in iter(costmap):
self.assertIsInstance(sample, Pose)
self.assertTrue(costmap.valid_area.contains([sample.position.x, sample.position.y]))


class OntologySemanticLocationTestCase(unittest.TestCase):
...
Loading
Loading