Skip to content

swarmrl.engine.real_experiment Module API Reference

Engine implementation for the real experiment (subclass of the parent Engine class).

ConnectionClosedError

Bases: Exception

Exception to capture when Matlab closes the connection

Source code in swarmrl/engine/real_experiment.py
15
16
17
18
19
20
class ConnectionClosedError(Exception):
    """
    Raised when the Matlab side closes the connection.

    The exception carries no extra state; it only acts as a signal that no
    further data will arrive on the connection.
    """

RealExperiment

Bases: Engine

Class for the real experiment interface.

Source code in swarmrl/engine/real_experiment.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class RealExperiment(swarmrl.engine.engine.Engine):
    """
    Class for the real experiment interface.

    The engine exchanges binary messages with the experiment through
    ``connection``: it receives the colloid states, asks the force model for one
    action per colloid and sends the encoded actions back.
    """

    def __init__(self, connection):
        """
        Constructor for the experiment.

        Parameters
        ----------
        connection : object
                Communication object to be used to talk to the experiment.
                The methods of this class call ``recv``, ``sendall`` and
                ``close`` on it (socket-like interface).
        """
        self.connection = connection

    def setup_simulation(self) -> None:
        """
        Method not required in the real experiment.
        """
        pass

    def _recv_exact(self, n_bytes: int) -> bytes:
        """
        Read exactly ``n_bytes`` bytes from the connection.

        ``recv`` may return fewer bytes than requested, so the call is repeated
        until the message is complete. Only the missing number of bytes is
        requested each time so that the next message is never consumed.

        Parameters
        ----------
        n_bytes : int
                Number of bytes to read.

        Returns
        -------
        data : bytes
                The received bytes, of length ``n_bytes``.

        Raises
        ------
        ConnectionClosedError
                If the connection is closed before all bytes have arrived.
        """
        buffer = bytearray()
        while len(buffer) < n_bytes:
            chunk = self.connection.recv(n_bytes - len(buffer))
            if not chunk:
                # An empty read means the peer closed the connection.
                raise ConnectionClosedError
            buffer.extend(chunk)
        return bytes(buffer)

    def receive_colloids(self) -> typing.List[Colloid]:
        """
        Collect the colloid data from the experiment.

        A message consists of a header holding the number of doubles that
        follow (native unsigned int) and the doubles themselves, interpreted as
        rows of ``[x, y, theta, id]``.

        Returns
        -------
        colloids : list
                A list of colloids.

        Raises
        ------
        ConnectionClosedError
                If the connection is closed before a full message is received.
        """
        print("Waiting for receiving data_size")
        # Read exactly the number of bytes the header format needs; asking for
        # more could swallow payload bytes and make the unpack fail.
        header_size = struct.calcsize("I")
        try:
            data_size = self._recv_exact(header_size)
        except ConnectionClosedError:
            print("Received connection closed signal")
            raise

        data_size_int = struct.unpack("I", data_size)[0]
        print(f"Received data_size = {data_size_int}")
        print("Waiting to receive actual data")
        # Each value is a double, i.e. 8 bytes.
        data = self._recv_exact(8 * data_size_int)

        # cast bytestream to double array and reshape to [x y theta id]
        data = np.array(struct.unpack(f"{data_size_int}d", data)).reshape((-1, 4))
        print(f"Received data with shape {np.shape(data)} \n")
        colloids = []
        for row in data:
            coll = Colloid(
                pos=np.array([row[0], row[1], 0]),
                director=vector_from_angle(row[2]),
                id=row[3],
            )
            colloids.append(coll)

        return colloids

    def get_actions(
        self,
        colloids: typing.List[Colloid],
        force_model: ForceFunction,
    ) -> np.ndarray:
        """
        Collect the actions on the particles.

        This method calls the interaction models and collects all of the actions on the
        colloids. A missing torque (``None``) on an action is replaced by a zero
        vector on the action object itself.

        Parameters
        ----------
        colloids : list
                List of colloids on which to compute interactions.
        force_model : swarmrl.models.interaction_model.InteractionModel
                Interaction model to use in the action computation.

        Returns
        -------
        ret : np.ndarray
                A numpy array of shape (n_colloids, 2); column 0 holds the
                colloid id and column 1 the experiment action id.
        """
        n_colloids = len(colloids)
        ret = np.zeros((n_colloids, 2))
        actions = force_model.calc_action(colloids)
        for idx, coll in enumerate(colloids):
            action = actions[idx]
            action.torque = action.torque if action.torque is not None else np.zeros(3)

            if action.force != 0.0:
                action_id = experiment_actions["be_active"]
            else:
                action_id = experiment_actions["do_nothing"]

            # A non-zero torque takes precedence over the force-based action;
            # the sign of the z-component selects the rotation direction.
            if not np.all(action.torque == 0):
                if action.torque[2] > 0:
                    action_id = experiment_actions["rotate_anticlockwise"]
                else:
                    action_id = experiment_actions["rotate_clockwise"]

            ret[idx, 0] = coll.id
            ret[idx, 1] = action_id
        return ret

    def send_actions(self, actions: np.ndarray):
        """
        Send the actions to the experiment apparatus.

        Parameters
        ----------
        actions : np.ndarray
                A numpy array of actions to send to the experiment.

        Returns
        -------
        Sends all data to the experiment.
        """
        # Flatten data in 'Fortran' style (column by column)
        data = actions.flatten("F")
        print(f"Sending data with shape {np.shape(data)} \n")

        # Encode every value as a double.
        data_bytes = struct.pack(str(len(data)) + "d", *data)
        self.connection.sendall(data_bytes)

    def integrate(
        self,
        n_slices: int,
        force_model: ForceFunction,
    ) -> None:
        """
        Perform the real-experiment equivalent of an integration step.

        Each slice receives the colloids, computes their actions and sends the
        actions back. When the connection is closed by the experiment, the
        connection is closed on this side as well and the loop stops early.

        Parameters
        ----------
        n_slices : int
                Number of integration steps to perform.
        force_model : swarmrl.models.interaction_model.InteractionModel
                Model to use in the interaction computations.

        Returns
        -------
        Sends actions to the experiment.

        Notes
        -----
        Can we always refer to real time as discrete? I like it.
        """
        for _ in range(n_slices):
            try:
                colloids = self.receive_colloids()
            except ConnectionClosedError:
                # force_model.finalize()
                self.connection.close()
                break

            actions = self.get_actions(colloids, force_model)
            self.send_actions(actions)

