17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258 | class SubdividedVisionCones(Observable):
"""
The vision cone acts like a camera for the particles. It can either output all
colloids within its view, a function of the distances between one colloid and all
other colloids, or the normalised distance to the source.
"""
def __init__(
self,
vision_range: float,
vision_half_angle: float,
n_cones: int,
radii: List[float],
detected_types=None,
particle_type: int = 0,
):
"""
Constructor for the observable.
Parameters
----------
vision_range : float
How far the particles can see.
vision_half_angle : float
the half width of the field of view in radiant units
n_cones: int
In how many cone is the field of view subdivided
radii: list
List of the radii of the colloids in the experiment,
ordered by the ids of the colloids
detected_types: list
list of colloid types to be detected in requested order.
For example [0,2] here colloids of type 1 won't be detected.
if None then all will be detected.
particle_type : int (default=0)
Particle type to compute the observable for.
"""
super().__init__(particle_type=particle_type)
self.vision_range = vision_range
self.vision_half_angle = vision_half_angle
self.n_cones = n_cones
self.radii = radii
self.detected_types = detected_types
self.angle_fn = jit(calc_signed_angle_between_directors)
def _detect_all_things_to_see(self, colloids: List[Colloid]):
"""
Determines what types of colloids are present in the simulation
Parameters
----------
colloids : object
Colloids with all possible different types.
Returns
-------
One dimensional np.array with all possible types
in the corresponding index in which
they will also later be found when the observable is calculated.
"""
all_types = []
for c in colloids:
if c.type not in all_types:
all_types.append(c.type)
self.detected_types = np.array(np.sort(all_types))
@partial(jit, static_argnums=(0,))
def _calculate_cones_single_object(
self,
my_pos: np.ndarray,
my_director: np.ndarray,
c_type: int,
c_pos: np.ndarray,
radius: float,
):
"""
Compute the vision cones of one colloid from one colloid.
Parameters
----------
my_pos : np.ndarray
the 2D position of the colloid that has vision
my_director : np.ndarray
the 2D orientation of the colloid that has vision
c_type : int
The type of the colloid that is seen
c_pos : np.ndarray
The position of the colloid that is seen
radius : float
The radius of the colloid that has vision
Returns
-------
np.ndarray of shape (n_cones, num_of_detected_types) containing
the vision values for each cone and for each particle type that
can be visible. At most one value is unequal to zero.
"""
# generate a blue print of the output values
vision_val_out = jnp.ones((self.n_cones, len(self.detected_types)))
dist = c_pos - my_pos
dist_norm = jnp.linalg.norm(dist)
in_range = dist_norm < self.vision_range
# check if output values will correspond with
# no visible colloid at all because to far away
vision_val_out *= in_range
# calculation could stop here but is carried
# on with zeros due to parallelization
# keep on calculating even if there are only zeros
# adjust the amplitude of the vision to the correct value
vision_val_out *= jnp.min(jnp.array([1, 2 * radius / dist_norm]))
# generate a mask that is only true for the 2D array entries with the right type
type_mask = (
jnp.ones((self.n_cones, len(self.detected_types)))
* np.array(self.detected_types)[np.newaxis, :]
)
correct_type_mask = jnp.where(type_mask == c_type, True, False)
# Apply the mask on the output values
vision_val_out *= correct_type_mask
# compare to the direction of view with
# the direction in which the other colloid is.
# Get the singed angle between them
# call the jax.jit version of calc_signed_angle_between_directors()
angle = self.angle_fn(my_director, dist / dist_norm)
# get masks with True if the colloid is in the specific vision cone
rims = (
-self.vision_half_angle
+ jnp.arange(self.n_cones + 1) * self.vision_half_angle * 2 / self.n_cones
)
in_left_rim = jnp.where(rims[:-1] < angle, True, False)
in_right_rim = jnp.where(rims[1:] > angle, True, False)
in_a_cone = in_left_rim * in_right_rim
# apply mask to the output_values
vision_val_out *= in_a_cone[:, np.newaxis]
# 2D array with right vision amplitude for
# right type and cone only if distance < cut off radius
return vision_val_out
def _calculate_cones(self, my_pos, my_director, other_colloids: List[Colloid]):
"""
Calculates the vision cones of the colloid.
Parameters
----------
my_pos : np.ndarray
Position of the colloid.
my_director : np.ndarray
Normalised director of the colloid.
other_colloids : List[Colloid]
all the colloids besides the one with my_pos and my_director
Returns
-------
np.array of shape (n_cones, num_of_types) containing the vision values
for each cone and for each particle type that can be visible.
"""
# prepare traceable colloid data for vectorized vision cone evaluation
other_colloids_id = np.zeros((len(other_colloids)))
other_colloids_pos = np.zeros((len(other_colloids), 3))
other_colloids_types = np.zeros((len(other_colloids)))
for index, c in enumerate(other_colloids):
other_colloids_id[index] = c.id
other_colloids_types[index] = c.type
other_colloids_pos[index, :] = c.pos
# wrap jax.vmap around the vision cone
# evaluation concerning a single other colloid
# therefore spare out a for loop over all colloid to be seen
# make it parallelizable at the cost of calculating with sparse arrays
calculate_cones = vmap(
self._calculate_cones_single_object,
in_axes=(None, None, 0, 0, 0),
out_axes=0,
)
# executing the vectorized function
vision_val_out_expanded = calculate_cones(
my_pos,
my_director,
other_colloids_types,
other_colloids_pos,
np.array(self.radii),
)
# collapsing the data of every individual other_colloid and returning the result
return np.sum(vision_val_out_expanded, axis=0)
def compute_single_observable(
self, index: int, colloids: List[Colloid]
) -> np.ndarray:
"""
Compute the vision cones of the colloid.
Parameters
----------
index : int
Index of colloid for which the observable should be computed.
colloids
colloids in the system.
Returns
-------
np.array of shape (n_cones, num_of_detected_types) containing the vision values
for each cone and for each particle type that can be visible.
"""
colloid = colloids[index]
if self.detected_types is None:
self._detect_all_things_to_see(colloids)
my_pos, my_director = colloid.pos, colloid.director
of_others = [
[c, self.radii[i]] for i, c in enumerate(colloids) if c is not index
]
other_colloids = [of_others[i][0] for i in range(len(of_others))]
self.radii = [of_others[i][1] for i in range(len(of_others))]
observable = self._calculate_cones(my_pos, my_director, other_colloids)
return observable
def compute_observable(self, colloids: List[Colloid]):
"""
Compute the vision cones of the colloids.
Parameters
----------
colloids
colloids in the system.
Returns
-------
np.array of shape (n_colloids, n_cones, num_of_detected_types)
containing the vision values
"""
reference_ids = self.get_colloid_indices(colloids)
return [
self.compute_single_observable(index, colloids) for index in reference_ids
]
|