
Commit

Merge pull request #7 from tskisner/more_tests
Expand unit tests.
tskisner authored Jul 15, 2021
2 parents 3871b5b + 7a58398 commit b3e28d4
Showing 5 changed files with 143 additions and 29 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/deploy.yml
@@ -32,8 +32,14 @@ jobs:
run: ./test_scripts/install_deps_github_ubuntu.sh && pip install twine
- name: Install Package
run: pip install .
- name: Run Tests
run: mpirun -np 2 python -c 'import pshmem.test; pshmem.test.run()'
- name: Run Serial Test
run: MPI_DISABLE=1 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 1 Process
run: mpirun -np 1 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 2 Processes
run: mpirun -np 2 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 4 Processes
run: mpirun -np 4 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Build source package
run: rm -rf dist && python setup.py sdist
- name: Build wheels
16 changes: 14 additions & 2 deletions .github/workflows/test.yml
@@ -25,8 +25,14 @@ jobs:
run: ./test_scripts/install_deps_github_ubuntu.sh
- name: Install Package
run: pip3 install .
- name: Run Tests
- name: Run Serial Test
run: MPI_DISABLE=1 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 1 Process
run: mpirun -np 1 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 2 Processes
run: mpirun -np 2 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 4 Processes
run: mpirun -np 4 python3 -c 'import pshmem.test; pshmem.test.run()'
macos:
runs-on: macos-latest
strategy:
@@ -43,5 +49,11 @@ jobs:
run: ./test_scripts/install_deps_github_macos.sh
- name: Install Package
run: pip3 install .
- name: Run Tests
- name: Run Serial Test
run: MPI_DISABLE=1 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 1 Process
run: mpirun -np 1 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 2 Processes
run: mpirun -np 2 python3 -c 'import pshmem.test; pshmem.test.run()'
- name: Run MPI Test on 4 Processes
run: mpirun -np 4 python3 -c 'import pshmem.test; pshmem.test.run()'
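
With these changes, both workflows exercise the same test entry point in four modes: a serial run with MPI disabled, and mpirun launches with 1, 2, and 4 processes. A minimal local equivalent of that matrix, assuming the package is installed and an MPI launcher such as mpirun is available, is:

```bash
# Serial run: the MPI_DISABLE=1 environment variable selects the non-MPI code path.
MPI_DISABLE=1 python3 -c 'import pshmem.test; pshmem.test.run()'

# MPI runs at the same process counts used by the CI steps above.
for np in 1 2 4; do
    mpirun -np ${np} python3 -c 'import pshmem.test; pshmem.test.run()'
done
```
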
82 changes: 57 additions & 25 deletions pshmem/test.py
@@ -44,7 +44,14 @@ def setUp(self):
def tearDown(self):
pass

def test_allocate(self):
def read_write(self, comm):
"""Run a sequence of various access tests."""
rank = 0
procs = 1
if comm is not None:
rank = comm.rank
procs = comm.size

# Dimensions of our shared memory array
datadims = (2, 5, 10)

@@ -63,21 +70,21 @@ def test_allocate(self):
# For testing the "set()" method, every process is going to
# create a full-sized data buffer and fill it with its process rank.
local = np.ones(datadims, dtype=datatype)
local *= self.rank
local *= rank

# A context manager is the pythonic way to make sure that the
# object has no dangling reference counts after leaving the context,
# and will ensure that the shared memory is freed properly.

with MPIShared(local.shape, local.dtype, self.comm) as shm:
for p in range(self.procs):
with MPIShared(local.shape, local.dtype, comm) as shm:
for p in range(procs):
# Every process takes turns writing to the buffer.
setdata = None
setoffset = (0, 0, 0)

# Write to the whole data volume, but in small blocks
for upd in range(nupdate):
if p == self.rank:
if p == rank:
# My turn! Write my process rank to the buffer slab.
setdata = local[
setoffset[0] : setoffset[0] + updatedims[0],
@@ -89,22 +96,17 @@ def test_allocate(self):
shm.set(setdata, setoffset, fromrank=p)
except:
print(
"proc {} threw exception during set()".format(
self.rank
),
"proc {} threw exception during set()".format(rank),
flush=True,
)
if self.comm is not None:
self.comm.Abort()
if comm is not None:
comm.Abort()
else:
sys.exit(1)

try:
# Same as set(), but using __setitem__ with an
# allreduce to find which process is setting.
#
# key as a tuple of offsets
shm[setoffset] = setdata

                        # key as a tuple of slices
if setdata is None:
@@ -118,20 +120,18 @@ def test_allocate(self):
except:
print(
"proc {} threw exception during __setitem__".format(
self.rank
rank
),
flush=True,
)
if self.comm is not None:
if comm is not None:
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(
exc_type, exc_value, exc_traceback
)
lines = [
"Proc {}: {}".format(self.rank, x) for x in lines
]
lines = ["Proc {}: {}".format(rank, x) for x in lines]
print("".join(lines), flush=True)
self.comm.Abort()
comm.Abort()
else:
raise

@@ -164,7 +164,7 @@ def test_allocate(self):

# Try full array assignment with slices containing None start
# values
if p != self.rank:
if p != rank:
shm[None] = None
else:
shm[:, :, :] = local
@@ -177,17 +177,49 @@ def test_allocate(self):
# buffer should appear as a C-contiguous ndarray whenever we slice
# along the last dimension.

for p in range(self.procs):
if p == self.rank:
for p in range(procs):
if p == rank:
slc = shm[1, 2]
print(
"proc {} slice has dims {}, dtype {}, C = {}".format(
p, slc.shape, slc.dtype.str, slc.flags["C_CONTIGUOUS"]
),
flush=True,
)
if self.comm is not None:
self.comm.barrier()
if comm is not None:
comm.barrier()

def test_world(self):
if self.comm is None:
print("Testing MPIShared without MPI...", flush=True)
elif self.comm.rank == 0:
print("Testing MPIShared with world communicator...", flush=True)
self.read_write(self.comm)

def test_split(self):
if self.comm is not None:
if self.comm.rank == 0:
print("Testing MPIShared with split grid communicator...", flush=True)
# Split the comm into a grid
n_y = int(np.sqrt(self.comm.size))
if n_y < 1:
n_y = 1
n_x = self.comm.size // n_y
y_rank = self.comm.rank // n_x
x_rank = self.comm.rank % n_x

x_comm = self.comm.Split(y_rank, x_rank)
y_comm = self.comm.Split(x_rank, y_rank)

self.read_write(x_comm)
self.read_write(y_comm)

def test_comm_self(self):
if self.comm is not None:
if self.comm.rank == 0:
print("Testing MPIShared with COMM_SELF...", flush=True)
# Every process does the operations on COMM_SELF
self.read_write(MPI.COMM_SELF)

def test_shape(self):
good_dims = [
@@ -256,7 +288,7 @@ def test_lock(self):

def run():
suite = unittest.TestSuite()
# suite.addTest(unittest.makeSuite(LockTest))
suite.addTest(unittest.makeSuite(LockTest))
suite.addTest(unittest.makeSuite(ShmemTest))
runner = unittest.TextTestRunner()
runner.run(suite)
49 changes: 49 additions & 0 deletions test_scripts/readme_test.py
@@ -0,0 +1,49 @@
import numpy as np
from mpi4py import MPI

from pshmem import MPIShared

comm = MPI.COMM_WORLD

with MPIShared((3, 5), np.float64, comm) as shm:
# A copy of the data exists on every node and is initialized to zero.
# There is a numpy array "view" of that memory available with slice notation
# or by accessing the "data" member:
if comm.rank == 0:
# You can get a summary of the data by printing it:
print("String representation:\n")
print(shm)
print("\n===== Initialized Data =====")
for p in range(comm.size):
if p == comm.rank:
print("rank {}:\n".format(p), shm.data, flush=True)
comm.barrier()

set_data = None
set_offset = None
if comm.rank == 0:
set_data = np.arange(6, dtype=np.float64).reshape((2, 3))
set_offset = (1, 1)

# The set() method is collective, but the inputs only matter on one rank
shm.set(set_data, offset=set_offset, fromrank=0)

# You can also use the usual '[]' notation. However, this call must do an
# additional pre-communication to detect which process the data is coming from.
# And this line is still collective and must be called on all processes:
shm[set_offset] = set_data

# This updated data has now been replicated to the shared memory on all nodes.
if comm.rank == 0:
print("======= Updated Data =======")
for p in range(comm.size):
if p == comm.rank:
print("rank {}:\n".format(p), shm.data, flush=True)
comm.barrier()

# You can read from the node-local copy of the data from all processes,
# using either the "data" member or slice access:
if comm.rank == comm.size - 1:
print("==== Read-only access ======")
print("rank {}: shm[2, 3] = {}".format(comm.rank, shm[2, 3]), flush=True)
print("rank {}: shm.data = \n{}".format(comm.rank, shm.data), flush=True)
15 changes: 15 additions & 0 deletions test_scripts/test_cibuild.sh
@@ -0,0 +1,15 @@
#!/bin/bash


docker run \
-it \
-v $(pwd):/home/pshmem \
quay.io/pypa/manylinux2014_x86_64:latest \
/bin/bash

# export PATH=/opt/python/cp38-cp38/bin:${PATH}
# python3 -m pip install --upgrade pip
# yum -y update
# yum -y install mpich-3.2-devel.x86_64 mpich-3.2-autoload.x86_64
# source /etc/profile.d/modules.sh
# source /etc/profile.d/mpich-3.2-x86_64.sh
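
The commented lines above sketch the manual setup inside the manylinux container; one possible follow-on sequence, where the interpreter path, MPICH packages, and build/test commands are taken from those hints plus the workflow test commands and are not verified here, is:

```bash
# Inside the container: select a Python, install MPICH, then build and test pshmem.
export PATH=/opt/python/cp38-cp38/bin:${PATH}
python3 -m pip install --upgrade pip
yum -y update
yum -y install mpich-3.2-devel.x86_64 mpich-3.2-autoload.x86_64
source /etc/profile.d/modules.sh
source /etc/profile.d/mpich-3.2-x86_64.sh

# The repository is mounted at /home/pshmem by the docker run command above.
cd /home/pshmem
python3 -m pip install .
MPI_DISABLE=1 python3 -c 'import pshmem.test; pshmem.test.run()'
```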
