Skip to content

swarmrl.tasks.object_movement.rod_rotation Module API Reference

Class for rod rotation task.

RotateRod

Bases: Task

Rotate a rod.

Source code in swarmrl/tasks/object_movement/rod_rotation.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class RotateRod(Task):
    """
    Reward colloids for rotating a rod.

    The reward is based on the angle (in degrees) that the rod director turns
    through between successive calls, averaged over a sliding window of recent
    calls and optionally partitioned between the rewarded colloids.
    """

    def __init__(
        self,
        partition: bool = True,
        rod_type: int = 1,
        particle_type: int = 0,
        direction: str = "CCW",
        angular_velocity_scale: int = 1,
        velocity_history: int = 100,
    ):
        """
        Constructor for the rod rotation task.

        Parameters
        ----------
        partition : bool (default=True)
                Whether to partition the reward by particle contribution.
                If False, the reward is split evenly between the colloids.
        rod_type : int (default=1)
                Type of particle making up the rod.
        particle_type : int (default=0)
                Type of particle receiving the reward.
        direction : str (default="CCW")
                Rewarded direction of rotation. "CW" flips the sign of
                angular_velocity_scale; any other value leaves it unchanged.
        angular_velocity_scale : int (default=1)
                The amount the averaged velocity is scaled by to get the
                reward.
        velocity_history : int (default=100)
                Number of steps to average the velocity over.
        """
        super().__init__(particle_type=particle_type)
        self.partition = partition
        self.rod_type = rod_type

        if direction == "CW":
            angular_velocity_scale *= -1  # CW is negative

        self.angular_velocity_scale = angular_velocity_scale

        # Sliding window of angular velocities. It starts filled with NaN so
        # that the nan-aware mean ignores slots that have not been written yet.
        self._velocity_history = np.nan * np.ones(velocity_history)
        # New values are always written to the last slot after rolling.
        self._append_index = int(velocity_history - 1)

        # Class only attributes
        self._historic_rod_director = None
        # NOTE(review): not read anywhere in this class; kept in case external
        # code relies on it.
        self._historic_velocity = 1.0

        self.decomp_fn = jax.jit(compute_torque_partition_on_rod)

    def initialize(self, colloids: List[Colloid]):
        """
        Prepare the task for running.

        In this case, as all rod directors are the same, we
        only need to take one for the historical value.

        Parameters
        ----------
        colloids : List[Colloid]
                List of colloids to be used in the task.

        Returns
        -------
        Updates the class state. If no colloid of type rod_type is present,
        the stored director is left unchanged.
        """
        for item in colloids:
            if item.type == self.rod_type:
                self._historic_rod_director = onp.copy(item.director)
                break

    def _compute_angular_velocity(self, new_director: np.ndarray):
        """
        Compute the window-averaged angular velocity of the rod.

        The signed in-plane angle between the previously stored director and
        the new one is converted to degrees, pushed into the velocity history
        and the scaled mean of the history is returned.

        Parameters
        ----------
        new_director : np.ndarray (3, )
                New rod director. Only the first two components are used.

        Returns
        -------
        angular_velocity : float
                Mean of the stored angular velocities (unfilled history slots
                are ignored) multiplied by angular_velocity_scale.
        """
        if self._historic_rod_director is None:
            # initialize() was not called or found no rod particle. Use the
            # current director as the reference, which records a zero
            # displacement exactly as if initialize() had just been called.
            self._historic_rod_director = new_director

        previous_xy = np.asarray(self._historic_rod_director)[:2]
        current_xy = np.asarray(new_director)[:2]

        # z-component of the cross product of the two in-plane vectors,
        # written out explicitly so it does not rely on np.cross accepting
        # 2-vectors.
        cross_z = previous_xy[0] * current_xy[1] - previous_xy[1] * current_xy[0]
        angular_velocity = np.arctan2(cross_z, np.dot(previous_xy, current_xy))

        # Convert to degrees for better scaling.
        angular_velocity = np.rad2deg(angular_velocity)

        # Update the historical rod director and velocity: drop the oldest
        # entry by rolling left and overwrite the last slot with the new one.
        self._historic_rod_director = new_director
        self._velocity_history = np.roll(self._velocity_history, -1)
        self._velocity_history = self._velocity_history.at[self._append_index].set(
            angular_velocity
        )

        # Return the scaled average velocity.
        return self.angular_velocity_scale * np.nanmean(self._velocity_history)

    def partition_reward(
        self,
        reward: float,
        colloid_positions: np.ndarray,
        rod_positions: np.ndarray,
        rod_directors: np.ndarray,
    ) -> np.ndarray:
        """
        Partition a reward into colloid contributions.

        Parameters
        ----------
        reward : float
                Reward to be partitioned.
        colloid_positions : np.ndarray (n_colloids, 3)
                Positions of the colloids.
        rod_positions : np.ndarray (n_rod, 3)
                Positions of the rod particles.
        rod_directors : np.ndarray (n_rod, 3)
                Directors of the rod particles.

        Returns
        -------
        partitioned_reward : np.ndarray (n_colloids, )
                Partitioned reward for each colloid.
        """
        if self.partition:
            colloid_partitions = self.decomp_fn(
                colloid_positions, rod_positions, rod_directors
            )
        else:
            # Uniform split: every colloid receives the same share.
            colloid_partitions = (
                np.ones(colloid_positions.shape[0]) / colloid_positions.shape[0]
            )

        return reward * colloid_partitions

    def _compute_angular_velocity_reward(
        self,
        rod_directors: np.ndarray,
        rod_positions: np.ndarray,
        colloid_positions: np.ndarray,
    ):
        """
        Compute the angular velocity reward.

        Parameters
        ----------
        rod_directors : np.ndarray (n_rod, 3)
                Directors of the rod particles.
        rod_positions : np.ndarray (n_rod, 3)
                Positions of the rod particles.
        colloid_positions : np.ndarray (n_colloids, 3)
                Positions of the colloids.

        Returns
        -------
        angular_velocity_reward : np.ndarray (n_colloids, )
                Angular velocity reward for each colloid.
        """
        # Compute angular velocity. The first rod particle's director is used
        # as the director of the whole rod.
        angular_velocity = self._compute_angular_velocity(rod_directors[0])
        # Compute colloid-wise rewards
        return self.partition_reward(
            angular_velocity, colloid_positions, rod_positions, rod_directors
        )

    def __call__(self, colloids: List[Colloid]):
        """
        Compute the reward.

        The rod particles (type rod_type) provide the rod positions and
        directors; the colloids of type particle_type receive the reward,
        which is the scaled, window-averaged angular velocity of the rod
        distributed by partition_reward.

        Parameters
        ----------
        colloids : List[Colloid] (n_colloids, )
                List of colloids to be used in the task.

        Returns
        -------
        rewards : np.ndarray (n_colloids, )
                Rewards for each colloid of type particle_type.
        """
        # Collect the important data.
        rod = [colloid for colloid in colloids if colloid.type == self.rod_type]
        rod_positions = np.array([colloid.pos for colloid in rod])
        rod_directors = np.array([colloid.director for colloid in rod])

        chosen_colloids = [
            colloid for colloid in colloids if colloid.type == self.particle_type
        ]
        colloid_positions = np.array([colloid.pos for colloid in chosen_colloids])

        # Compute the angular velocity reward
        angular_velocity_term = self._compute_angular_velocity_reward(
            rod_directors, rod_positions, colloid_positions
        )

        return angular_velocity_term

