Module robofish.gym_guppy.guppies

Guppies and robots.

Expand source code
"""Guppies and robots."""
# isort:skip_file
from ._base_agents import (
    Agent,
    TurnBoostAgent,
    TorqueThrustAgent,
    ConstantVelocityAgent,
    TurnSpeedAgent,
    VelocityControlledAgent,
)
from ._guppy import (
    Guppy,
    DummyGuppy,
    TurnSpeedGuppy,
    TurnBoostGuppy,
    TorqueGuppy,
    BoostGuppy,
    RandomTurnBoostGuppy,
)
from ._robot import (
    Robot,
    TurnSpeedRobot,
    TurnBoostRobot,
    GoToRobot,
    GlobalTargetRobot,
    PolarCoordinateTargetRobot,
    TurnRobot,
)
from ._couzin_guppies import (
    BaseCouzinGuppy,
    ClassicCouzinGuppy,
    BoostCouzinGuppy,
    AdaptiveCouzinGuppy,
    BiasedAdaptiveCouzinGuppy,
)
from ._adaptive_agent import AdaptiveAgent
from ._perturbed_guppies import PerturbedAdaptiveCouzinGuppy

__all__ = [
    "Agent",
    "TurnBoostAgent",
    "ConstantVelocityAgent",
    "TurnSpeedAgent",
    "VelocityControlledAgent",
    "Guppy",
    "DummyGuppy",
    "TurnSpeedGuppy",
    "TurnBoostGuppy",
    "BaseCouzinGuppy",
    "ClassicCouzinGuppy",
    "BoostCouzinGuppy",
    "AdaptiveCouzinGuppy",
    "BiasedAdaptiveCouzinGuppy",
    "PerturbedAdaptiveCouzinGuppy",
    "Robot",
    "TurnSpeedRobot",
    "TurnBoostRobot",
    "GlobalTargetRobot",
    "PolarCoordinateTargetRobot",
    "TurnRobot",
    "GoToRobot",
    "TorqueGuppy",
    "TorqueThrustAgent",
    "AdaptiveAgent",
    "BoostGuppy",
    "RandomTurnBoostGuppy",
]
__pdoc__ = {
    "TurnBoostGuppy": False,
    "BaseCouzinGuppy": False,
    "ClassicCouzinGuppy": False,
    "BoostCouzinGuppy": False,
    "AdaptiveCouzinGuppy": False,
    "BiasedAdaptiveCouzinGuppy": False,
    "PerturbedAdaptiveCouzinGuppy": False,
    "TurnBoostRobot": False,
    "TorqueGuppy": False,
    "TorqueThrustAgent": False,
    "AdaptiveAgent": False,
    "BoostGuppy": False,
    "RandomTurnBoostGuppy": False,
    "Agent.set_color": False,
}

Classes

class Agent (*, world_bounds, **kwargs)

Base class for all agents.

Initialize the agent.

Args

world_bounds
The bounds of the world in meters.
Expand source code
class Agent(FishBody, abc.ABC):
    """Base class for all agents."""

    _max_linear_velocity = 0.2  # meters / s
    _max_angular_velocity = 0.5 * np.pi  # radians / s

    # adjust these parameters
    _density = 0.00025
    _friction = 0.0
    _restitution = 0.0

    _linear_damping = 15.0  # * _world_scale
    _angular_damping = 60.0  # * _world_scale

    def __init__(self, *, world_bounds, **kwargs):
        """Initialize the agent.

        Args:
            world_bounds: The bounds of the world in meters.
        """
        # all parameters in real world units
        super().__init__(length=0.02, width=0.004, **kwargs)
        self._body.bullet = True

        self._world_bounds = np.asarray(world_bounds)

        # will be set by the environment
        self._id = None

        # put all guppies into same group (negative so that no collisions are detected)
        self._fixtures[0].filterData.groupIndex = -1

        self._color = np.array((133, 133, 133))
        self._highlight_color = (255, 255, 255)

    @property
    def id(self):
        """Get the id of the agent."""
        return self._id

    @id.setter
    def id(self, id):
        """Set the id of the agent.

        Args:
            id: The id of the agent.

        Raises:
            Warning: If the id was already set.
        """
        if self._id is not None:
            warnings.warn(f"Agent id was changed from {self._id} to {id}")
        self._id = id

    @deprecated(
        version="0.4.0",
        reason="Rendering is deprecated. Use `robofish-io-render` instead.",
    )
    def set_color(self, color):
        """Set the color of the agent."""
        self._highlight_color = color

    @abc.abstractmethod
    def step(self, time_step: float) -> None:
        """Perform a step of the agent.

        This method is called by the environment.
        It should be implemented by the agent.

        Args:
            time_step (float): The time step in seconds.

        Returns:
            The reward for the step.
        """
        pass

Ancestors

Subclasses

  • robofish.gym_guppy.guppies._base_agents.ConstantVelocityAgent
  • robofish.gym_guppy.guppies._base_agents.TorqueThrustAgent
  • robofish.gym_guppy.guppies._base_agents.TurnBoostAgent
  • robofish.gym_guppy.guppies._base_agents.TurnSpeedAgent
  • robofish.gym_guppy.guppies._base_agents.VelocityControlledAgent
  • robofish.gym_guppy.guppies._guppy.Guppy
  • robofish.gym_guppy.guppies._robot.Robot

