Relay MPI

The Conduit Relay MPI library enables MPI communication using conduit::Node instances as payloads. It provides two categories of functionality, Known Schema Methods and Generic Methods, which trade off performance against flexibility. In all cases the implementation avoids unnecessary reallocation where it can, subject to the constraints of MPI's API input requirements.

Known Schema Methods

These methods transfer only a Node's data; the schema is assumed to be known on all participating tasks, and Nodes used for output must already be compatible with their sources.

Supported MPI Primitives:
  • send/recv
  • isend/irecv
  • reduce/all_reduce
  • broadcast
  • gather/all_gather

For both point-to-point and collective operations, input Nodes are treated as follows (a minimal usage sketch follows this list):

  • For Nodes holding data to be sent:
    • If the Node is compact and contiguously allocated, the Node's pointers are passed directly to MPI.
    • If the Node is not compact or not contiguously allocated, the data is compacted into a temporary contiguous buffer that is passed to MPI.
  • For Nodes used to hold output data:
    • If the output Node is compact and contiguously allocated, the Node's pointers are passed directly to MPI.
    • If the output Node is not compact or not contiguously allocated, a Node with a temporary contiguous buffer is created and that buffer is passed to MPI. An update call then copies the data from the temporary buffer into the output Node, which avoids reallocating or modifying the schema of the output Node.
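
This compaction logic is transparent to callers. As a minimal sketch (assuming two MPI tasks; the node contents are purely illustrative and not part of the example suite below), a known-schema send of an object-style Node, which is typically not one contiguous allocation, looks the same as any other send:

import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

comm_id   = MPI.COMM_WORLD.py2f()
comm_rank = relay.mpi.rank(comm_id)

# both ranks declare the same (compatible) schema up front
n = conduit.Node()
n["a/data"]      = 0.0
n["a/more_data"] = 0.0

if comm_rank == 0:
    n["a/data"]      = 1.0
    n["a/more_data"] = 2.0
    # the object-style Node is not one contiguous allocation, so the data
    # is compacted into a temporary buffer before being handed to MPI
    relay.mpi.send(n,dest=1,tag=0,comm=comm_id)
else:
    # the output Node is also not contiguous, so the data arrives in a
    # temporary buffer and is copied into n; n's schema is left untouched
    relay.mpi.recv(n,source=0,tag=0,comm=comm_id)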

Generic Methods

These methods transfer both a Node's data and its schema. They are useful for generic messaging, since receiving tasks do not need to know the schema in advance. The semantics of MPI constrain what can be supported in this category.

Supported MPI Primitives:
  • send/recv
  • gather/all_gather
  • broadcast
Unsupported MPI Primitives:
  • isend/irecv
  • reduce/all_reduce

For both point-to-point and collective operations, input Nodes are treated as follows (a minimal sketch of the receive-side behavior follows this list):

  • For Nodes holding data to be sent:
    • If the Node is compact and contiguously allocated:
      • The Node's schema is sent as JSON.
      • The Node's pointers are passed directly to MPI.
    • If the Node is not compact or not contiguously allocated:
      • The Node is compacted into a temporary Node.
      • The temporary Node's schema is sent as JSON.
      • The temporary Node's pointers are passed to MPI.
  • For Nodes used to hold output data:
    • If the output Node is not compatible with the received schema, it is reset using the received schema.
    • If the output Node is compact and contiguously allocated, its pointers are passed directly to MPI.
    • If the output Node is not compact or not contiguously allocated, a Node with a temporary contiguous buffer is created and that buffer is passed to MPI. An update call then copies the data from the temporary buffer into the output Node, which avoids reallocating or modifying the schema of the output Node.
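
As a minimal sketch of the receive side (assuming two MPI tasks; the path names are hypothetical and purely illustrative), the receiver's Node does not need to be prepared in advance: any incompatible contents are reset to match the received schema.

import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

comm_id   = MPI.COMM_WORLD.py2f()
comm_rank = relay.mpi.rank(comm_id)

n = conduit.Node()
if comm_rank == 0:
    n["path/to/data"] = 42.0
    # both the schema (as JSON) and the data are sent
    relay.mpi.send_using_schema(n,dest=1,tag=0,comm=comm_id)
else:
    # the receiver holds unrelated data; it is not compatible with the
    # incoming schema, so recv_using_schema resets n using the received schema
    n["something/else"] = "placeholder"
    relay.mpi.recv_using_schema(n,source=0,tag=0,comm=comm_id)
    # n now contains path/to/data == 42.0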

Python Relay MPI Module

Relay MPI is supported in Python via the conduit.relay.mpi module. Its methods take Fortran-style MPI communicator handles, which are effectively integers. (We hope to also support direct use of mpi4py communicator objects in the future.)

Use the following to get a handle from the mpi4py world communicator:

from mpi4py import MPI
comm_id   = MPI.COMM_WORLD.py2f()
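
The same pattern works for any mpi4py communicator. For example, a handle for a sub-communicator created with mpi4py's Split can be obtained the same way (the even/odd split below is purely illustrative):

from mpi4py import MPI

# split COMM_WORLD into two sub-communicators by even/odd rank (illustrative)
sub_comm    = MPI.COMM_WORLD.Split(color=MPI.COMM_WORLD.rank % 2, key=0)
sub_comm_id = sub_comm.py2f()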

Python Relay MPI Module Examples

Send and Receive Using Schema

  • Python Source:
import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

# Note: example expects 2 mpi tasks

# get a comm id from mpi4py world comm
comm_id   = MPI.COMM_WORLD.py2f()
# get our rank and the comm's size
comm_rank = relay.mpi.rank(comm_id)
comm_size = relay.mpi.size(comm_id)

# send a node and its schema from rank 0 to rank 1
n = conduit.Node()
if comm_rank == 0:
    # setup node to send on rank 0
    n["a/data"]   = 1.0
    n["a/more_data"] = 2.0
    n["a/b/my_string"] = "value"

# show node data on rank 0
if comm_rank == 0:
    print("[rank: {}] sending: {}".format(comm_rank,n.to_yaml()))

if comm_rank == 0:
    relay.mpi.send_using_schema(n,dest=1,tag=0,comm=comm_id)
else:
    relay.mpi.recv_using_schema(n,source=0,tag=0,comm=comm_id)

# show received node data on rank 1
if comm_rank == 1:
    print("[rank: {}] received: {}".format(comm_rank,n.to_yaml()))

  • Output:
 [rank: 0] sending: 
 a: 
   data: 1.0
   more_data: 2.0
   b: 
     my_string: "value"
 
 [rank: 1] received: 
 a: 
   data: 1.0
   more_data: 2.0
   b: 
     my_string: "value"
 
 

Send and Receive

  • Python Source:
import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

# Note: example expects 2 mpi tasks

# get a comm id from mpi4py world comm
comm_id   = MPI.COMM_WORLD.py2f()
# get our rank and the comm's size
comm_rank = relay.mpi.rank(comm_id)
comm_size = relay.mpi.size(comm_id)

# send data from a node on rank 0 to rank 1
# (both ranks have nodes with compatible schemas)
n = conduit.Node(conduit.DataType.int64(4))
if comm_rank == 0:
    # setup node to send on rank 0
    vals = n.value()
    for i in range(4):
        vals[i] = i * i

# show node data on rank 0
if comm_rank == 0:
    print("[rank: {}] sending: {}".format(comm_rank,n.to_yaml()))

if comm_rank == 0:
    relay.mpi.send(n,dest=1,tag=0,comm=comm_id)
else:
    relay.mpi.recv(n,source=0,tag=0,comm=comm_id)

# show received node data on rank 1
if comm_rank == 1:
    print("[rank: {}] received: {}".format(comm_rank,n.to_yaml()))

  • Output:
 [rank: 0] sending: [0, 1, 4, 9]
 [rank: 1] received: [0, 1, 4, 9]
 

Sum All Reduce

  • Python Source:
import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

# get a comm id from mpi4py world comm
comm_id   = MPI.COMM_WORLD.py2f()
# get our rank and the comm's size
comm_rank = relay.mpi.rank(comm_id)
comm_size = relay.mpi.size(comm_id)

# sum reduce data across all ranks
# (all ranks have nodes with compatible schemas)
n = conduit.Node(conduit.DataType.int64(4))
n_res = conduit.Node(conduit.DataType.int64(4))
# data to reduce
vals = n.value()
for i in range(4):
    vals[i] = 1

relay.mpi.sum_all_reduce(n,n_res,comm=comm_id)
# answer should be an array with each value == comm_size
# show result on rank 0
if comm_rank == 0:
    print("[rank: {}] sum reduce result: {}".format(comm_rank,n_res.to_yaml()))

  • Output:
 [rank: 0] sum reduce result: [2, 2, 2, 2]
 

Broadcast Using Schema

  • Python Source:
import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

# Note: example expects 2 mpi tasks

# get a comm id from mpi4py world comm
comm_id   = MPI.COMM_WORLD.py2f()
# get our rank and the comm's size
comm_rank = relay.mpi.rank(comm_id)
comm_size = relay.mpi.size(comm_id)

# broadcast a node and its schema from rank 0 to all other ranks
n = conduit.Node()
if comm_rank == 0:
    # setup node to broadcast on rank 0
    n["a/data"]   = 1.0
    n["a/more_data"] = 2.0
    n["a/b/my_string"] = "value"

# show node data on rank 0
if comm_rank == 0:
    print("[rank: {}] broadcasting: {}".format(comm_rank,n.to_yaml()))

relay.mpi.broadcast_using_schema(n,root=0,comm=comm_id)

# show received node data on rank 1
if comm_rank == 1:
    print("[rank: {}] received: {}".format(comm_rank,n.to_yaml()))

  • Output:
 [rank: 0] broadcasting: 
 a: 
   data: 1.0
   more_data: 2.0
   b: 
     my_string: "value"
 
 [rank: 1] received: 
 a: 
   data: 1.0
   more_data: 2.0
   b: 
     my_string: "value"
 
 

Broadcast

  • Python Source:
import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

# Note: example expects 2 mpi tasks

# get a comm id from mpi4py world comm
comm_id   = MPI.COMM_WORLD.py2f()
# get our rank and the comm's size
comm_rank = relay.mpi.rank(comm_id)
comm_size = relay.mpi.size(comm_id)

# broadcast data from a node on rank 0 to all other ranks
# (all ranks have nodes with compatible schemas)
n = conduit.Node(conduit.DataType.int64(4))
if comm_rank == 0:
    # setup node to broadcast on rank 0
    vals = n.value()
    for i in range(4):
        vals[i] = i * i

# show node data on rank 0
if comm_rank == 0:
    print("[rank: {}] broadcasting: {}".format(comm_rank,n.to_yaml()))

relay.mpi.broadcast(n,root=0,comm=comm_id)

# show received node data on rank 1
if comm_rank == 1:
    print("[rank: {}] received: {}".format(comm_rank,n.to_yaml()))

  • Output:
 [rank: 0] broadcasting: [0, 1, 4, 9]
 

All Gather Using Schema

  • Python Source:
import conduit
import conduit.relay as relay
import conduit.relay.mpi
from mpi4py import MPI

# get a comm id from mpi4py world comm
comm_id   = MPI.COMM_WORLD.py2f()
# get our rank and the comm's size
comm_rank = relay.mpi.rank(comm_id)
comm_size = relay.mpi.size(comm_id)

n = conduit.Node(conduit.DataType.int64(4))
n_res = conduit.Node()
# data to gather
vals = n.value()
for i in range(4):
    vals[i] = comm_rank

relay.mpi.all_gather_using_schema(n,n_res,comm=comm_id)
# show result on rank 0
if comm_rank == 0:
    print("[rank: {}] all gather using schema result: {}".format(comm_rank,n_res.to_yaml()))

  • Output:
 [rank: 0] all gather using schema result: 
 - [0, 0, 0, 0]
 - [1, 1, 1, 1]