__call__(colloids)

Compute the reward.

For this task, the rod's angular velocity is computed from the change in the rod director since the previous call, averaged over the velocity history, scaled, and then distributed over the colloids of the rewarded particle type.

Parameters

colloids : List[Colloid] (n_colloids, ) List of colloids to be used in the task.

Returns

rewards : List[float] (n_colloids, ) Rewards for each colloid.

Source code in swarmrl/tasks/object_movement/rod_rotation.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def __call__(self, colloids: List[Colloid]):
    """
    Compute the reward.

    Rod particles (type ``self.rod_type``) supply the rod positions and
    directors, colloids of type ``self.particle_type`` supply the positions of
    the rewarded particles, and the result of
    ``self._compute_angular_velocity_reward`` is returned unchanged.

    Parameters
    ----------
    colloids : List[Colloid] (n_colloids, )
            List of colloids to be used in the task.

    Returns
    -------
    rewards : (n_colloids, )
            Rewards for each colloid of the rewarded type.
    """

    def _members(kind):
        # All colloids whose type matches the requested one, in input order.
        return [particle for particle in colloids if particle.type == kind]

    # Geometry of the rod.
    rod_particles = _members(self.rod_type)
    rod_positions = np.array([particle.pos for particle in rod_particles])
    rod_directors = np.array([particle.director for particle in rod_particles])

    # Positions of the particles that receive the reward.
    rewarded = _members(self.particle_type)
    colloid_positions = np.array([particle.pos for particle in rewarded])

    return self._compute_angular_velocity_reward(
        rod_directors, rod_positions, colloid_positions
    )

__init__(partition=True, rod_type=1, particle_type=0, direction='CCW', angular_velocity_scale=1, velocity_history=100)

Constructor for the rod rotation task.

Parameters