Instance variables

var id

Get the id of the agent.

Expand source code
@property
def id(self):
    """Get the id of the agent."""
    return self._id

Methods

def step(self, time_step: float) ‑> None

Perform a step of the agent.

This method is called by the environment. It should be implemented by the agent.

Args

time_step : float
The time step in seconds.

Returns

The reward for the step.

Expand source code
@abc.abstractmethod
def step(self, time_step: float) -> None:
    """Perform a step of the agent.

    This method is called by the environment.
    It should be implemented by the agent.

    Args:
        time_step (float): The time step in seconds.

    Returns:
        The reward for the step.
    """
    pass

Inherited members

class ConstantVelocityAgent (**kwargs)

Base class for agents that move with constant velocity.

Initialize the agent.

Expand source code
class ConstantVelocityAgent(Agent, abc.ABC):
    """Base class for agents that move with constant velocity."""

    _linear_damping = 0.0
    _angular_damping = 0.0

    def __init__(self, **kwargs):
        """Initialize the agent."""
        super().__init__(**kwargs)

        self._velocity = 0.2

        self._turn = None
        self._max_turn = np.pi / 10.0

    def step(self, time_step):
        """Perform a step of the agent.

        This method is called by the environment.

        Args:
            time_step (float): The time step in seconds. (unused)
        """
        if self._turn:
            t = np.minimum(np.maximum(self._turn, -self._max_turn), self._max_turn)
            self._body.angle += t
            self._turn -= t

        # wall collisions may cause angular velocity, set back to 0
        self.set_angular_velocity(0.0)
        self.set_linear_velocity((self._velocity, 0.0), local=True)

Ancestors

Subclasses

  • robofish.gym_guppy.guppies._couzin_guppies.ClassicCouzinGuppy

Methods

def step(self, time_step)

Perform a step of the agent.

This method is called by the environment.

Args

time_step : float
The time step in seconds. (unused)
Expand source code
def step(self, time_step):
    """Perform a step of the agent.

    This method is called by the environment.

    Args:
        time_step (float): The time step in seconds. (unused)
    """
    if self._turn:
        t = np.minimum(np.maximum(self._turn, -self._max_turn), self._max_turn)
        self._body.angle += t
        self._turn -= t

    # wall collisions may cause angular velocity, set back to 0
    self.set_angular_velocity(0.0)
    self.set_linear_velocity((self._velocity, 0.0), local=True)

Inherited members

class DummyGuppy (robot_poses=None, id=None, env=None, *args, **kwargs)

Dummy guppy for rollouts with live guppy.

Initialize guppy.

Expand source code
class DummyGuppy(Guppy):
    """Dummy guppy for rollouts with live guppy."""

    def compute_next_action(self, state: List[Agent], kd_tree: cKDTree = None):
        """Compute next action.

        Does nothing.
        """
        pass

    def step(self, time_step):
        """Step the guppy.

        Does nothing.
        """
        pass

Ancestors

  • robofish.gym_guppy.guppies._guppy.Guppy
  • robofish.gym_guppy.guppies._base_agents.Agent
  • FishBody
  • Polygon
  • Body
  • abc.ABC

Methods

def compute_next_action(self, state: typing.List[robofish.gym_guppy.guppies._base_agents.Agent], kd_tree: scipy.spatial._ckdtree.cKDTree = None)

Compute next action.

Does nothing.

Expand source code
def compute_next_action(self, state: List[Agent], kd_tree: cKDTree = None):
    """Compute next action.

    Does nothing.
    """
    pass
def step(self, time_step)

Step the guppy.

Does nothing.

Expand source code
def step(self, time_step):
    """Step the guppy.

    Does nothing.
    """
    pass

Inherited members

class GlobalTargetRobot (modulated: bool = False, **kwargs: typing.Any)

Robot that moves to a given target position.

Initialize the robot.

Args

modulated
If true, the robot can also modulate its speed.
**kwargs
Additional arguments.
Expand source code
class GlobalTargetRobot(Robot, VelocityControlledAgent):
    """Robot that moves to a given target position."""

    def __init__(self, modulated: bool = False, **kwargs: Any) -> None:
        """Initialize the robot.

        Args:
            modulated: If true, the robot can also modulate its speed.
            **kwargs: Additional arguments.
        """
        super(GlobalTargetRobot, self).__init__(**kwargs)
        # set position epsilon to 2cm
        self._pos_eps = 0.02
        self._modulated = modulated

        if self._modulated:
            self._action_space = gym.spaces.Box(
                low=np.r_[self._world_bounds[0], 0.0],
                high=np.r_[self._world_bounds[1], 0.2],
                dtype=np.float32,
            )
        else:
            self._action_space = gym.spaces.Box(
                low=self._world_bounds[0],
                high=self._world_bounds[1],
                dtype=np.float32,
            )

    @property
    def action_space(self) -> gym.spaces.Box:
        """Get the action space of the robot.

        The action space is a box with the following bounds:
            [world_bounds[0], world_bounds[1]] for the x and y target in meters.
            [0.0, 0.2] for the speed if modulated is true.
        """
        return self._action_space

    def set_action(self, action: np.ndarray) -> None:
        """Set a new action for the robot to execute.

        The action must be two-dimensional and have the following format: [x, y].
        If modulated is true, the action must be three-dimensional and have the
        following format: [x, y, speed].
        x and y are the absolute target position in meters.
        """
        if self._modulated:
            self.two_wheels_controller.fwd_ctrl.speed = action[-1]
            self._target = action[:-1]
        else:
            self._target = action

    def action_completed(self) -> bool:
        """Return true if the robot is close enough to the target position."""
        pos_error: float = np.linalg.norm(self._target - self.get_position())
        return pos_error < self._pos_eps

