Source code for txpipe.utils.mpi_utils

import numpy as np

def mpi_reduce_large(data, comm, max_chunk_count=2**30, root=0, op=None, debug=False):
    """Use MPI reduce in-place on an array, even a very large one.

    MPI Reduce is a reduction operation that will (e.g.) sum arrays from
    different processes onto a single process. It fails whenever the size
    of the array is greater than 2**31, due to an overflow. This version
    avoids that by dividing the array up into chunks of at most
    ``max_chunk_count`` items and running the reduction on each chunk
    separately.

    This call does an in-place reduction, so the root process overwrites
    its own array with the result. This minimizes memory usage.

    The default is to sum all the arrays and to collect the result at
    process zero, but both can be overridden.

    Parameters
    ----------
    data: array
        Can be any shape but must be C-contiguous. On the root process it
        is overwritten with the reduced result.
    comm: MPI communicator
        Communicator over which to reduce. Every rank in it must call this
        function with an array of the same size.
    max_chunk_count: int
        Optional, default=2**30. Max number of items to allow to be sent
        at once. Must be at least 1.
    root: int
        Optional, default=0. Rank of process to receive final result.
    op: MPI operation
        Optional, default=None. MPI operation, e.g. MPI.PROD, MPI.MAX, etc.
        Default is to SUM.
    debug: bool
        Optional, default=False. Whether to print out information from
        each rank.

    Raises
    ------
    ValueError
        If ``data`` is not C-contiguous, or ``max_chunk_count`` is less
        than 1.
    RuntimeError
        If flattening ``data`` unexpectedly yields a non-contiguous array.
    """
    # Validate the arguments before touching MPI so that bad input fails
    # fast, and identically on every rank.
    if not data.flags['C_CONTIGUOUS']:
        raise ValueError("Cannot reduce non-contiguous array")

    # A chunk size below one would never advance the loop below, leaving
    # every rank spinning forever.
    if max_chunk_count < 1:
        raise ValueError("max_chunk_count must be a positive integer")

    from mpi4py.MPI import IN_PLACE, SUM

    # Do a sum by default; any other MPI operation can be passed in.
    if op is None:
        op = SUM

    # Flatten the array. Crucially this does not make a copy as long as
    # the data is contiguous, which we just checked, so writing into
    # slices of the flat view updates the caller's array.
    flat = data.reshape(data.size)
    size = flat.size

    if not flat.flags['C_CONTIGUOUS']:
        raise RuntimeError("It seems numpy has changed its semantics and "
                           "has returned a non-contiguous array from reshape. \n"
                           "You will have to rewrite the mpi_utils code.")

    is_root = comm.rank == root
    start = 0
    while start < size:
        end = min(start + max_chunk_count, size)
        if is_root:
            if debug:
                print(f"{comm.rank} receiving & summing {start} - {end}")
            # The root's chunk is both its contribution and the output.
            comm.Reduce(IN_PLACE, flat[start:end], op=op, root=root)
        else:
            if debug:
                print(f"{comm.rank} sending {start} - {end}")
            # Non-root ranks only send; they need no receive buffer.
            comm.Reduce(flat[start:end], None, op=op, root=root)
        start = end
def in_place_reduce(data, comm, allreduce=False):
    """Reduce ``data`` across ``comm`` without allocating a second buffer.

    With ``allreduce`` set, every rank's ``data`` is overwritten with the
    combined result. Otherwise only rank 0 receives the result, in its own
    ``data`` array, and the other ranks just send. The reduction operation
    is left at the communicator method's default.

    Parameters
    ----------
    data: array
        Buffer contributed by this rank, overwritten where results land.
    comm: MPI communicator
    allreduce: bool
        Optional, default=False. Whether all ranks receive the result.
    """
    from mpi4py import MPI

    if allreduce:
        comm.Allreduce(MPI.IN_PLACE, data)
        return

    if comm.Get_rank() != 0:
        # Senders need no receive buffer.
        comm.Reduce(data, None)
        return

    # Rank 0 accumulates straight into its own array.
    comm.Reduce(MPI.IN_PLACE, data)


def test_reduce():
    """Check chunked reduction on a small array filled with ``rank + 1``.

    A chunk size of 4500 on a 100 x 200 array forces several chunks,
    including a final partial one. After the reduction rank 0 should hold
    1 + 2 + ... + size in every element.
    """
    from mpi4py import MPI

    world = MPI.COMM_WORLD
    values = np.zeros((100, 200)) + world.rank + 1
    mpi_reduce_large(values, world, max_chunk_count=4500, debug=True)

    # Only the root holds the reduced result.
    if world.rank != 0:
        return

    n_proc = world.size
    triangular = (n_proc * (n_proc + 1)) // 2
    assert np.allclose(values, triangular)


if __name__ == '__main__':
    test_reduce()