Skip to content

swarmrl.tasks.searching.species_search Module API Reference

Class for the species search task.

SpeciesSearch

Bases: Task

Class for the species search task.

Source code in swarmrl/tasks/searching/species_search.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class SpeciesSearch(Task):
    """
    Class for the species search task.

    For every colloid selected by ``get_colloid_indices`` the task sums
    ``decay_fn`` over the box-scaled distances to all colloids of
    ``sensing_type`` and rewards the change of that field value between two
    consecutive evaluations.
    """

    def __init__(
        self,
        decay_fn: callable,
        box_length: np.ndarray,
        sensing_type: int = 0,
        avoid: bool = False,
        scale_factor: int = 100,
        particle_type: int = 0,
    ):
        """
        Constructor for the task.

        Parameters
        ----------
        decay_fn : callable
                Decay function of the field. It is called with an array of
                scaled distances.
        box_length : np.ndarray
                Array for scaling of the distances.
        sensing_type : int (default=0)
                Type of particle to sense.
        avoid : bool (default=False)
                Whether to avoid or move to the sensing type.
        scale_factor : int (default=100)
                Scaling factor applied to the rewards.
        particle_type : int (default=0)
                Particle type to compute the task for.
        """
        super().__init__(particle_type=particle_type)

        self.decay_fn = decay_fn
        self.box_length = box_length
        self.sensing_type = sensing_type
        self.scale_factor = scale_factor
        self.avoid = avoid

        # Field value of the previous evaluation, keyed by str(colloid id).
        self.historical_field = {}

        # Map over index, reference position and historic value; the test
        # positions (third argument) are shared between all reference colloids.
        self.task_fn = jax.vmap(
            self.compute_single_particle_task, in_axes=(0, 0, None, 0)
        )

    def _sensed_positions(self, colloids: List[Colloid]) -> np.ndarray:
        """Collect the positions of all colloids of the sensing type."""
        return np.array(
            [colloid.pos for colloid in colloids if colloid.type == self.sensing_type]
        )

    def _store_field_values(self, out_indices, field_values) -> None:
        """Remember the latest field value of each colloid, keyed by str(id)."""
        for index, value in zip(out_indices, onp.array(field_values)):
            self.historical_field[str(index)] = value

    def initialize(self, colloids: List[Colloid]):
        """
        Initialize the task with starting positions of the colloids.

        Parameters
        ----------
        colloids : List[Colloid]
                List of colloids with which to initialize the task.

        Returns
        -------
        Updates the class state (``historical_field``).
        """
        reference_ids = self.get_colloid_indices(colloids)
        references = [colloids[index] for index in reference_ids]

        # There is no history yet, so the deltas returned here are discarded.
        historic_values = np.zeros(len(reference_ids))

        out_indices, _, field_values = self.task_fn(
            np.array([colloid.id for colloid in references]),
            np.array([colloid.pos for colloid in references]),
            self._sensed_positions(colloids),
            historic_values,
        )

        self._store_field_values(out_indices, field_values)

    def compute_single_particle_task(
        self,
        index: int,
        reference_position: np.ndarray,
        test_positions: np.ndarray,
        historic_value: float,
    ) -> tuple:
        """
        Compute the task for a single colloid.

        Parameters
        ----------
        index : int
                Index of the colloid to compute the task for.
        reference_position : np.ndarray (3,)
                Position of the reference colloid.
        test_positions : np.ndarray (n_colloids, 3)
                Positions of the test colloids.
        historic_value : float
                Field value of the previous evaluation.

        Returns
        -------
        tuple (index, delta_value, field_value)
        index : int
                Index of the colloid the values belong to (passed through).
        delta_value : float
                Current field value minus ``historic_value``.
        field_value : float
                Sum of the decay function over all non-zero scaled distances.
        """
        distances = np.linalg.norm(
            (test_positions - reference_position) / self.box_length, axis=-1
        )
        # Mask out zero distances (the reference colloid itself when it is part
        # of the test set) instead of gathering a fixed number (n - 1) of
        # non-zero entries: the fixed-size gather dropped the last test colloid
        # whenever the reference colloid was not among the test positions.
        is_other = distances != 0
        # Masked entries get a dummy distance so that a singular decay_fn(0)
        # never enters the computation.
        safe_distances = np.where(is_other, distances, 1.0)
        field_value = np.where(is_other, self.decay_fn(safe_distances), 0.0).sum()

        return index, field_value - historic_value, field_value

    def __call__(self, colloids: List[Colloid]):
        """
        Compute the reward on the colloids.

        Parameters
        ----------
        colloids : List[Colloid] (n_colloids, )
                List of all colloids in the system.

        Returns
        -------
        rewards : np.ndarray
                ``scale_factor`` times the clipped change of the field value,
                one entry for each colloid returned by
                ``get_colloid_indices``.

        Raises
        ------
        ValueError
                If ``initialize`` has not populated the field history yet.
        """
        if not self.historical_field:
            msg = (
                f"{type(self).__name__} requires initialization. Please set the "
                "initialize attribute of the gym to true and try again."
            )
            raise ValueError(msg)

        reference_ids = self.get_colloid_indices(colloids)
        references = [colloids[index] for index in reference_ids]
        historic_values = [
            self.historical_field[str(colloid.id)] for colloid in references
        ]

        out_indices, delta_values, field_values = self.task_fn(
            np.array([colloid.id for colloid in references]),
            np.array([colloid.pos for colloid in references]),
            self._sensed_positions(colloids),
            np.array(historic_values),
        )

        self._store_field_values(out_indices, field_values)

        if self.avoid:
            # Keep only decreases of the field (rewards <= 0).
            # NOTE(review): for a decay function that decreases with distance
            # this yields negative rewards when moving away from the sensed
            # species - confirm that this sign convention is intended.
            rewards = np.clip(delta_values, None, 0)
        else:
            # Keep only increases of the field (rewards >= 0).
            rewards = np.clip(delta_values, 0, None)

        return self.scale_factor * rewards

__call__(colloids)

Compute the reward on the colloids.

Parameters

colloids : List[Colloid] (n_colloids, ) List of all colloids in the system.

Returns

rewards : List[float] (n_colloids, dimension) List of rewards, one for each colloid.

Source code in swarmrl/tasks/searching/species_search.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def __call__(self, colloids: List[Colloid]):
    """
    Evaluate the rewards for the colloids handled by this task.

    Parameters
    ----------
    colloids : List[Colloid] (n_colloids, )
            All colloids in the system.

    Returns
    -------
    rewards
            ``scale_factor`` times the clipped change of the field value, one
            entry for each colloid returned by ``get_colloid_indices``.

    Raises
    ------
    ValueError
            If the field history is still empty.
    """
    if not self.historical_field:
        raise ValueError(
            f"{type(self).__name__} requires initialization. Please set the "
            "initialize attribute of the gym to true and try again."
        )

    selected = [colloids[i] for i in self.get_colloid_indices(colloids)]
    ids = [colloid.id for colloid in selected]
    reference_positions = [colloid.pos for colloid in selected]
    # The history is keyed by the string form of the colloid id.
    previous_fields = [self.historical_field[str(colloid_id)] for colloid_id in ids]

    sensed_positions = np.array(
        [c.pos for c in colloids if c.type == self.sensing_type]
    )

    out_indices, delta_values, field_values = self.task_fn(
        np.array(ids),
        np.array(reference_positions),
        sensed_positions,
        np.array(previous_fields),
    )

    # Store the current field values as history for the next call.
    self.historical_field.update(
        (str(out_index), field)
        for out_index, field in zip(out_indices, onp.array(field_values))
    )

    # avoid: keep only decreases of the field; otherwise keep only increases.
    # NOTE(review): for a decay function that decreases with distance the
    # avoid branch gives negative rewards when moving away - confirm intent.
    lower, upper = (None, 0) if self.avoid else (0, None)
    return self.scale_factor * np.clip(delta_values, lower, upper)

__init__(decay_fn, box_length, sensing_type=0, avoid=False, scale_factor=100, particle_type=0)

Constructor for the observable.

Parameters

decay_fn : callable Decay function of the field. box_length : np.ndarray Array for scaling of the distances. sensing_type : int (default=0) Type of particle to sense. scale_factor : int (default=100) Scaling factor for the observable. avoid : bool (default=False) Whether to avoid or move to the sensing type. particle_type : int (default=0) Particle type to compute the observable for.

Source code in swarmrl/tasks/searching/species_search.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    decay_fn: callable,
    box_length: np.ndarray,
    sensing_type: int = 0,
    avoid: bool = False,
    scale_factor: int = 100,
    particle_type: int = 0,
):
    """
    Set up the species search task.

    Parameters
    ----------
    decay_fn : callable
            Decay function of the field.
    box_length : np.ndarray
            Array for scaling of the distances.
    sensing_type : int (default=0)
            Type of particle to sense.
    avoid : bool (default=False)
            Whether to avoid or move to the sensing type.
    scale_factor : int (default=100)
            Scaling factor for the rewards.
    particle_type : int (default=0)
            Particle type to compute the task for.
    """
    super().__init__(particle_type=particle_type)

    # Starts empty; __call__ raises until initialize() has filled it.
    self.historical_field = {}

    # Behaviour switches and scaling.
    self.avoid = avoid
    self.sensing_type = sensing_type
    self.scale_factor = scale_factor

    # Field definition.
    self.box_length = box_length
    self.decay_fn = decay_fn

    # Vectorise the single-colloid computation: index, reference position and
    # historic value are mapped along axis 0, the test positions are shared.
    per_argument_axes = (0, 0, None, 0)
    self.task_fn = jax.vmap(
        self.compute_single_particle_task, in_axes=per_argument_axes
    )

compute_single_particle_task(index, reference_position, test_positions, historic_value)

Compute the task for a single colloid.

Parameters

index : int Index of the colloid to compute the observable for. reference_position : np.ndarray (3,) Position of the reference colloid. test_positions : np.ndarray (n_colloids, 3) Positions of the test colloids. historic_value : float Historic value of the observable.

Returns

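tuple (index, delta_value, field_value) index : int Index of the colloid to compute the observable for. delta_value : float Current field value minus the historic value. field_value : float Current value of the field at the colloid.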

Source code in swarmrl/tasks/searching/species_search.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def compute_single_particle_task(
    self,
    index: int,
    reference_position: np.ndarray,
    test_positions: np.ndarray,
    historic_value: float,
) -> tuple:
    """
    Compute the task for a single colloid.

    Parameters
    ----------
    index : int
            Index of the colloid to compute the task for.
    reference_position : np.ndarray (3,)
            Position of the reference colloid.
    test_positions : np.ndarray (n_colloids, 3)
            Positions of the test colloids.
    historic_value : float
            Field value of the previous evaluation.

    Returns
    -------
    tuple (index, delta_value, field_value)
    index : int
            Index of the colloid the values belong to (passed through).
    delta_value : float
            Current field value minus ``historic_value``.
    field_value : float
            Sum of the decay function over all non-zero scaled distances.
    """
    distances = np.linalg.norm(
        (test_positions - reference_position) / self.box_length, axis=-1
    )
    # Mask out zero distances (the reference colloid itself when it is part of
    # the test set) instead of gathering a fixed number (n - 1) of non-zero
    # entries: the fixed-size gather dropped the last test colloid whenever the
    # reference colloid was not among the test positions.
    is_other = distances != 0
    # Masked entries get a dummy distance so that a singular decay_fn(0) never
    # enters the computation.
    safe_distances = np.where(is_other, distances, 1.0)
    field_value = np.where(is_other, self.decay_fn(safe_distances), 0.0).sum()

    return index, field_value - historic_value, field_value

initialize(colloids)

Initialize the observable with starting positions of the colloids.

Parameters

colloids : List[Colloid] List of colloids with which to initialize the observable.

Returns

Updates the class state.

Source code in swarmrl/tasks/searching/species_search.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def initialize(self, colloids: List[Colloid]):
    """
    Seed the field history from the starting positions of the colloids.

    Parameters
    ----------
    colloids : List[Colloid]
            Colloids with which to initialize the task.

    Returns
    -------
    Nothing; ``historical_field`` is updated in place.
    """
    reference_ids = self.get_colloid_indices(colloids)
    selected = [colloids[i] for i in reference_ids]

    ids = [colloid.id for colloid in selected]
    reference_positions = [colloid.pos for colloid in selected]

    # Positions of every colloid of the sensed type.
    sensed_positions = np.array(
        [c.pos for c in colloids if c.type == self.sensing_type]
    )

    # No history exists yet: pass zeros and ignore the returned deltas.
    no_history = np.zeros(len(reference_ids))
    out_indices, _, field_values = self.task_fn(
        np.array(ids), np.array(reference_positions), sensed_positions, no_history
    )

    # Keys are the string form of the returned colloid ids.
    self.historical_field.update(
        (str(out_index), field)
        for out_index, field in zip(out_indices, onp.array(field_values))
    )