Ancestors

  • robofish.gym_guppy.guppies._robot.Robot
  • robofish.gym_guppy.guppies._base_agents.VelocityControlledAgent
  • robofish.gym_guppy.guppies._base_agents.Agent
  • FishBody
  • Polygon
  • Body
  • abc.ABC

Subclasses

  • robofish.gym_guppy.guppies._adaptive_agent.AdaptiveAgent
  • robofish.gym_guppy.guppies._robot.PolarCoordinateTargetRobot

Instance variables

var action_space : gym.spaces.box.Box

Get the action space of the robot.

The action space is a box with the following bounds: [world_bounds[0], world_bounds[1]] for the x and y target in meters. [0.0, 0.2] for the speed if modulated is true.

Expand source code
@property
def action_space(self) -> gym.spaces.Box:
    """Get the action space of the robot.

    The action space is a box with the following bounds:
        [world_bounds[0], world_bounds[1]] for the x and y target in meters.
        [0.0, 0.2] for the speed if modulated is true.
    """
    return self._action_space

Methods

def action_completed(self) ‑> bool

Return true if the robot is close enough to the target position.

Expand source code
def action_completed(self) -> bool:
    """Return true if the robot is close enough to the target position."""
    pos_error: float = np.linalg.norm(self._target - self.get_position())
    return pos_error < self._pos_eps
def set_action(self, action: numpy.ndarray) ‑> None

Set a new action for the robot to execute.

The action must be two-dimensional and have the following format: [x, y]. If modulated is true, the action must be three-dimensional and have the following format: [x, y, speed]. x and y are the absolute target position in meters.

Expand source code
def set_action(self, action: np.ndarray) -> None:
    """Set a new action for the robot to execute.

    The action must be two-dimensional and have the following format: [x, y].
    If modulated is true, the action must be three-dimensional and have the
    following format: [x, y, speed].
    x and y are the absolute target position in meters.
    """
    if self._modulated:
        self.two_wheels_controller.fwd_ctrl.speed = action[-1]
        self._target = action[:-1]
    else:
        self._target = action

Inherited members

class GoToRobot (**kwargs: typing.Any)

Robot that moves to a given target position.

Initialize the robot.

Expand source code
class GoToRobot(Robot, VelocityControlledAgent):
    """Robot that moves to a given target position."""

    _linear_damping = 0.0  # * _world_scale
    _angular_damping = 0.0  # * _world_scale

    @property
    def action_space(self) -> gym.spaces.Box:
        """Get the action space of the robot.

        The action space is a box with the following bounds:
            [-0.05, -0.05] for the x and y position of the target in meters.

        Note that the target position is relative to the robot's position.
        """
        return gym.spaces.Box(
            low=np.float32([-0.05, -0.05]), high=np.float32([0.05, 0.05])
        )

    def set_action(self, action: np.ndarray) -> None:
        """Set a new action for the robot to execute.

        The action must be two-dimensional and have the following format: [x, y].
        Note that the target position is relative to the robot's position.
        """
        self._target = self.get_world_point(action)

Ancestors

  • robofish.gym_guppy.guppies._robot.Robot
  • robofish.gym_guppy.guppies._base_agents.VelocityControlledAgent
  • robofish.gym_guppy.guppies._base_agents.Agent
  • FishBody
  • Polygon
  • Body
  • abc.ABC

Instance variables

var action_space : gym.spaces.box.Box

Get the action space of the robot.

The action space is a box with the following bounds: [-0.05, -0.05] for the x and y position of the target in meters.

Note that the target position is relative to the robot's position.

Expand source code
@property
def action_space(self) -> gym.spaces.Box:
    """Get the action space of the robot.

    The action space is a box with the following bounds:
        [-0.05, -0.05] for the x and y position of the target in meters.

    Note that the target position is relative to the robot's position.
    """
    return gym.spaces.Box(
        low=np.float32([-0.05, -0.05]), high=np.float32([0.05, 0.05])
    )

Methods

def set_action(self, action: numpy.ndarray) ‑> None

Set a new action for the robot to execute.

The action must be two-dimensional and have the following format: [x, y]. Note that the target position is relative to the robot's position.

Expand source code
def set_action(self, action: np.ndarray) -> None:
    """Set a new action for the robot to execute.

    The action must be two-dimensional and have the following format: [x, y].
    Note that the target position is relative to the robot's position.
    """
    self._target = self.get_world_point(action)

Inherited members

class Guppy (robot_poses=None, id=None, env=None, *args, **kwargs)

