Skip to content

swarmrl.observables.particle_sensing Module API Reference

Observable for particle sensing.

ParticleSensing

Bases: Observable

Class for particle sensing.

Source code in swarmrl/observables/particle_sensing.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class ParticleSensing(Observable):
    """
    Observable for sensing a field emitted by other particles.

    For every reference colloid the field contributions of the sensed colloids
    are evaluated with ``decay_fn`` and summed. The observable is the change of
    that field value since the previous evaluation, multiplied by
    ``scale_factor``.
    """

    def __init__(
        self,
        decay_fn: callable,
        box_length: np.ndarray,
        sensing_type: int = 0,
        scale_factor: int = 100,
        particle_type: int = 0,
    ):
        """
        Constructor for the observable.

        Parameters
        ----------
        decay_fn : callable
                Decay function of the field. It is called with an array of
                box-scaled distances and its output is summed.
        box_length : np.ndarray
                Array for scaling of the distances.
        sensing_type : int (default=0)
                Type of particle to sense.
        scale_factor : int (default=100)
                Scaling factor for the observable.
        particle_type : int (default=0)
                Particle type to compute the observable for.
        """
        super().__init__(particle_type=particle_type)

        self.decay_fn = decay_fn
        self.box_length = box_length
        self.sensing_type = sensing_type
        self.scale_factor = scale_factor

        # Field value of the previous evaluation, keyed by str(colloid id).
        self.historical_field = {}

        # Vectorise over the reference colloids (id, position, historic value);
        # the sensed positions are shared between all of them.
        self.observable_fn = jax.vmap(
            self.compute_single_observable,
            in_axes=(0, 0, None, 0),
        )

    def initialize(self, colloids: List[Colloid]):
        """
        Initialize the observable with starting positions of the colloids.

        The field value at every reference colloid is computed once and stored
        so that the first call to ``compute_observable`` has a previous value
        to compare against.

        Parameters
        ----------
        colloids : List[Colloid]
                List of colloids with which to initialize the observable.

        Returns
        -------
        Updates the class state.
        """
        reference_ids = self.get_colloid_indices(colloids)
        # No history exists yet, so the deltas computed here are discarded.
        historic_values = np.zeros(len(reference_ids))

        positions = []
        indices = []
        for index in reference_ids:
            indices.append(colloids[index].id)
            positions.append(colloids[index].pos)

        sensed_colloids = np.array(
            [colloid.pos for colloid in colloids if colloid.type == self.sensing_type]
        )

        out_indices, _, field_values = self.observable_fn(
            np.array(indices), np.array(positions), sensed_colloids, historic_values
        )

        for index, value in zip(out_indices, onp.array(field_values)):
            self.historical_field[str(index)] = value

    def compute_single_observable(
        self,
        index: int,
        reference_position: np.ndarray,
        test_positions: np.ndarray,
        historic_value: float,
    ) -> tuple:
        """
        Compute the observable for a single colloid.

        Sensed colloids at exactly zero distance (i.e. the reference colloid
        itself when it is part of ``test_positions``) do not contribute.

        Parameters
        ----------
        index : int
                Index of the colloid to compute the observable for.
        reference_position : np.ndarray (3,)
                Position of the reference colloid.
        test_positions : np.ndarray (n_colloids, 3)
                Positions of the test colloids.
        historic_value : float
                Historic value of the observable.

        Returns
        -------
        tuple (index, delta_value, field_value)
        index : int
                Index of the colloid to compute the observable for.
        delta_value : float
                Current field value minus the historic value.
        field_value : float
                Current value of the field at the reference position.
        """
        distances = np.linalg.norm(
            (test_positions - reference_position) / self.box_length, axis=-1
        )
        # Mask out zero distances instead of selecting a fixed number (n - 1)
        # of non-zero entries: the fixed size silently dropped the last sensed
        # colloid whenever the reference colloid was not among the test
        # positions (sensing_type != particle_type).
        contributes = distances != 0
        # Substitute a harmless distance for masked entries so decay functions
        # that diverge at zero are never evaluated there.
        # NOTE(review): assumes decay_fn acts element-wise - confirm.
        safe_distances = np.where(contributes, distances, 1.0)
        # Compute field value
        field_value = np.where(contributes, self.decay_fn(safe_distances), 0.0).sum()
        return index, field_value - historic_value, field_value

    def compute_observable(self, colloids: List[Colloid]):
        """
        Compute the scaled change in the sensed field for each colloid.

        Parameters
        ----------
        colloids : List[Colloid] (n_colloids, )
                List of all colloids in the system.

        Returns
        -------
        observables : np.ndarray (n_reference_colloids, 1)
                One observable per reference colloid: the current field value
                minus the previous field value, multiplied by the scale factor.

        Raises
        ------
        ValueError
                If no historic field values are stored, i.e. ``initialize`` has
                not been called.
        """
        if not self.historical_field:
            msg = (
                f"{type(self).__name__} requires initialization. Please set the "
                "initialize attribute of the gym to true and try again."
            )
            raise ValueError(msg)

        reference_ids = self.get_colloid_indices(colloids)
        positions = []
        indices = []
        historic_values = []
        for index in reference_ids:
            indices.append(colloids[index].id)
            positions.append(colloids[index].pos)
            historic_values.append(self.historical_field[str(colloids[index].id)])

        test_points = np.array(
            [colloid.pos for colloid in colloids if colloid.type == self.sensing_type]
        )

        out_indices, delta_values, field_values = self.observable_fn(
            np.array(indices),
            np.array(positions),
            test_points,
            np.array(historic_values),
        )

        # Remember the current field so the next call reports the change.
        for index, value in zip(out_indices, onp.array(field_values)):
            self.historical_field[str(index)] = value

        return self.scale_factor * delta_values.reshape(-1, 1)

