swarmrl.training_routines.ensemble_submit Module API Reference

Class for submitting many jobs in parallel to a cluster.

EnsembleTraining

Class for ensemble training.

Source code in swarmrl/training_routines/ensemble_submit.py
class EnsembleTraining:
    """
    Class for ensemble training.
    """

    def __init__(
        self,
        trainer: ContinuousTrainer,
        simulation_runner_generator: callable,
        number_of_ensembles: int,
        episode_length: int,
        n_episodes: int,
        n_parallel_jobs: int = None,
        load_path: Path = None,
        cluster: JobQueueCluster = None,
        output_dir: Path = Path("./ensembled-training"),
    ) -> None:
        """
        Constructor for the ensemble training routine.

        Parameters
        ----------
        trainer : ContinuousTrainer
            The trainer used to train.
        simulation_runner_generator : callable
            A callable that returns a simulation runner.
        number_of_ensembles : int
            The number of ensembles to train.
        episode_length : int
            The length of each episode.
        n_episodes : int
            The number of episodes to train for.
        n_parallel_jobs : int
            The number of parallel jobs to run.
            Defaults to number_of_ensembles and should evenly
            divide it, as jobs are submitted in blocks of this size.
        load_path : Path or str (default None)
            The path to load the models from.
        cluster : JobQueueCluster
            The cluster to run the jobs on.
            If None, the jobs will be run locally.
        output_dir : Path or str (default "./ensembled-training")
            The directory to save the models to.

        """
        self.simulation_runner_generator = simulation_runner_generator
        self.output_dir = Path(output_dir)
        self.load_path = load_path
        self.episode_length = episode_length
        self.n_episodes = n_episodes

        # Update the default parameters.
        if n_parallel_jobs is None:
            n_parallel_jobs = number_of_ensembles

        self.trainer = trainer
        self.number_of_ensembles = number_of_ensembles
        self.n_parallel_jobs = n_parallel_jobs

        # Use default local cluster if None is given.
        if cluster is None:
            cluster = LocalCluster(
                processes=True,
                threads_per_worker=2,
                silence_logs=logging.ERROR,
                resources={"espresso": 1},
            )
        self.cluster = cluster
        self.client = Client(cluster)

        self.cluster.scale(n=self.n_parallel_jobs)
        webbrowser.open(self.client.dashboard_link)

        # Create the output directory if needed.
        if not self.output_dir.exists():
            os.makedirs(self.output_dir)

    @staticmethod
    def _train_model(
        save_path: str,
        trainer: ContinuousTrainer,
        system_runner: callable,
        load_directory: str = None,
        episode_length: int = 100,
        n_episodes: int = 100,
    ) -> tuple:
        """
        Job to submit to dask.

        Parameters
        ----------
        save_path : str
            The directory to train the model in. The trailing
            "_<id>" suffix of the path is used as the model id.
        trainer : ContinuousTrainer
            The trainer to use in training.
        system_runner : callable
            A callable that returns a simulation runner.
        load_directory : str
            The directory to load the models from.
            If None, fresh models are initialized.
        episode_length : int
            The length of each episode.
        n_episodes : int
            The number of episodes to train for.

        Returns
        -------
        rewards, model_id : tuple
            The rewards collected during training and the id of
            the trained model.
        """
        model_id = save_path.split("_")[-1]
        # Create the new paths.
        os.makedirs(save_path)
        os.chdir(save_path)

        # Get the system runner.
        system_runner = system_runner()
        if load_directory is not None:
            trainer.restore_models(directory=load_directory)
        else:
            trainer.initialize_models()

        # Train the gym.
        rewards = trainer.perform_rl_training(
            system_runner,
            n_episodes=n_episodes,
            episode_length=episode_length,
            load_bar=False,
        )
        trainer.export_models()

        return rewards, model_id

    def train_ensemble(self) -> dict:
        """
        Train the ensemble.

        Returns
        -------
        model_performance : dict
            A dictionary of the model performance; the
            structure is {model_id: rewards}.
        """
        futures = []
        names = [
            (self.output_dir / f"ensemble_{i}").resolve().as_posix()
            for i in range(self.number_of_ensembles)
        ]

        for i in range(self.number_of_ensembles // self.n_parallel_jobs):
            block = self.client.map(
                self._train_model,
                names[i * self.n_parallel_jobs : (i + 1) * self.n_parallel_jobs],
                [self.trainer] * self.n_parallel_jobs,
                [self.simulation_runner_generator] * self.n_parallel_jobs,
                [self.load_path] * self.n_parallel_jobs,
                [self.episode_length] * self.n_parallel_jobs,
                [self.n_episodes] * self.n_parallel_jobs,
                resources={"espresso": 1},
            )
            _ = wait(block)
            futures += self.client.gather(block)
            _ = self.client.restart(wait_for_workers=False)
            _ = self.client.wait_for_workers(self.n_parallel_jobs)

        # shut down the cluster
        self.cluster.close()
        self.client.close()

        return {model_id: rewards for rewards, model_id in futures}
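
A minimal usage sketch. The names my_trainer and make_runner are hypothetical
placeholders: my_trainer stands for a ContinuousTrainer and make_runner for a
zero-argument callable returning a simulation runner, both constructed
elsewhere in your own code.

from pathlib import Path

from swarmrl.training_routines.ensemble_submit import EnsembleTraining

# Hypothetical objects: my_trainer (a ContinuousTrainer) and make_runner
# (a callable returning a simulation runner) are assumed to exist.
ensemble = EnsembleTraining(
    trainer=my_trainer,
    simulation_runner_generator=make_runner,
    number_of_ensembles=4,
    episode_length=20,
    n_episodes=100,
    output_dir=Path("./ensembled-training"),
)
performance = ensemble.train_ensemble()  # {model_id: rewards}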

__init__(trainer, simulation_runner_generator, number_of_ensembles, episode_length, n_episodes, n_parallel_jobs=None, load_path=None, cluster=None, output_dir=Path('./ensembled-training'))

Constructor for the ensemble training routine.

Parameters

trainer : ContinuousTrainer
    The trainer used to train.
simulation_runner_generator : callable
    A callable that returns a simulation runner.
number_of_ensembles : int
    The number of ensembles to train.
episode_length : int
    The length of each episode.
n_episodes : int
    The number of episodes to train for.
n_parallel_jobs : int
    The number of parallel jobs to run. Defaults to number_of_ensembles
    and should evenly divide it, as jobs are submitted in blocks of this
    size.
load_path : Path or str (default None)
    The path to load the models from.
cluster : JobQueueCluster
    The cluster to run the jobs on. If None, the jobs will be run locally.
output_dir : Path or str (default "./ensembled-training")
    The directory to save the models to.
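
Because each training job is mapped with resources={"espresso": 1}, any
cluster passed here must advertise an espresso resource on its workers. A
minimal sketch, assuming dask-jobqueue is installed; the queue name, sizing,
and walltime below are illustrative assumptions, not swarmrl requirements.

from dask_jobqueue import SLURMCluster

# Sketch only: worker resources are declared through worker_extra_args
# (named `extra` in older dask-jobqueue releases).
cluster = SLURMCluster(
    queue="standard",        # assumption: replace with your site's queue
    cores=4,
    memory="16GB",
    walltime="04:00:00",
    worker_extra_args=["--resources", "espresso=1"],
)

Passed as the cluster argument, EnsembleTraining scales this cluster to
n_parallel_jobs workers and opens the dask dashboard in a browser.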


train_ensemble()

Train the ensemble.

Returns

model_performance : dict
    A dictionary of the model performance; the structure is
    {model_id: rewards}.
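
For example, the returned mapping can be used to pick the strongest ensemble
member. The ranking below is a sketch that assumes each rewards entry is an
episode-indexed sequence and reuses the ensemble object from the sketch above.

import numpy as np

performance = ensemble.train_ensemble()

# Assumption: rank models by mean reward over their last ten episodes.
best_id = max(performance, key=lambda mid: np.mean(performance[mid][-10:]))
print(f"Best performing model: ensemble_{best_id}")

The exported models for each member live under output_dir/ensemble_<id>,
since every job trains and exports inside its own save_path directory.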