Abstract base class for guppies.

Initialize guppy.

Expand source code
class Guppy(Agent, abc.ABC):
    """Abstract base class for guppies."""

    def __init__(self, robot_poses=None, id=None, env=None, *args, **kwargs):
        """Initialize guppy."""
        super().__init__(*args, **kwargs)

    @abc.abstractmethod
    def compute_next_action(self, state: List[Agent], kd_tree: cKDTree = None):
        """Compute next action.

        This method is called by the environment in the `step` method.

        Args:
            state: List of all agents in the environment.
            kd_tree: KDTree of all agents in the environment.
        """
        raise NotImplementedError

Ancestors

Subclasses

  • robofish.gym_guppy.guppies._couzin_guppies.BaseCouzinGuppy
  • robofish.gym_guppy.guppies._guppy.BoostGuppy
  • robofish.gym_guppy.guppies._guppy.DummyGuppy
  • robofish.gym_guppy.guppies._guppy.RandomTurnBoostGuppy
  • robofish.gym_guppy.guppies._guppy.TorqueGuppy
  • robofish.gym_guppy.guppies._guppy.TurnBoostGuppy
  • robofish.gym_guppy.guppies._guppy.TurnSpeedGuppy

Methods

def compute_next_action(self, state: typing.List[robofish.gym_guppy.guppies._base_agents.Agent], kd_tree: scipy.spatial._ckdtree.cKDTree = None)

Compute next action.

This method is called by the environment in the step method.

Args

state
List of all agents in the environment.
kd_tree
KDTree of all agents in the environment.
Expand source code
@abc.abstractmethod
def compute_next_action(self, state: List[Agent], kd_tree: cKDTree = None):
    """Compute next action.

    This method is called by the environment in the `step` method.

    Args:
        state: List of all agents in the environment.
        kd_tree: KDTree of all agents in the environment.
    """
    raise NotImplementedError

Inherited members

class PolarCoordinateTargetRobot (**kwargs: typing.Any)

Robot that moves to a given target position in polar coordinates.

Initialize the robot.

Expand source code
class PolarCoordinateTargetRobot(GlobalTargetRobot):
    """Robot that moves to a given target position in polar coordinates."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the robot."""
        super().__init__(**kwargs)

        self._action_space = gym.spaces.Box(
            low=np.float32((-np.pi, 0.0)),
            # TODO this was initially set to .3
            high=np.float32((np.pi, 0.15)),
        )

    def set_action(self, action: np.ndarray) -> None:
        """Set a new action for the robot to execute.

        The action must be two-dimensional and have the following format:
        [angle, distance].
        (angle in radians and distance in meters.)

        This angle and distance are often referred to as turn and forward.

        Args:
            action (np.ndarray): The action to execute.
        """
        local_target = np.array((np.cos(action[0]), np.sin(action[0]))) * action[1]
        super(PolarCoordinateTargetRobot, self).set_action(
            self.get_global_point(local_target)
        )

Ancestors

  • robofish.gym_guppy.guppies._robot.GlobalTargetRobot
  • robofish.gym_guppy.guppies._robot.Robot
  • robofish.gym_guppy.guppies._base_agents.VelocityControlledAgent
  • robofish.gym_guppy.guppies._base_agents.Agent
  • FishBody
  • Polygon
  • Body
  • abc.ABC

Subclasses

  • robofish.gym_guppy.guppies._robot.TurnRobot

Methods

def set_action(self, action: numpy.ndarray) ‑> None

Set a new action for the robot to execute.

The action must be two-dimensional and have the following format: [angle, distance]. (angle in radians and distance in meters.)

This angle and distance are often referred to as turn and forward.

Args

action : np.ndarray
The action to execute.
Expand source code
def set_action(self, action: np.ndarray) -> None:
    """Set a new action for the robot to execute.

    The action must be two-dimensional and have the following format:
    [angle, distance].
    (angle in radians and distance in meters.)

    This angle and distance are often referred to as turn and forward.

    Args:
        action (np.ndarray): The action to execute.
    """
    local_target = np.array((np.cos(action[0]), np.sin(action[0]))) * action[1]
    super(PolarCoordinateTargetRobot, self).set_action(
        self.get_global_point(local_target)
    )

Inherited members

class Robot (**kwargs: typing.Any)

Base class for robots.

Initialize the robot.