__init__(connection)

Constructor for the experiment.

Parameters

connection : object Communication object to be used to talk to the experiment.

Source code in swarmrl/engine/real_experiment.py
40
41
42
43
44
45
46
47
48
49
def __init__(self, connection) -> None:
    """
    Constructor for the experiment.

    Stores the communication object on the instance; nothing is sent or
    received at construction time.

    Parameters
    ----------
    connection : object
            Communication object to be used to talk to the experiment.
    """
    self.connection = connection

get_actions(colloids, force_model)

Collect the actions on the particles.

This method calls the interaction models and collects all of the actions on the colloids.

Parameters

colloids : list List of colloids on which to compute interactions. force_model : swarmrl.models.interaction_model.InteractionModel Interaction model to use in the action computation.

Returns

ret : np.ndarray A numpy array of the actions on each colloid.

Source code in swarmrl/engine/real_experiment.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def get_actions(
    self,
    colloids: typing.List[Colloid],
    force_model: ForceFunction,
) -> np.array:
    """
    Translate the force model output into experiment action ids.

    The force model is queried once for all colloids. Every returned action is
    mapped onto one entry of ``experiment_actions``: a non-zero torque selects a
    rotation (direction given by the sign of its z-component), otherwise a
    non-zero force selects "be_active" and a zero force "do_nothing". A torque of
    ``None`` is replaced by a zero vector on the action object.

    Parameters
    ----------
    colloids : list
            List of colloids on which to compute interactions.
    force_model : swarmrl.models.interaction_model.InteractionModel
            Interaction model to use in the action computation.

    Returns
    -------
    ret : np.ndarray
            Array with one row per colloid: column 0 is the colloid id, column 1
            the chosen action id.
    """
    ret = np.zeros((len(colloids), 2))
    computed = force_model.calc_action(colloids)

    for row, colloid in enumerate(colloids):
        current = computed[row]

        # Normalise a missing torque to a zero vector and store it back.
        torque = current.torque
        if torque is None:
            torque = np.zeros(3)
        current.torque = torque

        # Force decides between being active and idling ...
        idle = current.force == 0.0
        chosen = experiment_actions["do_nothing" if idle else "be_active"]

        # ... unless there is any torque, which overrides the choice.
        if np.any(torque != 0):
            direction = (
                "rotate_anticlockwise" if torque[2] > 0 else "rotate_clockwise"
            )
            chosen = experiment_actions[direction]

        ret[row, 0] = colloid.id
        ret[row, 1] = chosen

    return ret

