Skip to content

swarmrl.engine.espresso Module API Reference

Module for the espressoMD simulations.

EspressoMD

Bases: Engine

A class to manage the espressoMD environment.

Methods are allowed to add particles until the first call to integrate().

Dev Note: These methods need to register the new particles in self.colloid_radius_register The first call to integrate() will then setup the interactions and database output.

Source code in swarmrl/engine/espresso.py
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
class EspressoMD(Engine):
    """
    A class to manage the espressoMD environment.

    Methods are allowed to add particles until the first call to integrate().

    Dev Note: These methods need to register the new particles
    in self.colloid_radius_register
    The first call to integrate() will then setup the interactions and database output.
    """

    def __init__(
        self,
        md_params,
        n_dims=3,
        seed=42,
        out_folder=".",
        write_chunk_size=100,
        system=None,
    ):
        """
        Constructor for the espressoMD engine.

        Parameters
        ----------
        md_params : espresso.MDParams
                Parameter class for the espresso simulation.
        n_dims : int (default = 3)
                Number of dimensions to consider in the simulation
        seed : int
                Seed number for any generators.
        out_folder : str or pathlib.Path
                Path to an output folder to store data in. This file should have a
                reasonable amount of free space.
        write_chunk_size : int
                Chunk size to use in the hdf5 writing.
        system : espressomd.System (optional)
                Espresso system to use in this engine.
                If not provided, a new system will be created.
                Note: We try to clear the passed system of any previous contents,
                but do not guarantee that everything is reset completely. Use at
                own risk.
        """
        self.params: MDParams = md_params
        self.out_folder = pathlib.Path(out_folder).resolve()
        self.seed = seed
        self.rng = np.random.default_rng(self.seed)
        if n_dims not in [2, 3]:
            raise ValueError("Only 2d and 3d are allowed")
        self.n_dims = n_dims

        self._init_unit_system()
        self.write_chunk_size = write_chunk_size

        if system is None:
            self.system = espressomd.System(box_l=3 * [1.0])
        else:
            self.system = _reset_system(system)
        self._init_system()

        self.colloids = list()
        self.lbf: espressomd.lb.LBFluidWalberla = None

        # register to lookup which type has which radius
        self.colloid_radius_register = {}

        # after the first call to integrate, no more changes to the engine are allowed
        self.integration_initialised = False

        espressomd.assert_features(
            ["ROTATION", "EXTERNAL_FORCES", "THERMOSTAT_PER_PARTICLE"]
        )

    def _init_unit_system(self):
        """
        Initialize the unit registry managed by pint.

        Returns
        -------
        Updates the class state.
        """
        self.ureg = self.params.ureg

        # three basis units chosen arbitrarily
        self.ureg.define("sim_length = 1e-6 meter")
        self.ureg.define("sim_time = 1 second")
        self.ureg.define("sim_energy = 293 kelvin * boltzmann_constant")

        # derived units
        self.ureg.define("sim_velocity = sim_length / sim_time")
        self.ureg.define("sim_angular_velocity = 1 / sim_time")
        self.ureg.define("sim_mass = sim_energy / sim_velocity**2")
        self.ureg.define("sim_rinertia = sim_length**2 * sim_mass")
        self.ureg.define("sim_dyn_viscosity = sim_mass / (sim_length * sim_time)")
        self.ureg.define("sim_kin_viscosity = sim_length**2 / sim_time")
        self.ureg.define("sim_force = sim_mass * sim_length / sim_time**2")
        self.ureg.define("sim_torque = sim_length * sim_force")

    def _init_system(self):
        """
        Prepare the simulation box with the given parameters.

        Returns
        -------
        Update the class state.
        """
        # parameter unit conversion
        time_step = self.params.time_step.m_as("sim_time")
        # time slice: the amount of time the integrator runs before we look at the
        # configuration and change forces
        time_slice = self.params.time_slice.m_as("sim_time")

        write_interval = self.params.write_interval.m_as("sim_time")

        box_l = np.array(self.params.box_length.m_as("sim_length"))
        if np.isscalar(box_l):
            raise ValueError(
                "box_length must be a 3d vector (or 2d if you have a 2d system)"
            )
        if self.n_dims == 2 and len(box_l) == 2:
            # if a 2d system is simulated, the third dimension does not
            # matter but must still be set
            box_l = np.array([box_l[0], box_l[1], box_l[0]])
        if len(box_l) != 3:
            raise ValueError(
                f"box_length must be a 3d vector. You gave {self.params.box_length}"
            )

        # system setup. Skin is a verlet list parameter that has to be set, but only
        # affects performance
        self.system.box_l = box_l
        self.system.time_step = time_step
        self.system.cell_system.skin = 0.4
        self.system.periodicity = 3 * [self.params.periodic]

        # set writer params
        steps_per_write_interval = int(round(write_interval / time_step))
        self.params.steps_per_write_interval = steps_per_write_interval
        if abs(steps_per_write_interval - write_interval / time_step) > 1e-10:
            raise ValueError(
                "inconsistent parameters: write_interval must be integer multiple of"
                " time_step"
            )

        # set integrator params
        steps_per_slice = int(round(time_slice / time_step))
        self.params.steps_per_slice = steps_per_slice
        if abs(steps_per_slice - time_slice / time_step) > 1e-10:
            raise ValueError(
                "inconsistent parameters: time_slice must be integer multiple of"
                " time_step"
            )

    def _rotate_colloid_to_2d(self, colloid, theta):
        # rotate around the y-axis by 90 degrees
        colloid.rotate(axis=[0, 1, 0], angle=np.pi / 2.0)
        # now the particle points along the x-axis. The lab-z axis is the
        # body-frame (-x) -axis. We only allow rotation around the
        # labframe-z-axis from now on
        colloid.rotation = [True, False, False]
        # now rotate in-plane
        colloid.rotate(axis=[0, 0, 1], angle=theta)

    def _check_already_initialised(self):
        if self.integration_initialised:
            raise RuntimeError(
                "You cannot change the system configuration "
                "after the first call to integrate()"
            )

    def add_colloid_on_point(
        self,
        radius_colloid: pint.Quantity = None,
        init_position: pint.Quantity = None,
        init_direction: np.array = np.array([1, 0, 0]),
        type_colloid=0,
        gamma_translation: pint.Quantity = None,
        gamma_rotation: pint.Quantity = None,
        aspect_ratio: float = 1.0,
        mass: pint.Quantity = None,
        rinertia: pint.Quantity = None,
    ):
        """
        Parameters
        ----------
        radius_colloid
            default: 1 micrometer
        init_position
            default: center of the box
        init_direction
            default: along x
        type_colloid
            The colloids created from this method call will have this type.
            Multiple calls can be made with the same type_colloid.
            Interaction models need to be made aware if there are different types
            of colloids in the system if specific behaviour is desired.
        gamma_translation, gamma_rotation: pint.Quantity[np.array], optional
            If None, calculate these quantities from the radius and the fluid viscosity.
            You can provide friction coefficients as scalars or a 3-vector
            (the diagonal elements of the friction tensor).
        aspect_ratio: float, optional
            If you provide a value != 1, a gay-berne interaction will be set up
            instead of purely repulsive lennard jones.
            aspect_ratio > 1 will produce a cigar, aspect_ratio < 0 a disk
            (both swimming in the direction of symmetry).
        mass: optional
            Particle mass. Only relevant for Langevin integrator.
        rinertia: optional
            Diagonal elements of the rotational moment of inertia tensor
            of the particle, assuming the particle is oriented along z.

        Returns
        -------
        colloid.

        """

        self._check_already_initialised()

        if radius_colloid is None:
            radius_colloid = self.ureg.Quantity(1, "micrometer")
        if init_position is None:
            init_position = 0.5 * self.params.box_length

        if type_colloid in self.colloid_radius_register.keys():
            if self.colloid_radius_register[type_colloid][
                "radius"
            ] != radius_colloid.m_as("sim_length"):
                raise ValueError(
                    f"The chosen type {type_colloid} is already taken and used with a"
                    " different radius"
                    f" {self.colloid_radius_register[type_colloid]['radius']}. Choose a"
                    " new combination"
                )

        radius_simunits = radius_colloid.m_as("sim_length")
        init_pos = init_position.m_as("sim_length")
        init_direction = init_direction / np.linalg.norm(init_direction)

        (
            gamma_translation_sphere,
            gamma_rotation_sphere,
        ) = _calc_friction_coefficients(
            self.params.fluid_dyn_viscosity.m_as("sim_dyn_viscosity"), radius_simunits
        )
        if gamma_translation is None:
            gamma_translation = gamma_translation_sphere
        else:
            gamma_translation = gamma_translation.m_as("sim_force/sim_velocity")
        if gamma_rotation is None:
            gamma_rotation = gamma_rotation_sphere
        else:
            gamma_rotation = gamma_rotation.m_as("sim_torque/sim_angular_velocity")

        if self.params.thermostat_type == "langevin":
            if mass is None:
                raise ValueError(
                    "If you use the Langevin thermostat, you must set a particle mass"
                )
            if rinertia is None:
                raise ValueError(
                    "If you use the Langevin thermostat, you must set a particle"
                    " rotational inertia"
                )
        else:
            # mass and moment of inertia can still be relevant when calculating
            # the stochastic part of the particle velocity, see
            # https://espressomd.github.io/doc/integration.html#brownian-thermostat.
            # Provide defaults in case the user didn't set the values.
            water_dens = self.params.ureg.Quantity(1000, "kg/meter**3")
            if mass is None:
                mass = water_dens * 4.0 / 3.0 * np.pi * radius_colloid**3
            if rinertia is None:
                rinertia = 2.0 / 5.0 * mass * radius_colloid**2
                rinertia = utils.convert_array_of_pint_to_pint_of_array(
                    3 * [rinertia], self.params.ureg
                )

        if self.n_dims == 3:
            colloid = self.system.part.add(
                pos=init_pos,
                director=init_direction,
                rotation=3 * [True],
                gamma=gamma_translation,
                gamma_rot=gamma_rotation,
                fix=3 * [False],
                type=type_colloid,
                mass=mass.m_as("sim_mass"),
                rinertia=rinertia.m_as("sim_rinertia"),
            )
        else:
            # initialize with body-frame = lab-frame to set correct rotation flags
            # allow all rotations to bring the particle to correct state
            init_pos[2] = 0  # get rid of z-coordinate in 2D coordinates
            colloid = self.system.part.add(
                pos=init_pos,
                fix=[False, False, True],
                rotation=3 * [True],
                gamma=gamma_translation,
                gamma_rot=gamma_rotation,
                quat=[1, 0, 0, 0],
                type=type_colloid,
                mass=mass.m_as("sim_mass"),
                rinertia=rinertia.m_as("sim_rinertia"),
            )
            theta, phi = utils.angles_from_vector(init_direction)
            if abs(theta - np.pi / 2) > 10e-6:
                raise ValueError(
                    "It seems like you want to have a 2D simulation"
                    " with colloids that point some amount in Z-direction."
                    " Change something in your colloid setup."
                )
            self._rotate_colloid_to_2d(colloid, phi)

        self.colloids.append(colloid)

        self.colloid_radius_register.update(
            {type_colloid: {"radius": radius_simunits, "aspect_ratio": aspect_ratio}}
        )

        return colloid

    def add_colloids(
        self,
        n_colloids: int,
        radius_colloid: pint.Quantity = None,
        random_placement_center: pint.Quantity = None,
        random_placement_radius: pint.Quantity = None,
        type_colloid: int = 0,
        gamma_translation: pint.Quantity = None,
        gamma_rotation: pint.Quantity = None,
        aspect_ratio: float = 1.0,
        mass: pint.Quantity = None,
        rinertia: pint.Quantity = None,
    ):
        """
        Parameters
        ----------
        n_colloids
        radius_colloid
            default: 1 micrometer
        random_placement_center
            default: center of the box
        random_placement_radius
            default: half the box dimension
        type_colloid
            The colloids created from this method call will have this type.
            Multiple calls can be made with the same type_colloid.
            Interaction models need to be made aware if there are different types
            of colloids in the system if specific behaviour is desired.
        gamma_translation, gamma_rotation: optional
            If None, calculate these quantities from the radius and the fluid viscosity.
            You can provide friction coefficients as scalars or a 3-vector
            (the diagonal elements of the friction tensor)
        aspect_ratio
            If you provide a value != 1, a gay-berne interaction will be set up
            instead of purely repulsive lennard jones.
            aspect_ratio > 1 will produce a cigar, aspect_ratio < 0 a disk
            (both swimming in the direction of symmetry).
            The radius_colloid gives the radius perpendicular to the symmetry axis.
        mass: optional
            Particle mass. Only relevant for Langevin integrator.
        rinertia: optional
            Diagonal elements of the rotational moment of inertia tensor
            of the particle, assuming the particle is oriented along z.


        Returns
        -------

        """

        self._check_already_initialised()

        if random_placement_center is None:
            random_placement_center = self.ureg.Quantity(
                0.5 * self.params.box_length.m_as("sim_length"), "sim_length"
            )
        if random_placement_radius is None:
            random_placement_radius = 0.5 * min(self.params.box_length)

        init_center = random_placement_center.m_as("sim_length")
        init_rad = random_placement_radius.m_as("sim_length")

        for i in range(n_colloids):
            start_pos = (
                _get_random_start_pos(init_rad, init_center, self.n_dims, self.rng)
                * self.ureg.sim_length
            )

            if self.n_dims == 3:
                init_direction = utils.vector_from_angles(
                    *utils.get_random_angles(self.rng)
                )
            else:
                start_angle = 2 * np.pi * self.rng.random()
                init_direction = utils.vector_from_angles(np.pi / 2, start_angle)
            self.add_colloid_on_point(
                radius_colloid=radius_colloid,
                init_position=start_pos,
                init_direction=init_direction,
                type_colloid=type_colloid,
                gamma_translation=gamma_translation,
                gamma_rotation=gamma_rotation,
                aspect_ratio=aspect_ratio,
                mass=mass,
                rinertia=rinertia,
            )

    def add_rod(
        self,
        rod_center: pint.Quantity = None,
        rod_length: pint.Quantity = None,
        rod_thickness: pint.Quantity = None,
        rod_start_angle: float = None,
        n_particles: int = None,
        friction_trans: pint.Quantity = None,
        friction_rot: pint.Quantity = None,
        rod_particle_type: int = None,
        fixed: bool = True,
    ):
        """
        Add a rod to the system.
        A rod consists of n_particles point particles that are rigidly connected
        and rotate/move as a whole
        Parameters
        ----------
        rod_center
            default: center of the box
        rod_length
            default: 100 micrometer
        rod_thickness
            default: 5 micrometer
            Make sure there are enough particles.
            If the thickness is too thin, the rod might get holes
        rod_start_angle
            default: 0
        n_particles
            default: 101
            Must be uneven number such that there always is a central particle
        friction_trans
            Irrelevant if fixed==True
            must be provided
        friction_rot
            must be provided
        rod_particle_type
            The rod is made out of points so they get their own type.
        fixed
            Fixes the central particle of the rod.

        Returns
        -------
        The espresso handle to the central particle. For debugging purposes only
        """
        self._check_already_initialised()

        if rod_center is None:
            rod_center = self.params.box_length / 2.0
        if rod_length is None:
            rod_length = self.ureg.Quantity(100, "micrometer")
        if rod_thickness is None:
            rod_thickness = self.ureg.Quantity(5, "micrometer")
        if rod_start_angle is None:
            rod_start_angle = 0
        if n_particles is None:
            n_particles = 101
        if friction_trans is None and not fixed:
            raise ValueError(
                "If you want the rod to move, you must provide a friction coefficient"
            )
        if friction_rot is None:
            raise ValueError("You must provide a rotational friction coefficient")
        if rod_particle_type is None:
            raise ValueError("You must provide a particle type for the rod")

        if self.n_dims != 2:
            raise ValueError("Rod can only be added in 2d")
        if rod_center[2].magnitude != 0:
            raise ValueError(f"Rod center z-component must be 0. You gave {rod_center}")
        if n_particles % 2 != 1:
            raise ValueError(f"n_particles must be uneven. You gave {n_particles}")

        espressomd.assert_features(["VIRTUAL_SITES_RELATIVE"])
        import espressomd.virtual_sites as evs

        self.system.virtual_sites = evs.VirtualSitesRelative(have_quaternion=True)

        center_pos = rod_center.m_as("sim_length")
        fric_trans = friction_trans.m_as("sim_force/sim_velocity")  # [F / v]
        fric_rot = friction_rot.m_as(
            "sim_force * sim_length *  sim_time"
        )  # [M / omega]
        partcl_radius = rod_thickness.m_as("sim_length") / 2

        # place the real particle
        center_part = self.system.part.add(
            pos=center_pos,
            quat=[1, 0, 0, 0],
            rotation=3 * [True],
            fix=[fixed, fixed, True],
            gamma=fric_trans,
            gamma_rot=fric_rot,
            type=rod_particle_type,
        )
        self._rotate_colloid_to_2d(center_part, rod_start_angle)
        self.colloids.append(center_part)

        # place virtual
        point_span = rod_length.m_as("sim_length") - 2 * partcl_radius
        point_dist = point_span / (n_particles - 1)
        if point_dist > 2 * partcl_radius:
            logger.warning(
                "your rod has holes. "
                f"Particle radius {partcl_radius} "
                f"particle_distance {point_dist} "
                "(both in simulation units)"
            )

        director = utils.vector_from_angles(np.pi / 2, rod_start_angle)

        for k in range(n_particles - 1):
            dist_to_center = (-1) ** k * (k // 2 + 1) * point_dist
            pos_virt = center_pos + dist_to_center * director
            virtual_partcl = self.system.part.add(
                pos=pos_virt, director=director, virtual=True, type=rod_particle_type
            )
            virtual_partcl.vs_auto_relate_to(center_part)
            self.colloids.append(virtual_partcl)

        self.colloid_radius_register.update(
            {rod_particle_type: {"radius": partcl_radius, "aspect_ratio": 1.0}}
        )
        return center_part

    def add_confining_walls(self, wall_type: int):
        """
        Walls on the edges of the box, will interact with particles through WCA.
        Is NOT communicated to the interaction models, though.

        Parameters
        ----------
        wall_type : int
            Wall interacts with particles, so it needs its own type.

        Returns
        -------
        """
        self._check_already_initialised()
        if wall_type in self.colloid_radius_register.keys():
            raise ValueError(
                f"wall type {wall_type} is already taken "
                "by other system component. Choose a new one"
            )

        wall_shapes = []
        wall_shapes.append(espressomd.shapes.Wall(dist=0, normal=[1, 0, 0]))
        wall_shapes.append(
            espressomd.shapes.Wall(dist=-self.system.box_l[0], normal=[-1, 0, 0])
        )
        wall_shapes.append(espressomd.shapes.Wall(dist=0, normal=[0, 1, 0]))
        wall_shapes.append(
            espressomd.shapes.Wall(dist=-self.system.box_l[1], normal=[0, -1, 0])
        )
        if self.n_dims == 3:
            wall_shapes.append(espressomd.shapes.Wall(dist=0, normal=[0, 0, 1]))
            wall_shapes.append(
                espressomd.shapes.Wall(dist=-self.system.box_l[2], normal=[0, 0, -1])
            )

        for wall_shape in wall_shapes:
            constr = espressomd.constraints.ShapeBasedConstraint(
                shape=wall_shape, particle_type=wall_type, penetrable=False
            )
            self.system.constraints.add(constr)

        # the wall itself has no radius, only the particle radius counts
        self.colloid_radius_register.update(
            {wall_type: {"radius": 0.0, "aspect_ratio": 1.0}}
        )

    def add_walls(
        self,
        wall_start_point: pint.Quantity,
        wall_end_point: pint.Quantity,
        wall_type: int,
        wall_thickness: pint.Quantity,
    ):
        """
        User defined walls will interact with particles through WCA.
        Is NOT communicated to the interaction models, though.
        The walls have a large height resulting in 2D-walls in a 2D-simulation.
        The actual height adapts to the chosen box size.
        The shape of the underlying constraint is a square.

        Parameters
        ----------
        wall_start_point : pint.Quantity
        np.array (n,2) with wall coordinates
             [x_begin, y_begin]
        wall_end_point : pint.Quantity
        np.array (n,2) with wall coordinates
             [x_end, y_end]
        wall_type : int
            Wall interacts with particles, so it needs its own type.
        wall_thickness: pint.Quantity
            wall thickness

        Returns
        -------
        """

        wall_start_point = wall_start_point.m_as("sim_length")
        wall_end_point = wall_end_point.m_as("sim_length")
        wall_thickness = wall_thickness.m_as("sim_length")

        if len(wall_start_point) != len(wall_end_point):
            raise ValueError(
                " Please double check your walls. There are more or less "
                f" starting points {len(wall_start_point)} than "
                f" end points {len(wall_end_point)}. They should be equal."
            )

        self._check_already_initialised()
        if wall_type in self.colloid_radius_register.keys():
            if self.colloid_radius_register[wall_type] != 0.0:
                raise ValueError(
                    f" The chosen type {wall_type} is already taken"
                    "and used with a different radius "
                    f"{self.colloid_radius_register[wall_type]['radius']}."
                    " Choose a new combination"
                )

        z_height = self.system.box_l[2]
        wall_shapes = []

        for wall_index in range(len(wall_start_point)):
            a = [
                wall_end_point[wall_index, 0] - wall_start_point[wall_index, 0],
                wall_end_point[wall_index, 1] - wall_start_point[wall_index, 1],
                0,
            ]  # direction along lengthy wall
            c = [0, 0, z_height]  # direction along third axis of 2D simulation
            norm_a = np.linalg.norm(a)  # is also the norm of b
            norm_c = np.linalg.norm(c)
            b = (
                np.cross(a / norm_a, c / norm_c) * wall_thickness
            )  # direction along second axis
            # i.e along wall_thickness of lengthy wall
            corner = [
                wall_start_point[wall_index, 0] - b[0] / 2,
                wall_start_point[wall_index, 1] - b[1] / 2,
                0,
            ]  # anchor point of wall shifted by wall_thickness*1/2

            wall_shapes.append(
                espressomd.shapes.Rhomboid(corner=corner, a=a, b=b, c=c, direction=1)
            )

        for wall_shape in wall_shapes:
            constr = espressomd.constraints.ShapeBasedConstraint(
                shape=wall_shape, particle_type=wall_type, penetrable=False
            )
            self.system.constraints.add(constr)

        # the wall itself has no radius, only the particle radius counts
        self.colloid_radius_register.update(
            {wall_type: {"radius": 0.0, "aspect_ratio": 1.0}}
        )

    def _setup_interactions(self):
        aspect_ratios = [
            d["aspect_ratio"] for d in self.colloid_radius_register.values()
        ]
        if len(np.unique(aspect_ratios)) > 1:
            raise ValueError(
                "All particles in the system must have the same aspect ratio."
            )
        for type_0, prop_dict_0 in self.colloid_radius_register.items():
            for type_1, prop_dict_1 in self.colloid_radius_register.items():
                if type_0 > type_1:
                    continue
                if prop_dict_0["aspect_ratio"] == 1.0:
                    self.system.non_bonded_inter[type_0, type_1].wca.set_params(
                        sigma=(prop_dict_0["radius"] + prop_dict_1["radius"])
                        * 2 ** (-1 / 6),
                        epsilon=self.params.WCA_epsilon.m_as("sim_energy"),
                    )
                else:
                    espressomd.assert_features(["GAY_BERNE"])
                    aspect = prop_dict_0["aspect_ratio"]
                    self.system.non_bonded_inter[type_0, type_1].gay_berne.set_params(
                        sig=(prop_dict_0["radius"] + prop_dict_1["radius"])
                        * 2 ** (-1 / 6),
                        k1=prop_dict_0["aspect_ratio"],
                        k2=1.0,
                        nu=1,
                        mu=2,
                        cut=2 * prop_dict_0["radius"] * max([aspect, 1 / aspect]) * 2,
                        eps=self.params.WCA_epsilon.m_as("sim_energy"),
                    )

    def add_const_force_to_colloids(self, force: pint.Quantity, type: int):
        """
        Parameters
        ----------
        force: pint.Quantity
            A Quantity of numpy array, e.g. f = Quantity(np.array([1,2,3]), "newton")
        type: int
            The type of colloid that gets the force.
            Needs to be already added to the engine
        """
        force_simunits = force.m_as("sim_force")
        parts = self.system.part.select(type=type)
        if len(parts) == 0:
            raise ValueError(
                f"Particles of type {type} not added to engine. "
                f"You currently have {self.colloid_radius_register.keys()}"
            )
        parts.ext_force = force_simunits

    def add_lattice_boltzmann(
        self,
        agrid: pint.Quantity = None,
        lb_time_step: pint.Quantity = None,
        dynamic_viscosity: pint.Quantity = None,
        fluid_density: pint.Quantity = None,
        boundary_mask: np.array = None,
        ext_force_density: pint.Quantity = None,
        use_GPU: bool = False,
    ):
        """
        Add a lattice boltzmann fluid to the simulation.

        Parameters:
        -----------

        agrid: pint.Quantity, scalar
            The uniform grid spacing in all 3 dimensions.
            Must be compatible with params.box_length.
        lb_time_step: pint.Quantity, scalar, optional
            Lb time step, must be integer multiple of params.time_step.
            Default: params.time_step
        dynamic_viscosity: pint.Quantity, scalar, optional
            default: self.params.fluid_dyn_viscosity
            only change if you know what you are doing
        fluid_density: pint.Quantity, scalar, optional
            default: 1000kg/m**3
        boundary_mask: np.array, optional:
            A 3D boolean array that defines the no-slip boundary cells of the fluid.
            Must be compatible with the grid that gets generated
            from params.box_length and agrid.
        ext_force_density: pint.Quantity, 3d vector, optional.
            default: [0,0,0] N/m**3
        """
        if not self.params.thermostat_type == "langevin":
            raise RuntimeError(
                "Coupling to lattice boltzmann does not work with a Brownian"
                " thermostat. Use 'langevin'."
            )

        if agrid is None:
            raise ValueError("agrid must be provided")
        if lb_time_step is None:
            lb_time_step = self.params.time_step
        if dynamic_viscosity is None:
            dynamic_viscosity = self.params.fluid_dyn_viscosity
        if fluid_density is None:
            fluid_density = self.ureg.Quantity(1000, "kg/m**3")
        if ext_force_density is None:
            ext_force_density = self.ureg.Quantity(np.zeros(3), "N/m**3")
        if use_GPU:
            raise NotImplementedError(
                "GPU support is not yet implemented. Stay tuned tho"
            )

        lbf = espressomd.lb.LBFluidWalberla(
            tau=lb_time_step.m_as("sim_time"),
            kT=(self.params.temperature * self.ureg.boltzmann_constant).m_as(
                "sim_energy"
            ),
            density=fluid_density.m_as("sim_mass/sim_length**3"),
            kinematic_viscosity=(dynamic_viscosity / fluid_density).m_as(
                "sim_kin_viscosity"
            ),
            agrid=agrid.m_as("sim_length"),
            seed=self.seed,
            ext_force_density=ext_force_density.m_as("sim_force/sim_length**3"),
        )

        if boundary_mask is not None:
            from espressomd.script_interface import array_variant

            if not np.all(lbf.shape == boundary_mask.shape):
                raise ValueError(
                    "boundary_mask must have the same shape as the fluid grid"
                )

            lbf.call_method(
                "add_boundary_from_shape",
                raster=array_variant(boundary_mask.astype(int).flatten()),
                values=array_variant(np.zeros(3, dtype=float).flatten()),
            )

        self.lbf = lbf

        return lbf

    def add_flowfield(
        self,
        flowfield: pint.Quantity,
        friction_coeff: pint.Quantity,
        grid_spacings: pint.Quantity,
    ):
        """
        Parameters
        ----------
        flowfield: pint.Quantity[np.array]
            The flowfield to add, given as a pint Quantity of a numpy array
            with units of velocity.
            Must have shape (n_cells_x, n_cells_y, n_cells_z, 3)
            The velocity values must be centered in the corresponding grid,
            e.g. the [idx_x,idx_y,idx_z, :] value of the array contains the velocity at
            np.dot([idx_x+0.5,idx_y+0.5,idx_z+0.5],[agrid_x,agrid_y,agrid_z]).
            From these points, the velocity is interpolated to the particle positions.
        friction_coeff: pint.Quantity[float]
            The friction coefficient in units of mass/time.
            Espresso does not yet support particle-specific or anisotropic
            friction coefficients for flow coupling, so one scalar value has to be
            provided here which will be used for all particles.
        grid_spacings: pint.Quantity[np.array]
            This grid spacing will be used to fit the flowfield into the simulation box.
            If you run a 2d-simulation, choose grid_spacings[2]=box_l.
        """

        if not self.params.thermostat_type == "langevin":
            raise RuntimeError(
                "Coupling to a flowfield does not work with a Brownian thermostat. Use"
                " 'langevin'."
            )

        flow = flowfield.m_as("sim_velocity")
        gamma = friction_coeff.m_as("sim_mass/sim_time")
        agrids = grid_spacings.m_as("sim_length")

        if not flow.ndim == 4:
            raise ValueError(
                "flowfield must have shape (n_cells_x, n_cells_y, n_cells_z, 3)"
            )
        if not len(grid_spacings) == 3:
            raise ValueError("Grid spacings must have length of 3")

        # espresso constraint field must be one grid larger in all directions
        # for interpolation. Apply periodic boundary conditions
        flow_padded = np.stack(
            [np.pad(flow[:, :, :, i], mode="wrap", pad_width=1) for i in range(3)],
            axis=3,
        )
        flow_constraint = espressomd.constraints.FlowField(
            field=flow_padded, gamma=gamma, grid_spacing=agrids
        )
        self.system.constraints.add(flow_constraint)

    def add_external_potential(
        self, potential: pint.Quantity, grid_spacings: pint.Quantity
    ):
        """
        Parameters
        ----------
        potential: pint.Quantity[np.array]
            The flowfield to add, given as a pint Quantity of a numpy array
            with units of energy.
            Must have shape (n_cells_x, n_cells_y, n_cells_z)
            The potential values must be centered in the corresponding grid,
            e.g. the [idx_x,idx_y,idx_z, :] value of the array contains the potential at
            np.dot([idx_x+0.5, idx_y+0.5, idx_z+0.5], [agrid_x, agrid_y, agrid_z]).
            From these points, the potential is interpolated to the particle positions.
        grid_spacings: pint.Quantity[np.array]
            This grid spacing will be used to fit the potential into the simulation box.
            If you run a 2d-simulation, choose grid_spacings[2]=box_l.
        """

        pot = potential.m_as("sim_energy")
        agrids = grid_spacings.m_as("sim_length")

        if not pot.ndim == 3:
            raise ValueError(
                "potential must have shape (n_cells_x, n_cells_y, n_cells_z)"
            )
        if not len(grid_spacings) == 3:
            raise ValueError("Grid spacings must have length of 3")

        # Espresso constraint field must be one cell larger in all directions
        # for interpolation. Apply periodic boundary conditions.
        pot_padded = np.pad(
            pot,
            pad_width=1,
            mode="wrap",
        )
        pot_constraint = espressomd.constraints.PotentialField(
            field=pot_padded[:, :, :, np.newaxis],
            grid_spacing=agrids,
            default_scale=1.0,
        )
        self.system.constraints.add(pot_constraint)

    def get_friction_coefficients(self, type: int):
        """
        Returns both the translational and the rotational friction coefficient
        of the desired type in simulation units
        """
        property_dict = self.colloid_radius_register.get(type, None)
        if property_dict is None:
            raise ValueError(
                f"cannot get friction coefficient for type {type}. Did you actually add"
                " that particle type?"
            )
        return _calc_friction_coefficients(
            self.params.fluid_dyn_viscosity.m_as("sim_dyn_viscosity"),
            property_dict["radius"],
        )

    def _init_h5_output(self):
        """
        Initialize the hdf5 output.

        This method will create a directory for the data to be stored within. Follwing
        this, a hdf5 database is constructed for storing of the simulation data.


        Returns
        -------
        Creates hdf5 database and updates class state.
        """
        self.h5_filename = self.out_folder / "trajectory.hdf5"
        self.out_folder.mkdir(parents=True, exist_ok=True)
        self.traj_holder = {
            "Times": list(),
            "Ids": list(),
            "Types": list(),
            "Unwrapped_Positions": list(),
            "Velocities": list(),
            "Directors": list(),
        }

        n_colloids = len(self.colloids)

        with h5py.File(self.h5_filename.as_posix(), "a") as h5_outfile:
            part_group = h5_outfile.require_group("colloids")
            dataset_kwargs = dict(compression="gzip")
            traj_len = self.write_chunk_size

            part_group.require_dataset(
                "Times",
                shape=(traj_len, 1, 1),
                maxshape=(None, 1, 1),
                dtype=float,
                **dataset_kwargs,
            )
            for name in ["Ids", "Types"]:
                part_group.require_dataset(
                    name,
                    shape=(traj_len, n_colloids, 1),
                    maxshape=(None, n_colloids, 1),
                    dtype=int,
                    **dataset_kwargs,
                )
            for name in ["Unwrapped_Positions", "Velocities", "Directors"]:
                part_group.require_dataset(
                    name,
                    shape=(traj_len, n_colloids, 3),
                    maxshape=(None, n_colloids, 3),
                    dtype=float,
                    **dataset_kwargs,
                )
        self.write_idx = 0
        self.h5_time_steps_written = 0

    def _update_traj_holder(self):
        if len(self.colloids) == 0:
            logger.warning("No colloids in the system. Not writing to hdf5")
            return
        # need to add axes on the non-vectorial quantities
        self.traj_holder["Times"].append(np.array([self.system.time])[:, np.newaxis])
        self.traj_holder["Ids"].append(
            np.array([c.id for c in self.colloids])[:, np.newaxis]
        )
        self.traj_holder["Types"].append(
            np.array([c.type for c in self.colloids])[:, np.newaxis]
        )
        self.traj_holder["Unwrapped_Positions"].append(
            np.stack([c.pos for c in self.colloids], axis=0)
        )
        self.traj_holder["Velocities"].append(
            np.stack([c.v for c in self.colloids], axis=0)
        )
        self.traj_holder["Directors"].append(
            np.stack([c.director for c in self.colloids], axis=0)
        )

    def _write_traj_chunk_to_file(self):
        """
        Write a chunk of data to the HDF5 database

        Returns
        -------
        Adds data to the database and updates the class state.
        """
        n_new_timesteps = len(self.traj_holder["Times"])
        if n_new_timesteps == 0:
            return

        with h5py.File(self.h5_filename, "a") as h5_outfile:
            part_group = h5_outfile["colloids"]
            for key in self.traj_holder.keys():
                dataset = part_group[key]
                values = np.stack(self.traj_holder[key], axis=0)
                # save in format (time_step, n_particles, dimension)
                dataset.resize(self.h5_time_steps_written + n_new_timesteps, axis=0)
                dataset[
                    self.h5_time_steps_written : self.h5_time_steps_written
                    + n_new_timesteps,
                    ...,
                ] = values

        logger.debug(f"wrote {n_new_timesteps} time steps to hdf5 file")
        self.h5_time_steps_written += n_new_timesteps

    def _remove_overlap(self):
        # remove overlap
        self.system.integrator.set_steepest_descent(
            f_max=0.0, gamma=0.1, max_displacement=0.1
        )
        self.system.integrator.run(1000)

        # set the thermostat
        kT = (self.params.temperature * self.ureg.boltzmann_constant).m_as("sim_energy")

        allowed_integrators = ["brownian", "langevin"]
        if self.params.thermostat_type not in allowed_integrators:
            raise ValueError(f"integrator_type must be one of {allowed_integrators}")

        # Dummy gamma values, we set them for each particle separately.
        # If we forget to do so, the simulation will explode as a gentle reminder
        if self.params.thermostat_type == "brownian":
            self.system.thermostat.set_brownian(
                kT=kT,
                gamma=1e-20,
                gamma_rotation=1e-20,
                seed=self.seed,
                act_on_virtual=False,
            )
            self.system.integrator.set_brownian_dynamics()
        elif self.params.thermostat_type == "langevin":
            if self.lbf is None:
                self.system.thermostat.set_langevin(
                    kT=kT,
                    gamma=1e-20,
                    gamma_rotation=1e-20,
                    seed=self.seed,
                    act_on_virtual=False,
                )
            else:
                self.system.lb = self.lbf
                self.system.thermostat.set_lb(
                    LB_fluid=self.lbf, gamma=1e300, seed=self.seed
                )

            self.system.integrator.set_vv()

    def manage_forces(self, force_model: ForceFunction = None) -> bool:
        """
        Manage external forces.

        Collect the forces from the force function and apply them to the colloids.

        Parameters
        ----------
        force_model : ForceFunction
            Model with which to compute external forces.
        """
        swarmrl_colloids = []
        if force_model is not None:
            for col in self.colloids:
                swarmrl_colloids.append(
                    Colloid(
                        pos=col.pos,
                        velocity=col.v,
                        director=col.director,
                        id=col.id,
                        type=col.type,
                    )
                )
            actions = force_model.calc_action(swarmrl_colloids)
            for action, coll in zip(actions, self.colloids):
                coll.swimming = {"f_swim": action.force}
                coll.ext_torque = (
                    action.torque
                    if action.torque is not None
                    else np.zeros(
                        3,
                    )
                )
                new_direction = action.new_direction
                if new_direction is not None:
                    if self.n_dims == 3:
                        coll.director = new_direction
                    else:
                        old_direction = coll.director
                        rotation_angle = np.arccos(np.dot(new_direction, old_direction))
                        if rotation_angle > 1e-6:
                            rotation_axis = np.cross(old_direction, new_direction)
                            rotation_axis /= np.linalg.norm(rotation_axis)
                            # only values of [0,0,1], [0,0,-1] can come out here,
                            # plusminus numerical errors
                            rotation_axis = [0, 0, round(rotation_axis[2])]
                            coll.rotate(axis=rotation_axis, angle=rotation_angle)

    def integrate(self, n_slices, force_model: ForceFunction = None):
        """
        Integrate the system for n_slices steps.

        Parameters
        ----------
        n_slices : int
                Number of integration steps to run.
        force_model : ForceFunction
                A SwarmRL interaction model to decide particle interaction rules.

        Returns
        -------
        Runs the simulation environment.
        """

        if not self.integration_initialised:
            self.slice_idx = 0
            self.step_idx = 0
            self._setup_interactions()
            self._remove_overlap()
            self._init_h5_output()
            self.integration_initialised = True

        old_slice_idx = self.slice_idx

        while self.step_idx < self.params.steps_per_slice * (old_slice_idx + n_slices):
            if self.step_idx == self.params.steps_per_write_interval * self.write_idx:
                self._update_traj_holder()
                self.write_idx += 1

                if len(self.traj_holder["Times"]) >= self.write_chunk_size:
                    self._write_traj_chunk_to_file()
                    for val in self.traj_holder.values():
                        val.clear()

            # Break the simulaion if the kill switch is engaged.
            if force_model is not None:
                if force_model.kill_switch:
                    break

            if self.step_idx == self.params.steps_per_slice * self.slice_idx:
                self.slice_idx += 1
                self.manage_forces(force_model)

            steps_to_next_write = (
                self.params.steps_per_write_interval * self.write_idx - self.step_idx
            )
            steps_to_next_slice = (
                self.params.steps_per_slice * self.slice_idx - self.step_idx
            )
            steps_to_next = min(steps_to_next_write, steps_to_next_slice)

            self.system.integrator.run(
                steps_to_next, reuse_forces=True, recalc_forces=False
            )
            self.step_idx += steps_to_next

    def finalize(self):
        """
        Method to clean up after finishing the simulation

        Method will write the last chunks of trajectory
        """
        self._write_traj_chunk_to_file()

    def get_particle_data(self):
        """
        Collect specific particle information from the colloids.

        Returns
        -------
        information : dict
                A dict of information for all of the colloids in the system including
                unwrapped positions, velocities, and the directors of the colloids.
        """
        return {
            "Id": np.array([c.id for c in self.colloids]),
            "Type": np.array([c.type for c in self.colloids]),
            "Unwrapped_Positions": np.stack([c.pos for c in self.colloids]),
            "Velocities": np.stack([c.v for c in self.colloids]),
            "Directors": np.stack([c.director for c in self.colloids]),
        }

    def get_unit_system(self):
        """
        Collect the pin unit registry.

        Returns
        -------
        unit_registry: object
                The class unit registry.
        """
        return self.ureg

__init__(md_params, n_dims=3, seed=42, out_folder='.', write_chunk_size=100, system=None)

Constructor for the espressoMD engine.

Parameters

md_params : espresso.MDParams Parameter class for the espresso simulation. n_dims : int (default = 3) Number of dimensions to consider in the simulation seed : int Seed number for any generators. out_folder : str or pathlib.Path Path to an output folder to store data in. This file should have a reasonable amount of free space. write_chunk_size : int Chunk size to use in the hdf5 writing. system : espressomd.System (optional) Espresso system to use in this engine. If not provided, a new system will be created. Note: We try to clear the passed system of any previous contents, but do not guarantee that everything is reset completely. Use at own risk.

Source code in swarmrl/engine/espresso.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def __init__(
    self,
    md_params,
    n_dims=3,
    seed=42,
    out_folder=".",
    write_chunk_size=100,
    system=None,
):
    """
    Constructor for the espressoMD engine.

    Parameters
    ----------
    md_params : espresso.MDParams
            Parameter class for the espresso simulation.
    n_dims : int (default = 3)
            Number of dimensions to consider in the simulation
    seed : int
            Seed number for any generators.
    out_folder : str or pathlib.Path
            Path to an output folder to store data in. This file should have a
            reasonable amount of free space.
    write_chunk_size : int
            Chunk size to use in the hdf5 writing.
    system : espressomd.System (optional)
            Espresso system to use in this engine.
            If not provided, a new system will be created.
            Note: We try to clear the passed system of any previous contents,
            but do not guarantee that everything is reset completely. Use at
            own risk.
    """
    self.params: MDParams = md_params
    self.out_folder = pathlib.Path(out_folder).resolve()
    self.seed = seed
    self.rng = np.random.default_rng(self.seed)
    if n_dims not in [2, 3]:
        raise ValueError("Only 2d and 3d are allowed")
    self.n_dims = n_dims

    self._init_unit_system()
    self.write_chunk_size = write_chunk_size

    if system is None:
        self.system = espressomd.System(box_l=3 * [1.0])
    else:
        self.system = _reset_system(system)
    self._init_system()

    self.colloids = list()
    self.lbf: espressomd.lb.LBFluidWalberla = None

    # register to lookup which type has which radius
    self.colloid_radius_register = {}

    # after the first call to integrate, no more changes to the engine are allowed
    self.integration_initialised = False

    espressomd.assert_features(
        ["ROTATION", "EXTERNAL_FORCES", "THERMOSTAT_PER_PARTICLE"]
    )

add_colloid_on_point(radius_colloid=None, init_position=None, init_direction=np.array([1, 0, 0]), type_colloid=0, gamma_translation=None, gamma_rotation=None, aspect_ratio=1.0, mass=None, rinertia=None)

Parameters

radius_colloid default: 1 micrometer init_position default: center of the box init_direction default: along x type_colloid The colloids created from this method call will have this type. Multiple calls can be made with the same type_colloid. Interaction models need to be made aware if there are different types of colloids in the system if specific behaviour is desired. gamma_translation, gamma_rotation: pint.Quantity[np.array], optional If None, calculate these quantities from the radius and the fluid viscosity. You can provide friction coefficients as scalars or a 3-vector (the diagonal elements of the friction tensor). aspect_ratio: float, optional If you provide a value != 1, a gay-berne interaction will be set up instead of purely repulsive lennard jones. aspect_ratio > 1 will produce a cigar, aspect_ratio < 0 a disk (both swimming in the direction of symmetry). mass: optional Particle mass. Only relevant for Langevin integrator. rinertia: optional Diagonal elements of the rotational moment of inertia tensor of the particle, assuming the particle is oriented along z.

Returns

colloid.

Source code in swarmrl/engine/espresso.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def add_colloid_on_point(
    self,
    radius_colloid: pint.Quantity = None,
    init_position: pint.Quantity = None,
    init_direction: np.array = np.array([1, 0, 0]),
    type_colloid=0,
    gamma_translation: pint.Quantity = None,
    gamma_rotation: pint.Quantity = None,
    aspect_ratio: float = 1.0,
    mass: pint.Quantity = None,
    rinertia: pint.Quantity = None,
):
    """
    Parameters
    ----------
    radius_colloid
        default: 1 micrometer
    init_position
        default: center of the box
    init_direction
        default: along x
    type_colloid
        The colloids created from this method call will have this type.
        Multiple calls can be made with the same type_colloid.
        Interaction models need to be made aware if there are different types
        of colloids in the system if specific behaviour is desired.
    gamma_translation, gamma_rotation: pint.Quantity[np.array], optional
        If None, calculate these quantities from the radius and the fluid viscosity.
        You can provide friction coefficients as scalars or a 3-vector
        (the diagonal elements of the friction tensor).
    aspect_ratio: float, optional
        If you provide a value != 1, a gay-berne interaction will be set up
        instead of purely repulsive lennard jones.
        aspect_ratio > 1 will produce a cigar, aspect_ratio < 0 a disk
        (both swimming in the direction of symmetry).
    mass: optional
        Particle mass. Only relevant for Langevin integrator.
    rinertia: optional
        Diagonal elements of the rotational moment of inertia tensor
        of the particle, assuming the particle is oriented along z.

    Returns
    -------
    colloid.

    """

    self._check_already_initialised()

    if radius_colloid is None:
        radius_colloid = self.ureg.Quantity(1, "micrometer")
    if init_position is None:
        init_position = 0.5 * self.params.box_length

    if type_colloid in self.colloid_radius_register.keys():
        if self.colloid_radius_register[type_colloid][
            "radius"
        ] != radius_colloid.m_as("sim_length"):
            raise ValueError(
                f"The chosen type {type_colloid} is already taken and used with a"
                " different radius"
                f" {self.colloid_radius_register[type_colloid]['radius']}. Choose a"
                " new combination"
            )

    radius_simunits = radius_colloid.m_as("sim_length")
    init_pos = init_position.m_as("sim_length")
    init_direction = init_direction / np.linalg.norm(init_direction)

    (
        gamma_translation_sphere,
        gamma_rotation_sphere,
    ) = _calc_friction_coefficients(
        self.params.fluid_dyn_viscosity.m_as("sim_dyn_viscosity"), radius_simunits
    )
    if gamma_translation is None:
        gamma_translation = gamma_translation_sphere
    else:
        gamma_translation = gamma_translation.m_as("sim_force/sim_velocity")
    if gamma_rotation is None:
        gamma_rotation = gamma_rotation_sphere
    else:
        gamma_rotation = gamma_rotation.m_as("sim_torque/sim_angular_velocity")

    if self.params.thermostat_type == "langevin":
        if mass is None:
            raise ValueError(
                "If you use the Langevin thermostat, you must set a particle mass"
            )
        if rinertia is None:
            raise ValueError(
                "If you use the Langevin thermostat, you must set a particle"
                " rotational inertia"
            )
    else:
        # mass and moment of inertia can still be relevant when calculating
        # the stochastic part of the particle velocity, see
        # https://espressomd.github.io/doc/integration.html#brownian-thermostat.
        # Provide defaults in case the user didn't set the values.
        water_dens = self.params.ureg.Quantity(1000, "kg/meter**3")
        if mass is None:
            mass = water_dens * 4.0 / 3.0 * np.pi * radius_colloid**3
        if rinertia is None:
            rinertia = 2.0 / 5.0 * mass * radius_colloid**2
            rinertia = utils.convert_array_of_pint_to_pint_of_array(
                3 * [rinertia], self.params.ureg
            )

    if self.n_dims == 3:
        colloid = self.system.part.add(
            pos=init_pos,
            director=init_direction,
            rotation=3 * [True],
            gamma=gamma_translation,
            gamma_rot=gamma_rotation,
            fix=3 * [False],
            type=type_colloid,
            mass=mass.m_as("sim_mass"),
            rinertia=rinertia.m_as("sim_rinertia"),
        )
    else:
        # initialize with body-frame = lab-frame to set correct rotation flags
        # allow all rotations to bring the particle to correct state
        init_pos[2] = 0  # get rid of z-coordinate in 2D coordinates
        colloid = self.system.part.add(
            pos=init_pos,
            fix=[False, False, True],
            rotation=3 * [True],
            gamma=gamma_translation,
            gamma_rot=gamma_rotation,
            quat=[1, 0, 0, 0],
            type=type_colloid,
            mass=mass.m_as("sim_mass"),
            rinertia=rinertia.m_as("sim_rinertia"),
        )
        theta, phi = utils.angles_from_vector(init_direction)
        if abs(theta - np.pi / 2) > 10e-6:
            raise ValueError(
                "It seems like you want to have a 2D simulation"
                " with colloids that point some amount in Z-direction."
                " Change something in your colloid setup."
            )
        self._rotate_colloid_to_2d(colloid, phi)

    self.colloids.append(colloid)

    self.colloid_radius_register.update(
        {type_colloid: {"radius": radius_simunits, "aspect_ratio": aspect_ratio}}
    )

    return colloid

add_colloids(n_colloids, radius_colloid=None, random_placement_center=None, random_placement_radius=None, type_colloid=0, gamma_translation=None, gamma_rotation=None, aspect_ratio=1.0, mass=None, rinertia=None)

Parameters

n_colloids radius_colloid default: 1 micrometer random_placement_center default: center of the box random_placement_radius default: half the box dimension type_colloid The colloids created from this method call will have this type. Multiple calls can be made with the same type_colloid. Interaction models need to be made aware if there are different types of colloids in the system if specific behaviour is desired. gamma_translation, gamma_rotation: optional If None, calculate these quantities from the radius and the fluid viscosity. You can provide friction coefficients as scalars or a 3-vector (the diagonal elements of the friction tensor) aspect_ratio If you provide a value != 1, a gay-berne interaction will be set up instead of purely repulsive lennard jones. aspect_ratio > 1 will produce a cigar, aspect_ratio < 0 a disk (both swimming in the direction of symmetry). The radius_colloid gives the radius perpendicular to the symmetry axis. mass: optional Particle mass. Only relevant for Langevin integrator. rinertia: optional Diagonal elements of the rotational moment of inertia tensor of the particle, assuming the particle is oriented along z.

Returns
Source code in swarmrl/engine/espresso.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
def add_colloids(
    self,
    n_colloids: int,
    radius_colloid: pint.Quantity = None,
    random_placement_center: pint.Quantity = None,
    random_placement_radius: pint.Quantity = None,
    type_colloid: int = 0,
    gamma_translation: pint.Quantity = None,
    gamma_rotation: pint.Quantity = None,
    aspect_ratio: float = 1.0,
    mass: pint.Quantity = None,
    rinertia: pint.Quantity = None,
):
    """
    Parameters
    ----------
    n_colloids
    radius_colloid
        default: 1 micrometer
    random_placement_center
        default: center of the box
    random_placement_radius
        default: half the box dimension
    type_colloid
        The colloids created from this method call will have this type.
        Multiple calls can be made with the same type_colloid.
        Interaction models need to be made aware if there are different types
        of colloids in the system if specific behaviour is desired.
    gamma_translation, gamma_rotation: optional
        If None, calculate these quantities from the radius and the fluid viscosity.
        You can provide friction coefficients as scalars or a 3-vector
        (the diagonal elements of the friction tensor)
    aspect_ratio
        If you provide a value != 1, a gay-berne interaction will be set up
        instead of purely repulsive lennard jones.
        aspect_ratio > 1 will produce a cigar, aspect_ratio < 0 a disk
        (both swimming in the direction of symmetry).
        The radius_colloid gives the radius perpendicular to the symmetry axis.
    mass: optional
        Particle mass. Only relevant for Langevin integrator.
    rinertia: optional
        Diagonal elements of the rotational moment of inertia tensor
        of the particle, assuming the particle is oriented along z.


    Returns
    -------

    """

    self._check_already_initialised()

    if random_placement_center is None:
        random_placement_center = self.ureg.Quantity(
            0.5 * self.params.box_length.m_as("sim_length"), "sim_length"
        )
    if random_placement_radius is None:
        random_placement_radius = 0.5 * min(self.params.box_length)

    init_center = random_placement_center.m_as("sim_length")
    init_rad = random_placement_radius.m_as("sim_length")

    for i in range(n_colloids):
        start_pos = (
            _get_random_start_pos(init_rad, init_center, self.n_dims, self.rng)
            * self.ureg.sim_length
        )

        if self.n_dims == 3:
            init_direction = utils.vector_from_angles(
                *utils.get_random_angles(self.rng)
            )
        else:
            start_angle = 2 * np.pi * self.rng.random()
            init_direction = utils.vector_from_angles(np.pi / 2, start_angle)
        self.add_colloid_on_point(
            radius_colloid=radius_colloid,
            init_position=start_pos,
            init_direction=init_direction,
            type_colloid=type_colloid,
            gamma_translation=gamma_translation,
            gamma_rotation=gamma_rotation,
            aspect_ratio=aspect_ratio,
            mass=mass,
            rinertia=rinertia,
        )

add_confining_walls(wall_type)

Walls on the edges of the box, will interact with particles through WCA. Is NOT communicated to the interaction models, though.

Parameters

wall_type : int Wall interacts with particles, so it needs its own type.

Returns
Source code in swarmrl/engine/espresso.py
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
def add_confining_walls(self, wall_type: int):
    """
    Walls on the edges of the box, will interact with particles through WCA.
    Is NOT communicated to the interaction models, though.

    Parameters
    ----------
    wall_type : int
        Wall interacts with particles, so it needs its own type.

    Returns
    -------
    """
    self._check_already_initialised()
    if wall_type in self.colloid_radius_register.keys():
        raise ValueError(
            f"wall type {wall_type} is already taken "
            "by other system component. Choose a new one"
        )

    wall_shapes = []
    wall_shapes.append(espressomd.shapes.Wall(dist=0, normal=[1, 0, 0]))
    wall_shapes.append(
        espressomd.shapes.Wall(dist=-self.system.box_l[0], normal=[-1, 0, 0])
    )
    wall_shapes.append(espressomd.shapes.Wall(dist=0, normal=[0, 1, 0]))
    wall_shapes.append(
        espressomd.shapes.Wall(dist=-self.system.box_l[1], normal=[0, -1, 0])
    )
    if self.n_dims == 3:
        wall_shapes.append(espressomd.shapes.Wall(dist=0, normal=[0, 0, 1]))
        wall_shapes.append(
            espressomd.shapes.Wall(dist=-self.system.box_l[2], normal=[0, 0, -1])
        )

    for wall_shape in wall_shapes:
        constr = espressomd.constraints.ShapeBasedConstraint(
            shape=wall_shape, particle_type=wall_type, penetrable=False
        )
        self.system.constraints.add(constr)

    # the wall itself has no radius, only the particle radius counts
    self.colloid_radius_register.update(
        {wall_type: {"radius": 0.0, "aspect_ratio": 1.0}}
    )

add_const_force_to_colloids(force, type)

Parameters

force: pint.Quantity A Quantity of numpy array, e.g. f = Quantity(np.array([1,2,3]), "newton") type: int The type of colloid that gets the force. Needs to be already added to the engine

Source code in swarmrl/engine/espresso.py
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
def add_const_force_to_colloids(self, force: pint.Quantity, type: int):
    """
    Parameters
    ----------
    force: pint.Quantity
        A Quantity of numpy array, e.g. f = Quantity(np.array([1,2,3]), "newton")
    type: int
        The type of colloid that gets the force.
        Needs to be already added to the engine
    """
    force_simunits = force.m_as("sim_force")
    parts = self.system.part.select(type=type)
    if len(parts) == 0:
        raise ValueError(
            f"Particles of type {type} not added to engine. "
            f"You currently have {self.colloid_radius_register.keys()}"
        )
    parts.ext_force = force_simunits

add_external_potential(potential, grid_spacings)

Parameters

potential: pint.Quantity[np.array] The flowfield to add, given as a pint Quantity of a numpy array with units of energy. Must have shape (n_cells_x, n_cells_y, n_cells_z) The potential values must be centered in the corresponding grid, e.g. the [idx_x,idx_y,idx_z, :] value of the array contains the potential at np.dot([idx_x+0.5, idx_y+0.5, idx_z+0.5], [agrid_x, agrid_y, agrid_z]). From these points, the potential is interpolated to the particle positions. grid_spacings: pint.Quantity[np.array] This grid spacing will be used to fit the potential into the simulation box. If you run a 2d-simulation, choose grid_spacings[2]=box_l.

Source code in swarmrl/engine/espresso.py
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
def add_external_potential(
    self, potential: pint.Quantity, grid_spacings: pint.Quantity
):
    """
    Parameters
    ----------
    potential: pint.Quantity[np.array]
        The flowfield to add, given as a pint Quantity of a numpy array
        with units of energy.
        Must have shape (n_cells_x, n_cells_y, n_cells_z)
        The potential values must be centered in the corresponding grid,
        e.g. the [idx_x,idx_y,idx_z, :] value of the array contains the potential at
        np.dot([idx_x+0.5, idx_y+0.5, idx_z+0.5], [agrid_x, agrid_y, agrid_z]).
        From these points, the potential is interpolated to the particle positions.
    grid_spacings: pint.Quantity[np.array]
        This grid spacing will be used to fit the potential into the simulation box.
        If you run a 2d-simulation, choose grid_spacings[2]=box_l.
    """

    pot = potential.m_as("sim_energy")
    agrids = grid_spacings.m_as("sim_length")

    if not pot.ndim == 3:
        raise ValueError(
            "potential must have shape (n_cells_x, n_cells_y, n_cells_z)"
        )
    if not len(grid_spacings) == 3:
        raise ValueError("Grid spacings must have length of 3")

    # Espresso constraint field must be one cell larger in all directions
    # for interpolation. Apply periodic boundary conditions.
    pot_padded = np.pad(
        pot,
        pad_width=1,
        mode="wrap",
    )
    pot_constraint = espressomd.constraints.PotentialField(
        field=pot_padded[:, :, :, np.newaxis],
        grid_spacing=agrids,
        default_scale=1.0,
    )
    self.system.constraints.add(pot_constraint)

add_flowfield(flowfield, friction_coeff, grid_spacings)

Parameters

flowfield: pint.Quantity[np.array] The flowfield to add, given as a pint Quantity of a numpy array with units of velocity. Must have shape (n_cells_x, n_cells_y, n_cells_z, 3) The velocity values must be centered in the corresponding grid, e.g. the [idx_x,idx_y,idx_z, :] value of the array contains the velocity at np.dot([idx_x+0.5,idx_y+0.5,idx_z+0.5],[agrid_x,agrid_y,agrid_z]). From these points, the velocity is interpolated to the particle positions. friction_coeff: pint.Quantity[float] The friction coefficient in units of mass/time. Espresso does not yet support particle-specific or anisotropic friction coefficients for flow coupling, so one scalar value has to be provided here which will be used for all particles. grid_spacings: pint.Quantity[np.array] This grid spacing will be used to fit the flowfield into the simulation box. If you run a 2d-simulation, choose grid_spacings[2]=box_l.

Source code in swarmrl/engine/espresso.py
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
def add_flowfield(
    self,
    flowfield: pint.Quantity,
    friction_coeff: pint.Quantity,
    grid_spacings: pint.Quantity,
):
    """
    Parameters
    ----------
    flowfield: pint.Quantity[np.array]
        The flowfield to add, given as a pint Quantity of a numpy array
        with units of velocity.
        Must have shape (n_cells_x, n_cells_y, n_cells_z, 3)
        The velocity values must be centered in the corresponding grid,
        e.g. the [idx_x,idx_y,idx_z, :] value of the array contains the velocity at
        np.dot([idx_x+0.5,idx_y+0.5,idx_z+0.5],[agrid_x,agrid_y,agrid_z]).
        From these points, the velocity is interpolated to the particle positions.
    friction_coeff: pint.Quantity[float]
        The friction coefficient in units of mass/time.
        Espresso does not yet support particle-specific or anisotropic
        friction coefficients for flow coupling, so one scalar value has to be
        provided here which will be used for all particles.
    grid_spacings: pint.Quantity[np.array]
        This grid spacing will be used to fit the flowfield into the simulation box.
        If you run a 2d-simulation, choose grid_spacings[2]=box_l.
    """

    if not self.params.thermostat_type == "langevin":
        raise RuntimeError(
            "Coupling to a flowfield does not work with a Brownian thermostat. Use"
            " 'langevin'."
        )

    flow = flowfield.m_as("sim_velocity")
    gamma = friction_coeff.m_as("sim_mass/sim_time")
    agrids = grid_spacings.m_as("sim_length")

    if not flow.ndim == 4:
        raise ValueError(
            "flowfield must have shape (n_cells_x, n_cells_y, n_cells_z, 3)"
        )
    if not len(grid_spacings) == 3:
        raise ValueError("Grid spacings must have length of 3")

    # espresso constraint field must be one grid larger in all directions
    # for interpolation. Apply periodic boundary conditions
    flow_padded = np.stack(
        [np.pad(flow[:, :, :, i], mode="wrap", pad_width=1) for i in range(3)],
        axis=3,
    )
    flow_constraint = espressomd.constraints.FlowField(
        field=flow_padded, gamma=gamma, grid_spacing=agrids
    )
    self.system.constraints.add(flow_constraint)

add_lattice_boltzmann(agrid=None, lb_time_step=None, dynamic_viscosity=None, fluid_density=None, boundary_mask=None, ext_force_density=None, use_GPU=False)

Add a lattice boltzmann fluid to the simulation.

Parameters:
pint.Quantity, scalar

The uniform grid spacing in all 3 dimensions. Must be compatible with params.box_length.

lb_time_step: pint.Quantity, scalar, optional Lb time step, must be integer multiple of params.time_step. Default: params.time_step dynamic_viscosity: pint.Quantity, scalar, optional default: self.params.fluid_dyn_viscosity only change if you know what you are doing fluid_density: pint.Quantity, scalar, optional default: 1000kg/m3 boundary_mask: np.array, optional: A 3D boolean array that defines the no-slip boundary cells of the fluid. Must be compatible with the grid that gets generated from params.box_length and agrid. ext_force_density: pint.Quantity, 3d vector, optional. default: [0,0,0] N/m3

Source code in swarmrl/engine/espresso.py
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
def add_lattice_boltzmann(
    self,
    agrid: pint.Quantity = None,
    lb_time_step: pint.Quantity = None,
    dynamic_viscosity: pint.Quantity = None,
    fluid_density: pint.Quantity = None,
    boundary_mask: np.array = None,
    ext_force_density: pint.Quantity = None,
    use_GPU: bool = False,
):
    """
    Add a lattice boltzmann fluid to the simulation.

    Parameters:
    -----------

    agrid: pint.Quantity, scalar
        The uniform grid spacing in all 3 dimensions.
        Must be compatible with params.box_length.
    lb_time_step: pint.Quantity, scalar, optional
        Lb time step, must be integer multiple of params.time_step.
        Default: params.time_step
    dynamic_viscosity: pint.Quantity, scalar, optional
        default: self.params.fluid_dyn_viscosity
        only change if you know what you are doing
    fluid_density: pint.Quantity, scalar, optional
        default: 1000kg/m**3
    boundary_mask: np.array, optional:
        A 3D boolean array that defines the no-slip boundary cells of the fluid.
        Must be compatible with the grid that gets generated
        from params.box_length and agrid.
    ext_force_density: pint.Quantity, 3d vector, optional.
        default: [0,0,0] N/m**3
    """
    if not self.params.thermostat_type == "langevin":
        raise RuntimeError(
            "Coupling to lattice boltzmann does not work with a Brownian"
            " thermostat. Use 'langevin'."
        )

    if agrid is None:
        raise ValueError("agrid must be provided")
    if lb_time_step is None:
        lb_time_step = self.params.time_step
    if dynamic_viscosity is None:
        dynamic_viscosity = self.params.fluid_dyn_viscosity
    if fluid_density is None:
        fluid_density = self.ureg.Quantity(1000, "kg/m**3")
    if ext_force_density is None:
        ext_force_density = self.ureg.Quantity(np.zeros(3), "N/m**3")
    if use_GPU:
        raise NotImplementedError(
            "GPU support is not yet implemented. Stay tuned tho"
        )

    lbf = espressomd.lb.LBFluidWalberla(
        tau=lb_time_step.m_as("sim_time"),
        kT=(self.params.temperature * self.ureg.boltzmann_constant).m_as(
            "sim_energy"
        ),
        density=fluid_density.m_as("sim_mass/sim_length**3"),
        kinematic_viscosity=(dynamic_viscosity / fluid_density).m_as(
            "sim_kin_viscosity"
        ),
        agrid=agrid.m_as("sim_length"),
        seed=self.seed,
        ext_force_density=ext_force_density.m_as("sim_force/sim_length**3"),
    )

    if boundary_mask is not None:
        from espressomd.script_interface import array_variant

        if not np.all(lbf.shape == boundary_mask.shape):
            raise ValueError(
                "boundary_mask must have the same shape as the fluid grid"
            )

        lbf.call_method(
            "add_boundary_from_shape",
            raster=array_variant(boundary_mask.astype(int).flatten()),
            values=array_variant(np.zeros(3, dtype=float).flatten()),
        )

    self.lbf = lbf

    return lbf

add_rod(rod_center=None, rod_length=None, rod_thickness=None, rod_start_angle=None, n_particles=None, friction_trans=None, friction_rot=None, rod_particle_type=None, fixed=True)

Add a rod to the system. A rod consists of n_particles point particles that are rigidly connected and rotate/move as a whole Parameters


rod_center default: center of the box rod_length default: 100 micrometer rod_thickness default: 5 micrometer Make sure there are enough particles. If the thickness is too thin, the rod might get holes rod_start_angle default: 0 n_particles default: 101 Must be uneven number such that there always is a central particle friction_trans Irrelevant if fixed==True must be provided friction_rot must be provided rod_particle_type The rod is made out of points so they get their own type. fixed Fixes the central particle of the rod.

Returns

The espresso handle to the central particle. For debugging purposes only

Source code in swarmrl/engine/espresso.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
def add_rod(
    self,
    rod_center: pint.Quantity = None,
    rod_length: pint.Quantity = None,
    rod_thickness: pint.Quantity = None,
    rod_start_angle: float = None,
    n_particles: int = None,
    friction_trans: pint.Quantity = None,
    friction_rot: pint.Quantity = None,
    rod_particle_type: int = None,
    fixed: bool = True,
):
    """
    Add a rod to the system.
    A rod consists of n_particles point particles that are rigidly connected
    and rotate/move as a whole
    Parameters
    ----------
    rod_center
        default: center of the box
    rod_length
        default: 100 micrometer
    rod_thickness
        default: 5 micrometer
        Make sure there are enough particles.
        If the thickness is too thin, the rod might get holes
    rod_start_angle
        default: 0
    n_particles
        default: 101
        Must be uneven number such that there always is a central particle
    friction_trans
        Irrelevant if fixed==True
        must be provided
    friction_rot
        must be provided
    rod_particle_type
        The rod is made out of points so they get their own type.
    fixed
        Fixes the central particle of the rod.

    Returns
    -------
    The espresso handle to the central particle. For debugging purposes only
    """
    self._check_already_initialised()

    if rod_center is None:
        rod_center = self.params.box_length / 2.0
    if rod_length is None:
        rod_length = self.ureg.Quantity(100, "micrometer")
    if rod_thickness is None:
        rod_thickness = self.ureg.Quantity(5, "micrometer")
    if rod_start_angle is None:
        rod_start_angle = 0
    if n_particles is None:
        n_particles = 101
    if friction_trans is None and not fixed:
        raise ValueError(
            "If you want the rod to move, you must provide a friction coefficient"
        )
    if friction_rot is None:
        raise ValueError("You must provide a rotational friction coefficient")
    if rod_particle_type is None:
        raise ValueError("You must provide a particle type for the rod")

    if self.n_dims != 2:
        raise ValueError("Rod can only be added in 2d")
    if rod_center[2].magnitude != 0:
        raise ValueError(f"Rod center z-component must be 0. You gave {rod_center}")
    if n_particles % 2 != 1:
        raise ValueError(f"n_particles must be uneven. You gave {n_particles}")

    espressomd.assert_features(["VIRTUAL_SITES_RELATIVE"])
    import espressomd.virtual_sites as evs

    self.system.virtual_sites = evs.VirtualSitesRelative(have_quaternion=True)

    center_pos = rod_center.m_as("sim_length")
    fric_trans = friction_trans.m_as("sim_force/sim_velocity")  # [F / v]
    fric_rot = friction_rot.m_as(
        "sim_force * sim_length *  sim_time"
    )  # [M / omega]
    partcl_radius = rod_thickness.m_as("sim_length") / 2

    # place the real particle
    center_part = self.system.part.add(
        pos=center_pos,
        quat=[1, 0, 0, 0],
        rotation=3 * [True],
        fix=[fixed, fixed, True],
        gamma=fric_trans,
        gamma_rot=fric_rot,
        type=rod_particle_type,
    )
    self._rotate_colloid_to_2d(center_part, rod_start_angle)
    self.colloids.append(center_part)

    # place virtual
    point_span = rod_length.m_as("sim_length") - 2 * partcl_radius
    point_dist = point_span / (n_particles - 1)
    if point_dist > 2 * partcl_radius:
        logger.warning(
            "your rod has holes. "
            f"Particle radius {partcl_radius} "
            f"particle_distance {point_dist} "
            "(both in simulation units)"
        )

    director = utils.vector_from_angles(np.pi / 2, rod_start_angle)

    for k in range(n_particles - 1):
        dist_to_center = (-1) ** k * (k // 2 + 1) * point_dist
        pos_virt = center_pos + dist_to_center * director
        virtual_partcl = self.system.part.add(
            pos=pos_virt, director=director, virtual=True, type=rod_particle_type
        )
        virtual_partcl.vs_auto_relate_to(center_part)
        self.colloids.append(virtual_partcl)

    self.colloid_radius_register.update(
        {rod_particle_type: {"radius": partcl_radius, "aspect_ratio": 1.0}}
    )
    return center_part

add_walls(wall_start_point, wall_end_point, wall_type, wall_thickness)

User defined walls will interact with particles through WCA. Is NOT communicated to the interaction models, though. The walls have a large height resulting in 2D-walls in a 2D-simulation. The actual height adapts to the chosen box size. The shape of the underlying constraint is a square.

Parameters

wall_start_point : pint.Quantity np.array (n,2) with wall coordinates [x_begin, y_begin] wall_end_point : pint.Quantity np.array (n,2) with wall coordinates [x_end, y_end] wall_type : int Wall interacts with particles, so it needs its own type. wall_thickness: pint.Quantity wall thickness

Returns
Source code in swarmrl/engine/espresso.py
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
def add_walls(
    self,
    wall_start_point: pint.Quantity,
    wall_end_point: pint.Quantity,
    wall_type: int,
    wall_thickness: pint.Quantity,
):
    """
    User defined walls will interact with particles through WCA.
    Is NOT communicated to the interaction models, though.
    The walls have a large height resulting in 2D-walls in a 2D-simulation.
    The actual height adapts to the chosen box size.
    The shape of the underlying constraint is a square.

    Parameters
    ----------
    wall_start_point : pint.Quantity
    np.array (n,2) with wall coordinates
         [x_begin, y_begin]
    wall_end_point : pint.Quantity
    np.array (n,2) with wall coordinates
         [x_end, y_end]
    wall_type : int
        Wall interacts with particles, so it needs its own type.
    wall_thickness: pint.Quantity
        wall thickness

    Returns
    -------
    """

    wall_start_point = wall_start_point.m_as("sim_length")
    wall_end_point = wall_end_point.m_as("sim_length")
    wall_thickness = wall_thickness.m_as("sim_length")

    if len(wall_start_point) != len(wall_end_point):
        raise ValueError(
            " Please double check your walls. There are more or less "
            f" starting points {len(wall_start_point)} than "
            f" end points {len(wall_end_point)}. They should be equal."
        )

    self._check_already_initialised()
    if wall_type in self.colloid_radius_register.keys():
        if self.colloid_radius_register[wall_type] != 0.0:
            raise ValueError(
                f" The chosen type {wall_type} is already taken"
                "and used with a different radius "
                f"{self.colloid_radius_register[wall_type]['radius']}."
                " Choose a new combination"
            )

    z_height = self.system.box_l[2]
    wall_shapes = []

    for wall_index in range(len(wall_start_point)):
        a = [
            wall_end_point[wall_index, 0] - wall_start_point[wall_index, 0],
            wall_end_point[wall_index, 1] - wall_start_point[wall_index, 1],
            0,
        ]  # direction along lengthy wall
        c = [0, 0, z_height]  # direction along third axis of 2D simulation
        norm_a = np.linalg.norm(a)  # is also the norm of b
        norm_c = np.linalg.norm(c)
        b = (
            np.cross(a / norm_a, c / norm_c) * wall_thickness
        )  # direction along second axis
        # i.e along wall_thickness of lengthy wall
        corner = [
            wall_start_point[wall_index, 0] - b[0] / 2,
            wall_start_point[wall_index, 1] - b[1] / 2,
            0,
        ]  # anchor point of wall shifted by wall_thickness*1/2

        wall_shapes.append(
            espressomd.shapes.Rhomboid(corner=corner, a=a, b=b, c=c, direction=1)
        )

    for wall_shape in wall_shapes:
        constr = espressomd.constraints.ShapeBasedConstraint(
            shape=wall_shape, particle_type=wall_type, penetrable=False
        )
        self.system.constraints.add(constr)

    # the wall itself has no radius, only the particle radius counts
    self.colloid_radius_register.update(
        {wall_type: {"radius": 0.0, "aspect_ratio": 1.0}}
    )

finalize()

Method to clean up after finishing the simulation

Method will write the last chunks of trajectory

Source code in swarmrl/engine/espresso.py
1308
1309
1310
1311
1312
1313
1314
def finalize(self):
    """
    Method to clean up after finishing the simulation

    Method will write the last chunks of trajectory
    """
    self._write_traj_chunk_to_file()

get_friction_coefficients(type)

Returns both the translational and the rotational friction coefficient of the desired type in simulation units

Source code in swarmrl/engine/espresso.py
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
def get_friction_coefficients(self, type: int):
    """
    Returns both the translational and the rotational friction coefficient
    of the desired type in simulation units
    """
    property_dict = self.colloid_radius_register.get(type, None)
    if property_dict is None:
        raise ValueError(
            f"cannot get friction coefficient for type {type}. Did you actually add"
            " that particle type?"
        )
    return _calc_friction_coefficients(
        self.params.fluid_dyn_viscosity.m_as("sim_dyn_viscosity"),
        property_dict["radius"],
    )

get_particle_data()

Collect specific particle information from the colloids.

Returns

information : dict A dict of information for all of the colloids in the system including unwrapped positions, velocities, and the directors of the colloids.

Source code in swarmrl/engine/espresso.py
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
def get_particle_data(self):
    """
    Collect specific particle information from the colloids.

    Returns
    -------
    information : dict
            A dict of information for all of the colloids in the system including
            unwrapped positions, velocities, and the directors of the colloids.
    """
    return {
        "Id": np.array([c.id for c in self.colloids]),
        "Type": np.array([c.type for c in self.colloids]),
        "Unwrapped_Positions": np.stack([c.pos for c in self.colloids]),
        "Velocities": np.stack([c.v for c in self.colloids]),
        "Directors": np.stack([c.director for c in self.colloids]),
    }

get_unit_system()

Collect the pin unit registry.

Returns

unit_registry: object The class unit registry.

Source code in swarmrl/engine/espresso.py
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
def get_unit_system(self):
    """
    Collect the pin unit registry.

    Returns
    -------
    unit_registry: object
            The class unit registry.
    """
    return self.ureg

integrate(n_slices, force_model=None)

Integrate the system for n_slices steps.

Parameters

n_slices : int Number of integration steps to run. force_model : ForceFunction A SwarmRL interaction model to decide particle interaction rules.

Returns

Runs the simulation environment.

Source code in swarmrl/engine/espresso.py
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
def integrate(self, n_slices, force_model: ForceFunction = None):
    """
    Integrate the system for n_slices steps.

    Parameters
    ----------
    n_slices : int
            Number of integration steps to run.
    force_model : ForceFunction
            A SwarmRL interaction model to decide particle interaction rules.

    Returns
    -------
    Runs the simulation environment.
    """

    if not self.integration_initialised:
        self.slice_idx = 0
        self.step_idx = 0
        self._setup_interactions()
        self._remove_overlap()
        self._init_h5_output()
        self.integration_initialised = True

    old_slice_idx = self.slice_idx

    while self.step_idx < self.params.steps_per_slice * (old_slice_idx + n_slices):
        if self.step_idx == self.params.steps_per_write_interval * self.write_idx:
            self._update_traj_holder()
            self.write_idx += 1

            if len(self.traj_holder["Times"]) >= self.write_chunk_size:
                self._write_traj_chunk_to_file()
                for val in self.traj_holder.values():
                    val.clear()

        # Break the simulaion if the kill switch is engaged.
        if force_model is not None:
            if force_model.kill_switch:
                break

        if self.step_idx == self.params.steps_per_slice * self.slice_idx:
            self.slice_idx += 1
            self.manage_forces(force_model)

        steps_to_next_write = (
            self.params.steps_per_write_interval * self.write_idx - self.step_idx
        )
        steps_to_next_slice = (
            self.params.steps_per_slice * self.slice_idx - self.step_idx
        )
        steps_to_next = min(steps_to_next_write, steps_to_next_slice)

        self.system.integrator.run(
            steps_to_next, reuse_forces=True, recalc_forces=False
        )
        self.step_idx += steps_to_next

manage_forces(force_model=None)

Manage external forces.

Collect the forces from the force function and apply them to the colloids.

Parameters

force_model : ForceFunction Model with which to compute external forces.

Source code in swarmrl/engine/espresso.py
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
def manage_forces(self, force_model: ForceFunction = None) -> bool:
    """
    Manage external forces.

    Collect the forces from the force function and apply them to the colloids.

    Parameters
    ----------
    force_model : ForceFunction
        Model with which to compute external forces.
    """
    swarmrl_colloids = []
    if force_model is not None:
        for col in self.colloids:
            swarmrl_colloids.append(
                Colloid(
                    pos=col.pos,
                    velocity=col.v,
                    director=col.director,
                    id=col.id,
                    type=col.type,
                )
            )
        actions = force_model.calc_action(swarmrl_colloids)
        for action, coll in zip(actions, self.colloids):
            coll.swimming = {"f_swim": action.force}
            coll.ext_torque = (
                action.torque
                if action.torque is not None
                else np.zeros(
                    3,
                )
            )
            new_direction = action.new_direction
            if new_direction is not None:
                if self.n_dims == 3:
                    coll.director = new_direction
                else:
                    old_direction = coll.director
                    rotation_angle = np.arccos(np.dot(new_direction, old_direction))
                    if rotation_angle > 1e-6:
                        rotation_axis = np.cross(old_direction, new_direction)
                        rotation_axis /= np.linalg.norm(rotation_axis)
                        # only values of [0,0,1], [0,0,-1] can come out here,
                        # plusminus numerical errors
                        rotation_axis = [0, 0, round(rotation_axis[2])]
                        coll.rotate(axis=rotation_axis, angle=rotation_angle)

MDParams dataclass

class to hold all information needed to setup and run the MD simulation. Provide in whichever unit you want, all quantities will be converted to simulation units during setup

non-obvious attributes

time_slice: MD runs with internal time step of time_step. The external force/torque from the force_model will not be updated at every single time step, instead every time_slice. Therefore, time_slice must be an integer multiple of time_step. periodic: optional Enable/disable periodic boundary conditions. Default: enabled thermostat_type: optional One of "brownian", "langevin", see https://espressomd.github.io/doc/integration.html for details of the algorithms.

Source code in swarmrl/engine/espresso.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@dataclasses.dataclass()
class MDParams:
    """
    class to hold all information needed to setup and run the MD simulation.
    Provide in whichever unit you want, all quantities will be converted to simulation
    units during setup

    non-obvious attributes
    ----------------------
    time_slice:
        MD runs with internal time step of time_step. The external force/torque from
        the force_model will not be updated at every single time step, instead every
        time_slice. Therefore, time_slice must be an integer multiple of time_step.
    periodic: optional
        Enable/disable periodic boundary conditions. Default: enabled
    thermostat_type: optional
        One of "brownian", "langevin",
        see https://espressomd.github.io/doc/integration.html
        for details of the algorithms.
    """

    def __init__(
        self,
        ureg: pint.UnitRegistry,
        box_length: pint.Quantity = None,
        fluid_dyn_viscosity: pint.Quantity = None,
        WCA_epsilon: pint.Quantity = None,
        temperature: pint.Quantity = None,
        time_step: pint.Quantity = None,
        time_slice: pint.Quantity = None,
        write_interval: pint.Quantity = None,
        periodic: bool = True,
        thermostat_type: str = "brownian",
    ):

        if box_length is None:
            box_length = ureg.Quantity(3 * [1000], "micrometer")
        if fluid_dyn_viscosity is None:
            fluid_dyn_viscosity = ureg.Quantity(1e-3, "pascal*second")
        if WCA_epsilon is None:
            WCA_epsilon = ureg.Quantity(300, "kelvin") * ureg.boltzmann_constant
        if temperature is None:
            temperature = ureg.Quantity(300, "kelvin")
        if time_step is None:
            time_step = ureg.Quantity(1e-3, "second")
        if time_slice is None:
            time_slice = ureg.Quantity(1e-1, "second")
        if write_interval is None:
            write_interval = ureg.Quantity(1, "second")

        self.ureg: pint.UnitRegistry = ureg
        self.box_length: pint.Quantity = box_length
        self.fluid_dyn_viscosity: pint.Quantity = fluid_dyn_viscosity
        self.WCA_epsilon: pint.Quantity = WCA_epsilon
        self.temperature: pint.Quantity = temperature
        self.time_step: pint.Quantity = time_step
        self.time_slice: pint.Quantity = time_slice
        self.write_interval: pint.Quantity = write_interval
        self.periodic: bool = periodic
        self.thermostat_type: str = thermostat_type