__init__(decay_fn, box_length, sensing_type=0, scale_factor=100, particle_type=0)

Constructor for the observable.

Parameters

decay_fn : callable Decay function of the field. box_length : np.ndarray Array for scaling of the distances. sensing_type : int (default=0) Type of particle to sense. scale_factor : int (default=100) Scaling factor for the observable. particle_type : int (default=0) Particle type to compute the observable for.

Source code in swarmrl/observables/particle_sensing.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    decay_fn: callable,
    box_length: np.ndarray,
    sensing_type: int = 0,
    scale_factor: int = 100,
    particle_type: int = 0,
):
    """
    Set up the particle sensing observable.

    Parameters
    ----------
    decay_fn : callable
            Decay function of the field.
    box_length : np.ndarray
            Array for scaling of the distances.
    sensing_type : int (default=0)
            Type of particle to sense.
    scale_factor : int (default=100)
            Scaling factor for the observable.
    particle_type : int (default=0)
            Particle type to compute the observable for.
    """
    super().__init__(particle_type=particle_type)

    # Previous field value per colloid; filled in by ``initialize``.
    self.historical_field = {}

    self.scale_factor = scale_factor
    self.sensing_type = sensing_type
    self.box_length = box_length
    self.decay_fn = decay_fn

    # Map over ids, reference positions and historic values (axis 0 each);
    # the sensed positions (third argument) are passed through unmapped.
    mapped_axes = (0, 0, None, 0)
    self.observable_fn = jax.vmap(self.compute_single_observable, in_axes=mapped_axes)

compute_observable(colloids)

Compute the scaled change in the sensed field value for each colloid.

Parameters

colloids : List[Colloid] (n_colloids, ) List of all colloids in the system.

Returns

observables : List[float] (n_colloids, dimension) List of observables, one for each colloid. In this case, the current field value minus the previous field value, multiplied by the scale factor.

Source code in swarmrl/observables/particle_sensing.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def compute_observable(self, colloids: List[Colloid]):
    """
    Compute the scaled change in the sensed field for each colloid.

    Parameters
    ----------
    colloids : List[Colloid] (n_colloids, )
            List of all colloids in the system.

    Returns
    -------
    observables : np.ndarray (n_reference_colloids, 1)
            One observable per reference colloid: the current field value
            minus the previous field value, multiplied by the scale factor.

    Raises
    ------
    ValueError
            If no historic field values are stored yet.
    """
    if self.historical_field == {}:
        raise ValueError(
            f"{type(self).__name__} requires initialization. Please set the "
            "initialize attribute of the gym to true and try again."
        )

    # Colloids for which the observable is evaluated.
    reference_colloids = [colloids[i] for i in self.get_colloid_indices(colloids)]
    colloid_ids = [colloid.id for colloid in reference_colloids]
    colloid_positions = [colloid.pos for colloid in reference_colloids]
    previous_fields = [self.historical_field[str(cid)] for cid in colloid_ids]

    # Positions of every colloid that emits the sensed field.
    sensed_positions = np.array(
        [colloid.pos for colloid in colloids if colloid.type == self.sensing_type]
    )

    out_indices, delta_values, field_values = self.observable_fn(
        np.array(colloid_ids),
        np.array(colloid_positions),
        sensed_positions,
        np.array(previous_fields),
    )

    # Store the current field so the next call reports the change against it.
    self.historical_field.update(
        {str(cid): field for cid, field in zip(out_indices, onp.array(field_values))}
    )

    scaled_deltas = self.scale_factor * delta_values.reshape(-1, 1)
    return scaled_deltas

