In this section, we give some sample code to get you started with programming your I/O using Sun MPI 4.0. We start with an example that shows how a parallel job can partition file data among its processes. Next we explore how you can adapt our initial example to use a broad range of other I/O programming styles supported by Sun MPI I/O. Finally, we present a sample code that illustrates the use of the nonblocking MPI I/O routines.
Before we start, remember that MPI I/O is part of MPI, so you must call MPI_Init before calling any MPI I/O routines and MPI_Finalize at the end of your program, even if you only use MPI I/O routines.
MPI I/O was designed to enable processes in a parallel job to request multiple data items that are noncontiguous within a file. Typically, a parallel job partitions file data among the processes.
One method of partitioning a file is to derive the offset at which to access data from the rank of the process. The rich set of MPI derived types also allows us to easily partition file data. For example, we could create an MPI vector type as the filetype passed into MPI_File_set_view. Since vector types do not end with a hole, a call must be made, either to MPI_Type_create_resized or to MPI_Type_ub, to complete the partition. This call extends the extent to include holes at the end of the type for processes with higher ranks. We create a partitioned file by passing different displacements to MPI_File_set_view. Each of these displacements would be derived from the process' rank. Consequently, offsets would not need to be derived from the ranks because only the data in that process' portion of the partition would be visible in that process' view.
In the following example, we use the first method where we derive the file offsets directly from the process' rank. Each process writes and reads NUM_INTS integers starting at the offset rank * NUM_INTS. We pass an explicit offset to our MPI I/O data-access routines MPI_File_write_at and MPI_File_read_at. We call MPI_Get_elements to find out how many elements were written or read. To verify that the write was successful, we compare the data written and read as well as set up an MPI_Barrier before calling MPI_File_get_size to verify that the file is the size that we expect upon completion of all the processes' writes.
Observe that we called MPI_File_set_view to set our view of the file as essentially an array of integers instead of the UNIX-like view of the file as an array of bytes. Thus, the offsets that we pass to MPI_File_write_at and MPI_File_read_at are indices into an array of integers and not a byte offset.
/* wr_at.c * * Example to demonstrate use of MPI_File_write_at and MPI_File_read_at * */ #include <stdio.h> #include "mpi.h" #define NUM_INTS 100 void sample_error(int error, char *string) { fprintf(stderr, "Error %d in %s\n", error, string); MPI_Finalize(); exit(-1); } void main( int argc, char **argv ) { char filename[128]; int i, rank, comm_size; int *buff1, *buff2; MPI_File fh; MPI_Offset disp, offset, file_size; MPI_Datatype etype, ftype, buftype; MPI_Info info; MPI_Status status; int result, count, differs; if(argc < 2) { fprintf(stdout, "Missing argument: filename\n"); exit(-1); } strcpy(filename, argv[1]); MPI_Init(&argc, &argv); /* get this processor's rank */ result = MPI_Comm_rank(MPI_COMM_WORLD, &rank); if(result != MPI_SUCCESS) sample_error(result, "MPI_Comm_rank"); result = MPI_Comm_size(MPI_COMM_WORLD, &comm_size); if(result != MPI_SUCCESS) sample_error(result, "MPI_Comm_size"); /* communicator group MPI_COMM_WORLD opens file "foo" for reading and writing (and creating, if necessary) */ result = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDWR | MPI_MODE_CREATE, (int)NULL, &fh); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_open"); /* Set the file view which tiles the file type MPI_INT, starting at displacement 0. In this example, the etype is also MPI_INT. */ disp = 0; etype = MPI_INT; ftype = MPI_INT; info = (MPI_Info)NULL; result = MPI_File_set_view(fh, disp, etype, ftype, (char *)NULL, info); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_set_view"); /* Allocate and initialize a buffer (buff1) containing NUM_INTS integers, where the integer in location i is set to i. */ buff1 = (int *)malloc(NUM_INTS*sizeof(int)); for(i=0;i<NUM_INTS;i++) buff1[i] = i; /* Set the buffer type to also be MPI_INT, then write the buffer (buff1) starting at offset 0, i.e., the first etype in the file. */ buftype = MPI_INT; offset = rank * NUM_INTS; result = MPI_File_write_at(fh, offset, buff1, NUM_INTS, buftype, &status); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_write_at"); result = MPI_Get_elements(&status, MPI_BYTE, &count); if(result != MPI_SUCCESS) sample_error(result, "MPI_Get_elements"); if(count != NUM_INTS*sizeof(int)) fprintf(stderr, "Did not write the same number of bytes as requested\n"); else fprintf(stdout, "Wrote %d bytes\n", count); /* Allocate another buffer (buff2) to read into, then read NUM_INTS integers into this buffer. */ buff2 = (int *)malloc(NUM_INTS*sizeof(int)); result = MPI_File_read_at(fh, offset, buff2, NUM_INTS, buftype, &status); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_read_at"); /* Find out how many bytes were read and compare to how many we expected */ result = MPI_Get_elements(&status, MPI_BYTE, &count); if(result != MPI_SUCCESS) sample_error(result, "MPI_Get_elements"); if(count != NUM_INTS*sizeof(int)) fprintf(stderr, "Did not read the same number of bytes as requested\n"); else fprintf(stdout, "Read %d bytes\n", count); /* Check to see that each integer read from each location is the same as the integer written to that location. */ differs = 0; for(i=0; i<NUM_INTS; i++) { if(buff1[i] != buff2[i]) { fprintf(stderr, "Integer number %d differs\n", i); differs = 1; } } if(!differs) fprintf(stdout, "Wrote and read the same data\n"); MPI_Barrier(MPI_COMM_WORLD); result = MPI_File_get_size(fh, &file_size); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_get_size"); /* Compare the file size with what we expect */ /* To see a negative response, make the file preexist with a larger size than what is written by this program */ if(file_size != (comm_size * NUM_INTS * sizeof(int))) fprintf(stderr, "File size is not equal to the write size\n"); result = MPI_File_close(&fh); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_close"); MPI_Finalize(); free(buff1); free(buff2); } |
We can adapt our example above to support the I/O programming style that best suits our application. Essentially, there are three dimensions on which to choose an appropriate data access routine for your particular task: file pointer type, collective or noncollective, and blocking or nonblocking.
We need to choose which file pointer type to use: explicit, individual, or shared. In the example above, we used an explicit pointer and passed it directly as the offset parameter to the MPI_File_write_at and MPI_File_read_at routines. Using an explicit pointer is equivalent to calling MPI_File_seek to set the individual file pointer to offset, then calling MPI_File_write or MPI_File_read, which is directly analogous to calling UNIX lseek() and write() or read(). If each process accesses the file sequentially, individual file pointers save you the effort to recalculate offset for each data access. We would use a shared file pointer in situations where all the processes need to cooperatively access a file in a sequential way, for example, writing log files.
Collective data-access routines allow the user to enforce some implicit coordination among the processes in a parallel job when making data accesses. For example, if a parallel job alternately reads in a matrix and performs computation on it, but cannot progress to the next stage of computation until all processes have completed the last stage, then a coordinated effort between processes when accessing data might be more efficient. In the example above, we could easily append the suffix _all to MPI_File_write_at and MPI_File_read_at to make the accesses collective. By coordinating the processes, we could achieve greater efficiency in the MPI library or at the file system level in buffering or caching the next matrix. In contrast, noncollective accesses are used when it is not evident that any benefit would be gained by coordinating disparate accesses by each process. UNIX file accesses are noncollective.
MPI I/O also supports nonblocking versions of each of the data-access routines, that is, the data-access routines that have the letter i before write or read in the routine name (i stands for immediate). By definition, nonblocking I/O routines return immediately after the I/O request has been issued and does not wait until the I/O request has been completed. This functionality allows the user to perform computation and communication at the same time as the I/O. Since large I/O requests can take a long time to complete, this provides a way to more efficiently utilize your programs waiting time.
As in our example above, parallel jobs often partition large matrices stored in files. These parallel jobs may use many large matrices or matrices that are too large to fit into memory at once. Thus, each process may access the multiple and/or large matrices in stages. During each stage, a process reads in a chunk of data, then performs some computation on it (which may involve communicating with the other processes in the parallel job). While performing the computation and communication, the process could issue a nonblocking I/O read request for the next chunk of data. Similarly, once the computation on a particular chunk has completed, a nonblocking write request could be issued before performing computation and communication on the next chunk.
The following example code illustrates the use of a nonblocking data-access routine. Notice that, like nonblocking communication routines, the nonblocking I/O routines require a call to MPI_Wait to wait for the nonblocking request to complete or repeated calls to MPI_Test to determine when the nonblocking data access has completed. Once complete, the write or read buffer is available for use again by the program.
/* iwr_at.c * * Example to demonstrate use of MPI_File_iwrite_at and MPI_File_iread_at * */ #include <stdio.h> #include "mpi.h" #define NUM_BYTES 100 void sample_error(int error, char *string) { fprintf(stderr, "Error %d in %s\n", error, string); MPI_Finalize(); exit(-1); } void main( int argc, char **argv ) { char filename[128]; char *buff; MPI_File fh; MPI_Offset offset; MPI_Request request; MPI_Status status; int i, rank, flag, result; if(argc < 2) { fprintf(stdout, "Missing argument: filename\n"); exit(-1); } strcpy(filename, argv[1]); MPI_Init(&argc, &argv); result = MPI_Comm_rank(MPI_COMM_WORLD, &rank); if(result != MPI_SUCCESS) sample_error(result, "MPI_Comm_rank"); result = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDWR | MPI_MODE_CREATE, (MPI_Info)NULL, &fh); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_open"); buff = (char *)malloc(NUM_BYTES*sizeof(char)); for(i=0;i<NUM_BYTES;i++) buff[i] = i; offset = rank * NUM_BYTES; result = MPI_File_iread_at(fh, offset, buff, NUM_BYTES, MPI_BYTE, &request); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_iread_at"); /* Perform some useful computation and/or communication */ result = MPI_Wait(&request, &status); buff = (char *)malloc(NUM_BYTES*sizeof(char)); for(i=0;i<NUM_BYTES;i++) buff[i] = i; result = MPI_File_iwrite_at(fh, offset, buff, NUM_BYTES, MPI_BYTE, &request); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_iwrite_at"); /* Perform some useful computation and/or communication */ flag = 0; i = 0; while(!flag) { result = MPI_Test(&request, &flag, &status); i++; /* Perform some more computation or communication, if possible */ } result = MPI_File_close(&fh); if(result != MPI_SUCCESS) sample_error(result, "MPI_File_close"); MPI_Finalize(); fprintf(stdout, "Successful completion\n"); free(buff); } |