Expand source code
class Robot(Agent, abc.ABC):
    """Base class for robots."""

    _linear_damping = 0.0
    _angular_damping = 0.0

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the robot."""
        super().__init__(**kwargs)
        self._env_state = None
        self._env_kd_tree = None

        self._color = np.array((165, 98, 98))

    def set_env_state(self, env_state: np.ndarray, kd_tree: cKDTree) -> None:
        """Set the environment state and kd tree.

        Args:
            env_state: The environment state.
            kd_tree: The kd tree.
        """
        self._env_state = env_state
        self._env_kd_tree = kd_tree

    @property
    @abc.abstractmethod
    def action_space(self) -> gym.spaces.Box:
        """Get the action space of the robot."""
        pass

    @abc.abstractmethod
    def set_action(self, action: np.ndarray) -> None:
        """Set a new action for the robot to execute."""
        pass

    @abc.abstractmethod
    def action_completed(self) -> bool:
        """Check if the current action is completed."""
        pass

Ancestors

Subclasses

  • robofish.gym_guppy.guppies._robot.GlobalTargetRobot
  • robofish.gym_guppy.guppies._robot.GoToRobot
  • robofish.gym_guppy.guppies._robot.TurnBoostRobot
  • robofish.gym_guppy.guppies._robot.TurnSpeedRobot

Instance variables

var action_space : gym.spaces.box.Box

Get the action space of the robot.

Expand source code
@property
@abc.abstractmethod
def action_space(self) -> gym.spaces.Box:
    """Get the action space of the robot."""
    pass

Methods

def action_completed(self) ‑> bool

Check if the current action is completed.

Expand source code
@abc.abstractmethod
def action_completed(self) -> bool:
    """Check if the current action is completed."""
    pass
def set_action(self, action: numpy.ndarray) ‑> None

Set a new action for the robot to execute.

Expand source code
@abc.abstractmethod
def set_action(self, action: np.ndarray) -> None:
    """Set a new action for the robot to execute."""
    pass
def set_env_state(self, env_state: numpy.ndarray, kd_tree: scipy.spatial._ckdtree.cKDTree) ‑> None

Set the environment state and kd tree.

Args

env_state
The environment state.
kd_tree
The kd tree.
Expand source code
def set_env_state(self, env_state: np.ndarray, kd_tree: cKDTree) -> None:
    """Set the environment state and kd tree.

    Args:
        env_state: The environment state.
        kd_tree: The kd tree.
    """
    self._env_state = env_state
    self._env_kd_tree = kd_tree

Inherited members

class TurnBoostAgent (**kwargs)

Base class for agents that can turn and boost.

The TurnBoostGuppy approximates the behaviour observed with real fish, by first turning into a desired direction and then performing a forward boost.

The TurnBoostGuppy can be controlled by setting the turn and boost properties.

Initialize the agent.

Expand source code
class TurnBoostAgent(Agent, abc.ABC):
    """Base class for agents that can turn and boost.

    The TurnBoostGuppy approximates the behaviour observed with real fish,
    by first turning into a desired direction and then performing a forward boost.

    The TurnBoostGuppy can be controlled by setting the turn and boost properties.
    """

    def __init__(self, **kwargs):
        """Initialize the agent."""
        super().__init__(**kwargs)

        self._max_turn_per_step = np.pi / 4.0
        self._max_boost_per_step = 0.01
        self._max_turn = 4 * self._max_turn_per_step
        self._max_boost = 2 * self._max_boost_per_step

        self.__turn = None
        self.__boost = None
        self.__reset_velocity = False

    @property
    def turn(self):
        """Get the turn of the agent."""
        return self.__turn

    @turn.setter
    def turn(self, turn):
        """Set the turn of the agent.

        Args:
            turn (float): The turn of the agent in radians.
        """
        self.__reset_velocity = True
        self.__turn = np.minimum(np.maximum(turn, -self._max_turn), self._max_turn)

    @property
    def boost(self):
        """Get the boost of the agent."""
        return self.__boost

    @boost.setter
    def boost(self, boost):
        """Set the boost of the agent.

        Args:
            boost (float): The boost of the agent.
        """
        self.__reset_velocity = True
        self.__boost = np.minimum(np.maximum(boost, 0.0), self._max_boost)

    def step(self, time_step: float) -> None:
        """Perform a step of the agent.

        This method is called by the environment.

        Args:
            time_step (float): The time step in seconds. (unused)
        """
        if self.__reset_velocity:
            self._body.linearVelocity = b2Vec2(0.0, 0.0)
            self.__reset_velocity = False
        if self.turn:
            t = np.minimum(
                np.maximum(self.turn, -self._max_turn_per_step), self._max_turn_per_step
            )
            self._body.angle += t
            self.turn -= t
        elif self.boost:
            b = np.minimum(self.__boost, self._max_boost_per_step)
            self._body.ApplyLinearImpulse(
                self._body.GetWorldVector(b2Vec2(b, 0.0)),
                point=self._body.worldCenter,
                wake=True,
            )
            self.__boost -= b

Ancestors

Subclasses

  • robofish.gym_guppy.guppies._couzin_guppies.BoostCouzinGuppy
  • robofish.gym_guppy.guppies._guppy.RandomTurnBoostGuppy
  • robofish.gym_guppy.guppies._guppy.TurnBoostGuppy
  • robofish.gym_guppy.guppies._robot.TurnBoostRobot

Instance variables

var boost

Get the boost of the agent.

Expand source code
@property
def boost(self):
    """Get the boost of the agent."""
    return self.__boost
var turn

Get the turn of the agent.

Expand source code
@property
def turn(self):
    """Get the turn of the agent."""
    return self.__turn

Methods

def step(self, time_step: float) ‑> None

Perform a step of the agent.

This method is called by the environment.

Args

time_step : float
The time step in seconds. (unused)
Expand source code
def step(self, time_step: float) -> None:
    """Perform a step of the agent.

    This method is called by the environment.

    Args:
        time_step (float): The time step in seconds. (unused)
    """
    if self.__reset_velocity:
        self._body.linearVelocity = b2Vec2(0.0, 0.0)
        self.__reset_velocity = False
    if self.turn:
        t = np.minimum(
            np.maximum(self.turn, -self._max_turn_per_step), self._max_turn_per_step
        )
        self._body.angle += t
        self.turn -= t
    elif self.boost:
        b = np.minimum(self.__boost, self._max_boost_per_step)
        self._body.ApplyLinearImpulse(
            self._body.GetWorldVector(b2Vec2(b, 0.0)),
            point=self._body.worldCenter,
            wake=True,
        )
        self.__boost -= b

Inherited members

class TurnRobot (forward: float = 0.15, **kwargs: typing.Any)

Robot that turns to a given angle.

This robot's forward speed is not controlled by the action. However, the forward speed is not constant and depends on the robot's PID controller.

Initialize the robot.

Args

forward
The distance the robot will move forward.
**kwargs
Additional arguments.
Expand source code
class TurnRobot(PolarCoordinateTargetRobot):
    """Robot that turns to a given angle.

    This robot's forward speed is not controlled by the action.
    However, the forward speed is not constant and depends on the robot's PID
    controller.
    """

    def __init__(self, forward: float = 0.15, **kwargs: Any) -> None:
        """Initialize the robot.

        Args:
            forward: The distance the robot will move forward.
            **kwargs: Additional arguments.
        """
        super().__init__(**kwargs)

        self.forward = forward
        self._action_space = gym.spaces.Box(
            low=np.float32((-np.pi,)),
            high=np.float32((np.pi,)),
        )

    def set_action(self, action: np.ndarray) -> None:
        """Set a new action for the robot to execute.

        The action must be one-dimensional and have the following format: [angle].
        angle is the angle in radians.
        """
        super().set_action(np.float32([action[0], self.forward]))

Ancestors

  • robofish.gym_guppy.guppies._robot.PolarCoordinateTargetRobot
  • robofish.gym_guppy.guppies._robot.GlobalTargetRobot
  • robofish.gym_guppy.guppies._robot.Robot
  • robofish.gym_guppy.guppies._base_agents.VelocityControlledAgent
  • robofish.gym_guppy.guppies._base_agents.Agent
  • FishBody
  • Polygon
  • Body
  • abc.ABC

Methods

def set_action(self, action: numpy.ndarray) ‑> None

Set a new action for the robot to execute.

The action must be one-dimensional and have the following format: [angle]. angle is the angle in radians.

Expand source code
def set_action(self, action: np.ndarray) -> None:
    """Set a new action for the robot to execute.

    The action must be one-dimensional and have the following format: [angle].
    angle is the angle in radians.
    """
    super().set_action(np.float32([action[0], self.forward]))

Inherited members

class TurnSpeedAgent (max_turn=3.141592653589793, max_speed=1.0, **kwargs)

Base class for agents that can turn and move with adjustable speed.

Initialize the agent.

Expand source code
class TurnSpeedAgent(Agent, abc.ABC):
    """Base class for agents that can turn and move with adjustable speed."""

    def __init__(self, max_turn=np.pi, max_speed=1.0, **kwargs):
        """Initialize the agent."""
        super().__init__(**kwargs)

        self._max_turn = max_turn
        self._max_speed = max_speed

        self.__turn = None
        self.__speed = None

    @property
    def turn(self):
        """Get the turn of the agent."""
        return self.__turn

    @turn.setter
    def turn(self, turn: float):
        """Set the turn of the agent.

        Args:
            turn (float): The turn of the agent in radians.
        """
        self.__turn = np.minimum(np.maximum(turn, -self._max_turn), self._max_turn)

    @property
    def speed(self):
        """Get the speed of the agent."""
        return self.__speed

    @speed.setter
    def speed(self, speed: float):
        """Set the speed of the agent.

        Args:
            speed (float): The speed of the agent.
        """
        self.__speed = np.minimum(np.maximum(speed, -self._max_speed), self._max_speed)

    def step(self, time_step: float) -> None:
        """Perform a step of the agent.

        This method is called by the environment.

        Args:
            time_step (float): The time step in seconds. (unused)
        """
        if self.turn:
            self._body.angle += self.turn
            self.__turn = None
        if self.speed is not None:
            # wall collisions may cause angular velocity, set back to 0
            self.set_angular_velocity(0.0)
            self.set_linear_velocity([self.speed, 0.0], local=True)
            self.__speed = None

Ancestors

Subclasses

  • robofish.gym_guppy.guppies._guppy.TurnSpeedGuppy
  • robofish.gym_guppy.guppies._robot.TurnSpeedRobot

Instance variables

var speed

Get the speed of the agent.

Expand source code
@property
def speed(self):
    """Get the speed of the agent."""
    return self.__speed
var turn

Get the turn of the agent.

Expand source code
@property
def turn(self):
    """Get the turn of the agent."""
    return self.__turn

Methods

def step(self, time_step: float) ‑> None

Perform a step of the agent.

This method is called by the environment.

Args

time_step : float
The time step in seconds. (unused)
Expand source code
def step(self, time_step: float) -> None:
    """Perform a step of the agent.

    This method is called by the environment.

    Args:
        time_step (float): The time step in seconds. (unused)
    """
    if self.turn:
        self._body.angle += self.turn
        self.__turn = None
    if self.speed is not None:
        # wall collisions may cause angular velocity, set back to 0
        self.set_angular_velocity(0.0)
        self.set_linear_velocity([self.speed, 0.0], local=True)
        self.__speed = None

Inherited members

class TurnSpeedGuppy (robot_poses=None, id=None, env=None, *args, **kwargs)

Simple guppy that is intended as a parent class for learned fish models.

What needs to be implemented is the method compute_next_action, in which self.turn and self.speed need to be set.

If you want to move the robot forward by a distance, you may calculate the velocity like this: v = d / t Where t = 1 / guppy_actions_per_second.

Initialize guppy.

Expand source code
class TurnSpeedGuppy(Guppy, TurnSpeedAgent, abc.ABC):
    """Simple guppy that is intended as a parent class for learned fish models.

    What needs to be implemented is the method `compute_next_action`,
    in which self.turn and self.speed need to be set.

    If you want to move the robot forward by a distance, you may
    calculate the velocity like this:
        v = d / t
    Where t = 1 / guppy_actions_per_second.
    """

    _linear_damping = 0.0
    _angular_damping = 0.0

Ancestors

  • robofish.gym_guppy.guppies._guppy.Guppy
  • robofish.gym_guppy.guppies._base_agents.TurnSpeedAgent
  • robofish.gym_guppy.guppies._base_agents.Agent
  • FishBody
  • Polygon
  • Body
  • abc.ABC

Inherited members

class TurnSpeedRobot (**kwargs: typing.Any)

Simple robot that turns by a given angle and moves forward with a given speed.

This robot does not use a PID controller, so it should not be used to train a policy that should be executed later on a physical robot. The intended use case is to train a policy that should later be used as a fish model, e.g. with SQIL. Use the guppy counterpart TurnSpeedGuppy after training the policy.

If you want to move the robot forward by a distance, you may calculate the velocity like this: v = d / t Where t = 1 / robot_actions_per_second.

This robot is not intended to be used with VariableStepGuppyEnv.

Initialize the robot.

Expand source code
class TurnSpeedRobot(Robot, TurnSpeedAgent):
    """Simple robot that turns by a given angle and moves forward with a given speed.

    This robot does not use a PID controller, so it should not be used to train
    a policy that should be executed later on a physical robot.
    The intended use case is to train a policy that should later be used as a
    fish model, e.g. with SQIL.
    Use the guppy counterpart TurnSpeedGuppy after training the policy.

    If you want to move the robot forward by a distance, you may calculate the
    velocity like this:
        v = d / t
    Where t = 1 / robot_actions_per_second.

    This robot is not intended to be used with VariableStepGuppyEnv.
    """

    def action_completed(self) -> bool:
        """Check if the current action is completed.

        Always true for this robot.
        """
        return True

    @property
    def action_space(self) -> gym.spaces.Box:
        """Get the action space of the robot.

        The action space is a box with the following bounds:
            [-max_turn, max_turn] for the turn angle,
            [0.0, max_speed] for the speed.

        Returns:
            The action space.
        """
        return gym.spaces.Box(
            low=np.float32([-self._max_turn, 0.0]),
            high=np.float32([self._max_turn, self._max_speed]),
        )

    def set_action(self, action: np.ndarray) -> None:
        """Set a new action for the robot to execute.

        The action must be two-dimensional and have the following format: [turn, speed].

        Args:
            action: The action to execute.
        """
        self.turn = action[0]
        self.speed = action[1]

Ancestors

  • robofish.gym_guppy.guppies._robot.Robot
  • robofish.gym_guppy.guppies._base_agents.TurnSpeedAgent
  • robofish.gym_guppy.guppies._base_agents.Agent
  • FishBody
  • Polygon
  • Body
  • abc.ABC

Instance variables

var action_space : gym.spaces.box.Box

Get the action space of the robot.

The action space is a box with the following bounds: [-max_turn, max_turn] for the turn angle, [0.0, max_speed] for the speed.

Returns

The action space.

Expand source code
@property
def action_space(self) -> gym.spaces.Box:
    """Get the action space of the robot.

    The action space is a box with the following bounds:
        [-max_turn, max_turn] for the turn angle,
        [0.0, max_speed] for the speed.

    Returns:
        The action space.
    """
    return gym.spaces.Box(
        low=np.float32([-self._max_turn, 0.0]),
        high=np.float32([self._max_turn, self._max_speed]),
    )

Methods

def action_completed(self) ‑> bool

Check if the current action is completed.

Always true for this robot.

Expand source code
def action_completed(self) -> bool:
    """Check if the current action is completed.

    Always true for this robot.
    """
    return True
def set_action(self, action: numpy.ndarray) ‑> None

Set a new action for the robot to execute.

The action must be two-dimensional and have the following format: [turn, speed].

Args

action
The action to execute.
Expand source code
def set_action(self, action: np.ndarray) -> None:
    """Set a new action for the robot to execute.

    The action must be two-dimensional and have the following format: [turn, speed].

    Args:
        action: The action to execute.
    """
    self.turn = action[0]
    self.speed = action[1]

Inherited members

class VelocityControlledAgent (ctrl_params=None, **kwargs)

Base class for agents that can be controlled by velocity.

This agent is used by the robot that include a simulation of the PID controllers.

Initialize the agent.

Expand source code
class VelocityControlledAgent(Agent, abc.ABC):
    """Base class for agents that can be controlled by velocity.

    This agent is used by the robot that include a simulation of the PID controllers.
    """

    def __init__(self, ctrl_params=None, **kwargs):
        """Initialize the agent."""
        super().__init__(**kwargs)

        if ctrl_params is None:
            ctrl_params = {}

        self._two_wheels_controller = TwoWheelsController(**ctrl_params)
        self._target = None
        self._counter = 0

        # TODO: add explanation for magic numbers
        self._tau = 4
        self._vel_buffer = deque([(0.0, 0.0)] * self._tau, maxlen=self._tau)
        self._ctrl_buffer = deque([(0.0, 0.0)] * self._tau, maxlen=self._tau)
        self._pose_tau = 1
        self._pose_buffer = deque(
            [(0.0, 0.0, 0.0)] * self._pose_tau, maxlen=self._pose_tau
        )
        self._target_buffer = deque(
            [np.array((0.0, 0.0))] * self._pose_tau, maxlen=self._pose_tau
        )

    @property
    def target(self) -> Optional[np.ndarray]:
        """Get the current target of the agent."""
        return self._target

    @property
    def two_wheels_controller(self) -> TwoWheelsController:
        """Get the two wheels controller of the agent."""
        return self._two_wheels_controller

    def step(self, time_step: float) -> None:
        """Perform a step of the agent.

        This method is called by the environment.

        Args:
            time_step (float): The time step in seconds. (unused)
        """
        # run controller with 25Hz (simulation with 100Hz)
        if self._counter % 4 == 0:
            self._pose_buffer.append(self.get_pose())
            self._target_buffer.append(self._target)
            motor_speeds = self._two_wheels_controller.speeds(
                self._pose_buffer[0], self._target_buffer[0]
            )
            pwm_commands = motor_speeds.vel_to_pwm()
            self._ctrl_buffer.append(pwm_commands.pwm_to_vel().get_local_velocities())

            v_x_tau, v_r_tau = self._vel_buffer[0]
            c_x_tau, c_r_tau = self._ctrl_buffer[0]
            # x_vel, r_vel = np.mean(np.asarray(self._vel_buffer)[0, :], axis=0)
            # x_vel, r_vel = self._vel_buffer.popleft()
            x_vel = 0.975 * self._vel_buffer[-1][0] + 0.2 * (c_x_tau - v_x_tau)
            r_vel = 0.96 * self._vel_buffer[-1][1] + 0.2 * (c_r_tau - v_r_tau)

            self._vel_buffer.append((x_vel, r_vel))

            self.set_angular_velocity(r_vel)
            self.set_linear_velocity([x_vel, 0.0], local=True)
        self._counter += 1

Ancestors

Subclasses

  • robofish.gym_guppy.guppies._robot.GlobalTargetRobot
  • robofish.gym_guppy.guppies._robot.GoToRobot

Instance variables

var target : Optional[numpy.ndarray]

Get the current target of the agent.

Expand source code
@property
def target(self) -> Optional[np.ndarray]:
    """Get the current target of the agent."""
    return self._target
var two_wheels_controllerTwoWheelsController

Get the two wheels controller of the agent.

Expand source code
@property
def two_wheels_controller(self) -> TwoWheelsController:
    """Get the two wheels controller of the agent."""
    return self._two_wheels_controller

Methods

def step(self, time_step: float) ‑> None

Perform a step of the agent.

This method is called by the environment.

Args

time_step : float
The time step in seconds. (unused)
Expand source code
def step(self, time_step: float) -> None:
    """Perform a step of the agent.

    This method is called by the environment.

    Args:
        time_step (float): The time step in seconds. (unused)
    """
    # run controller with 25Hz (simulation with 100Hz)
    if self._counter % 4 == 0:
        self._pose_buffer.append(self.get_pose())
        self._target_buffer.append(self._target)
        motor_speeds = self._two_wheels_controller.speeds(
            self._pose_buffer[0], self._target_buffer[0]
        )
        pwm_commands = motor_speeds.vel_to_pwm()
        self._ctrl_buffer.append(pwm_commands.pwm_to_vel().get_local_velocities())

        v_x_tau, v_r_tau = self._vel_buffer[0]
        c_x_tau, c_r_tau = self._ctrl_buffer[0]
        # x_vel, r_vel = np.mean(np.asarray(self._vel_buffer)[0, :], axis=0)
        # x_vel, r_vel = self._vel_buffer.popleft()
        x_vel = 0.975 * self._vel_buffer[-1][0] + 0.2 * (c_x_tau - v_x_tau)
        r_vel = 0.96 * self._vel_buffer[-1][1] + 0.2 * (c_r_tau - v_r_tau)

        self._vel_buffer.append((x_vel, r_vel))

        self.set_angular_velocity(r_vel)
        self.set_linear_velocity([x_vel, 0.0], local=True)
    self._counter += 1

Inherited members