integrate(n_slices, force_model)

Perform the real-experiment equivalent of an integration step.

Parameters

n_slices : int Number of integration steps to perform. force_model : swarmrl.models.interaction_model.InteractionModel Model to use in the interaction computations.

Returns

Sends actions to the experiment.

Notes

Can we always refer to real time as discrete? I like it.

Source code in swarmrl/engine/real_experiment.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def integrate(
    self,
    n_slices: int,
    force_model: ForceFunction,
) -> None:
    """
    Run the receive / act / send cycle of the real experiment.

    For each slice the colloids are received, their actions are computed with
    the force model and the actions are sent back. If receiving signals that the
    connection was closed, the connection is closed here too and the remaining
    slices are skipped.

    Parameters
    ----------
    n_slices : int
            Number of integration steps to perform.
    force_model : swarmrl.models.interaction_model.InteractionModel
            Model to use in the interaction computations.

    Returns
    -------
    Sends actions to the experiment.

    Notes
    -----
    Can we always refer to real time as discrete? I like it.
    """
    for _step in range(n_slices):
        try:
            received = self.receive_colloids()
        except ConnectionClosedError:
            # The experiment hung up: release the connection and stop.
            self.connection.close()
            return

        self.send_actions(self.get_actions(received, force_model))

receive_colloids()

Collect the colloid data from the experiment.

Returns

colloids : list A list of colloids.

Source code in swarmrl/engine/real_experiment.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def receive_colloids(self) -> typing.List[Colloid]:
    """
    Collect the colloid data from the experiment.

    A message consists of a header holding the number of doubles that follow
    (native unsigned int) and the doubles themselves, interpreted as rows of
    ``[x, y, theta, id]``.

    Returns
    -------
    colloids : list
            A list of colloids.

    Raises
    ------
    ConnectionClosedError
            If the connection is closed before a full message is received.
    """

    def _recv_exact(n_bytes: int) -> bytes:
        # recv may deliver fewer bytes than requested; keep reading, but only
        # ask for what is still missing so the next message is not consumed.
        buffer = bytearray()
        while len(buffer) < n_bytes:
            chunk = self.connection.recv(n_bytes - len(buffer))
            if not chunk:
                # An empty read means the peer closed the connection.
                raise ConnectionClosedError
            buffer.extend(chunk)
        return bytes(buffer)

    print("Waiting for receiving data_size")
    # Read exactly the number of bytes the header format needs; asking for more
    # could swallow payload bytes and make the unpack fail.
    header_size = struct.calcsize("I")
    try:
        data_size = _recv_exact(header_size)
    except ConnectionClosedError:
        print("Received connection closed signal")
        raise

    data_size_int = struct.unpack("I", data_size)[0]
    print(f"Received data_size = {data_size_int}")
    print("Waiting to receive actual data")
    # Each value is a double, i.e. 8 bytes.
    data = _recv_exact(8 * data_size_int)

    # cast bytestream to double array and reshape to [x y theta id]
    data = np.array(struct.unpack(f"{data_size_int}d", data)).reshape((-1, 4))
    print(f"Received data with shape {np.shape(data)} \n")
    colloids = []
    for row in data:
        coll = Colloid(
            pos=np.array([row[0], row[1], 0]),
            director=vector_from_angle(row[2]),
            id=row[3],
        )
        colloids.append(coll)

    return colloids

send_actions(actions)

Send the actions to the experiment apparatus.

Parameters

actions : np.ndarray A numpy array of actions to send to the experiment.

Returns

Sends all data to the experiment.

Source code in swarmrl/engine/real_experiment.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def send_actions(self, actions: np.ndarray):
    """
    Serialise the action array and push it to the experiment.

    The array is flattened column by column, every entry is encoded as a
    double and the resulting byte string is handed to ``connection.sendall``.

    Parameters
    ----------
    actions : np.ndarray
            A numpy array of actions to send to the experiment.

    Returns
    -------
    Sends all data to the experiment.
    """
    # Column-major ('Fortran') ordering of the entries.
    flat = actions.flatten(order="F")
    print(f"Sending data with shape {np.shape(flat)} \n")

    payload = struct.pack(f"{len(flat)}d", *flat)
    self.connection.sendall(payload)

setup_simulation()

Method not required in the real experiment.

Source code in swarmrl/engine/real_experiment.py
51
52
53
54
55
def setup_simulation(self) -> None:
    """
    No-op: the real experiment needs no simulation setup.
    """
    return None