compute_single_observable(index, reference_position, test_positions, historic_value)

Compute the observable for a single colloid.

Parameters

index : int Index of the colloid to compute the observable for. reference_position : np.ndarray (3,) Position of the reference colloid. test_positions : np.ndarray (n_colloids, 3) Positions of the test colloids. historic_value : float Historic value of the observable.

Returns

tuple (index, delta_value, field_value) index : int Index of the colloid to compute the observable for. delta_value : float Current field value minus the historic value. field_value : float Current value of the field at the reference position.

Source code in swarmrl/observables/particle_sensing.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def compute_single_observable(
    self,
    index: int,
    reference_position: np.ndarray,
    test_positions: np.ndarray,
    historic_value: float,
) -> tuple:
    """
    Compute the observable for a single colloid.

    Sensed colloids at exactly zero distance (i.e. the reference colloid
    itself when it is part of ``test_positions``) do not contribute.

    Parameters
    ----------
    index : int
            Index of the colloid to compute the observable for.
    reference_position : np.ndarray (3,)
            Position of the reference colloid.
    test_positions : np.ndarray (n_colloids, 3)
            Positions of the test colloids.
    historic_value : float
            Historic value of the observable.

    Returns
    -------
    tuple (index, delta_value, field_value)
    index : int
            Index of the colloid to compute the observable for.
    delta_value : float
            Current field value minus the historic value.
    field_value : float
            Current value of the field at the reference position.
    """
    distances = np.linalg.norm(
        (test_positions - reference_position) / self.box_length, axis=-1
    )
    # Mask out zero distances instead of selecting a fixed number (n - 1) of
    # non-zero entries: the fixed size silently dropped the last sensed
    # colloid whenever the reference colloid was not among the test positions.
    contributes = distances != 0
    # Substitute a harmless distance for masked entries so decay functions
    # that diverge at zero are never evaluated there.
    # NOTE(review): assumes decay_fn acts element-wise - confirm.
    safe_distances = np.where(contributes, distances, 1.0)
    # Compute field value
    field_value = np.where(contributes, self.decay_fn(safe_distances), 0.0).sum()
    return index, field_value - historic_value, field_value

initialize(colloids)

Initialize the observable with starting positions of the colloids.

Parameters

colloids : List[Colloid] List of colloids with which to initialize the observable.

Returns

Updates the class state.

Source code in swarmrl/observables/particle_sensing.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def initialize(self, colloids: List[Colloid]):
    """
    Seed the stored field values from the colloids' starting positions.

    Parameters
    ----------
    colloids : List[Colloid]
            List of colloids with which to initialize the observable.

    Returns
    -------
    Updates the class state.
    """
    reference_colloids = [colloids[i] for i in self.get_colloid_indices(colloids)]
    colloid_ids = np.array([colloid.id for colloid in reference_colloids])
    colloid_positions = np.array([colloid.pos for colloid in reference_colloids])

    # There is no history yet; the deltas returned below are discarded.
    no_history = np.zeros(len(reference_colloids))

    # Positions of every colloid that emits the sensed field.
    sensed_positions = np.array(
        [colloid.pos for colloid in colloids if colloid.type == self.sensing_type]
    )

    out_indices, _, field_values = self.observable_fn(
        colloid_ids, colloid_positions, sensed_positions, no_history
    )

    # Keys are the stringified colloid ids returned by the mapped function.
    self.historical_field.update(
        {str(cid): field for cid, field in zip(out_indices, onp.array(field_values))}
    )