partition : bool (default=True) Whether to partition the reward by particle contribution. rod_type : int (default=1) Type of particle making up the rod. particle_type : int (default=0) Type of particle receiving the reward. direction : str (default="CCW") Rewarded direction of rotation. "CW" flips the sign of angular_velocity_scale; any other value leaves it unchanged. angular_velocity_scale : int (default=1) The amount the velocity is scaled by to get the reward. velocity_history : int (default=100) Number of steps to average the velocity over.

Source code in swarmrl/tasks/object_movement/rod_rotation.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(
    self,
    partition: bool = True,
    rod_type: int = 1,
    particle_type: int = 0,
    direction: str = "CCW",
    angular_velocity_scale: int = 1,
    velocity_history: int = 100,
):
    """
    Constructor for the rod rotation task.

    Parameters
    ----------
    partition : bool (default=True)
            Whether to partition the reward by particle contribution.
    rod_type : int (default=1)
            Type of particle making up the rod.
    particle_type : int (default=0)
            Type of particle receiving the reward.
    direction : str (default="CCW")
            Rewarded direction of rotation. "CW" flips the sign of
            angular_velocity_scale; any other value leaves it unchanged.
    angular_velocity_scale : int (default=1)
            The amount the velocity is scaled by to get the reward.
    velocity_history : int (default=100)
            Number of steps to average the velocity over.
    """
    super().__init__(particle_type=particle_type)

    # Bookkeeping attributes that are filled in while the task runs.
    self._historic_rod_director = None
    self._historic_velocity = 1.0

    self.rod_type = rod_type
    self.partition = partition

    # Clockwise rotation is expressed through a negated scale.
    self.angular_velocity_scale = (
        angular_velocity_scale * -1 if direction == "CW" else angular_velocity_scale
    )

    # History buffer starts as all-NaN; new entries go into its last slot.
    self._append_index = int(velocity_history - 1)
    self._velocity_history = np.nan * np.ones(velocity_history)

    # Jitted function used to split a reward between the colloids.
    self.decomp_fn = jax.jit(compute_torque_partition_on_rod)

initialize(colloids)

Prepare the task for running.

In this case, as all rod directors are the same, we only need to take one for the historical value.

Parameters

colloids : List[Colloid] List of colloids to be used in the task.

Returns

Updates the class state.

Source code in swarmrl/tasks/object_movement/rod_rotation.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def initialize(self, colloids: List[Colloid]):
    """
    Prepare the task for running.

    Stores a copy of the director of the first colloid whose type equals
    ``self.rod_type`` as the historical rod director. As all rod directors
    are the same, one of them is enough.

    Parameters
    ----------
    colloids : List[Colloid]
            List of colloids to be used in the task.

    Returns
    -------
    Updates the class state. Nothing is changed when no rod particle is
    found.
    """
    first_rod_particle = next(
        (particle for particle in colloids if particle.type == self.rod_type),
        None,
    )
    if first_rod_particle is not None:
        self._historic_rod_director = onp.copy(first_rod_particle.director)

partition_reward(reward, colloid_positions, rod_positions, rod_directors)

Partition a reward into colloid contributions.

Parameters

reward : float Reward to be partitioned. colloid_positions : np.ndarray (n_colloids, 3) Positions of the colloids. rod_positions : np.ndarray (n_rod, 3) Positions of the rod particles. rod_directors : np.ndarray (n_rod, 3) Directors of the rod particles.

Returns

partitioned_reward : np.ndarray (n_colloids, ) Partitioned reward for each colloid.

Source code in swarmrl/tasks/object_movement/rod_rotation.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def partition_reward(
    self,
    reward: float,
    colloid_positions: np.ndarray,
    rod_positions: np.ndarray,
    rod_directors: np.ndarray,
) -> np.ndarray:
    """
    Split a scalar reward between the colloids.

    With ``self.partition`` set, the per-colloid weights come from
    ``self.decomp_fn``; otherwise every colloid gets an equal share.

    Parameters
    ----------
    reward : float
            Reward to be partitioned.
    colloid_positions : np.ndarray (n_colloids, 3)
            Positions of the colloids.
    rod_positions : np.ndarray (n_rod, 3)
            Positions of the rod particles.
    rod_directors : np.ndarray (n_rod, 3)
            Directors of the rod particles.

    Returns
    -------
    partitioned_reward : np.ndarray (n_colloids, )
            Partitioned reward for each colloid.
    """
    if not self.partition:
        # Uniform weights: one over the number of colloids for each of them.
        uniform_weights = (
            np.ones(colloid_positions.shape[0]) / colloid_positions.shape[0]
        )
        return reward * uniform_weights

    weights = self.decomp_fn(colloid_positions, rod_positions, rod_directors)
    return reward * weights