From 1e95d8b1e25ac42e2fc9bfeac2259b2873763b5c Mon Sep 17 00:00:00 2001
From: Vishwanath Venkatesan <vvenkates@gmail.com>
Date: Fri, 13 Jan 2012 17:21:51 +0000
Subject: [PATCH] replace the MPI functions used in these files with the
 corresponding OMPI-internal functionality, and add error checking to
 functions that did not have it

This commit was SVN r25723.
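
The patch applies one pattern throughout: each direct MPI API call is
replaced by the equivalent OMPI-internal entry point (the communicator's
coll module for collectives, MCA_PML_CALL() for point-to-point), the
return code is captured in ret, and a failure either returns or jumps to
the function's cleanup label; malloc results are likewise checked. Below
is a minimal sketch of that pattern, not code lifted verbatim from the
patch; the variable names (start, starts, sbuf, dest, tag, req) are
placeholders.

    /* before: raw MPI call, return value ignored
       MPI_Allgather(&start, 1, MPI_LONG, starts, 1, MPI_LONG, fh->f_comm); */

    /* after: go through the communicator's coll module and check the result */
    ret = fh->f_comm->c_coll.coll_allgather (&start, 1, MPI_LONG,
                                             starts, 1, MPI_LONG,
                                             fh->f_comm,
                                             fh->f_comm->c_coll.coll_allgather_module);
    if (OMPI_SUCCESS != ret) {
        goto exit;   /* cleanup label frees any temporaries and returns ret */
    }

    /* point-to-point calls are issued directly through the PML */
    ret = MCA_PML_CALL(isend(sbuf, count, MPI_BYTE, dest, tag,
                             MCA_PML_BASE_SEND_STANDARD,
                             fh->f_comm, &req));
    if (OMPI_SUCCESS != ret) {
        return ret;
    }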
---
 .../fcoll_two_phase_file_write_all.c          | 782 ++++++++++++------
 ompi/mca/io/ompio/io_ompio.c                  |  75 +-
 2 files changed, 585 insertions(+), 272 deletions(-)

diff --git a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c
index fabb358899..53af23659c 100644
--- a/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c
+++ b/ompi/mca/fcoll/two_phase/fcoll_two_phase_file_write_all.c
@@ -67,7 +67,7 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
 
 
 
-static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
+static int  two_phase_exchage_data(mca_io_ompio_file_t *fh,
 				   void *buf,
 				   struct iovec *offset_length,
 				   int *send_size, int *start_pos,
@@ -88,24 +88,24 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
 				   int striping_unit, int *aggregator_list);
 				   
 
-static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
-				       void *buf,
-				       Flatlist_node *flat_buf,
-				       char **send_buf,
-				       struct iovec *offset_length,
-				       int *send_size,
-				       MPI_Request *send_req,
-				       int *sent_to_proc,
-				       int contig_access_count, 
-				       OMPI_MPI_OFFSET_TYPE min_st_offset,
-				       OMPI_MPI_OFFSET_TYPE fd_size,
-				       OMPI_MPI_OFFSET_TYPE *fd_start,
-				       OMPI_MPI_OFFSET_TYPE *fd_end,
-				       int *send_buf_idx,
-				       int *curr_to_proc, 
-				       int *done_to_proc,
-				       int iter, MPI_Aint buftype_extent,
-				       int striping_unit, int *aggregator_list);
+static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
+				      void *buf,
+				      Flatlist_node *flat_buf,
+				      char **send_buf,
+				      struct iovec *offset_length,
+				      int *send_size,
+				      MPI_Request *send_req,
+				      int *sent_to_proc,
+				      int contig_access_count, 
+				      OMPI_MPI_OFFSET_TYPE min_st_offset,
+				      OMPI_MPI_OFFSET_TYPE fd_size,
+				      OMPI_MPI_OFFSET_TYPE *fd_start,
+				      OMPI_MPI_OFFSET_TYPE *fd_end,
+				      int *send_buf_idx,
+				      int *curr_to_proc, 
+				      int *done_to_proc,
+				      int iter, MPI_Aint buftype_extent,
+				      int striping_unit, int *aggregator_list);
 
 
 void two_phase_heap_merge(mca_io_ompio_access_array_t *others_req,
@@ -133,18 +133,16 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
 
     int i, j,interleave_count=0, striping_unit=0;
     size_t max_data = 0, total_bytes = 0; 
-    int domain_size=0, *count_my_req_per_proc, count_my_req_procs;
-    int count_other_req_procs, *buf_indices;
-    /*    uint32_t iov_count = 0;
-	  struct iovec *decoded_iov = NULL; */
+    int domain_size=0, *count_my_req_per_proc=NULL, count_my_req_procs;
+    int count_other_req_procs, *buf_indices, ret=OMPI_SUCCESS;
     int local_count = 0, local_size=0,*aggregator_list = NULL;
     struct iovec *iov = NULL;
 
     OMPI_MPI_OFFSET_TYPE start_offset, end_offset, fd_size;
-    OMPI_MPI_OFFSET_TYPE *start_offsets, *end_offsets;
-    OMPI_MPI_OFFSET_TYPE *fd_start, *fd_end, min_st_offset;
+    OMPI_MPI_OFFSET_TYPE *start_offsets=NULL, *end_offsets=NULL;
+    OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset;
     Flatlist_node *flat_buf=NULL;
-    mca_io_ompio_access_array_t *my_req, *others_req;
+    mca_io_ompio_access_array_t *my_req=NULL, *others_req=NULL;
 
 
     if (opal_datatype_is_contiguous_memory_layout(&datatype->super,1)) {
@@ -157,9 +155,13 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
     
     
     if(-1 == mca_fcoll_two_phase_num_io_procs){
-      ompi_io_ompio_set_aggregator_props (fh, 
-					  mca_fcoll_two_phase_num_io_procs,
-					  max_data);
+      ret = ompi_io_ompio_set_aggregator_props (fh, 
+						mca_fcoll_two_phase_num_io_procs,
+						max_data);
+      if ( OMPI_SUCCESS != ret){
+	return  ret;
+      }
+      
       mca_fcoll_two_phase_num_io_procs = 
 	ceil((float)fh->f_size/fh->f_procs_per_group);
       
@@ -172,41 +174,57 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
 #if DEBUG_ON
     printf("Number of aggregators : %ld\n", mca_fcoll_two_phase_num_io_procs);
 #endif
+
     aggregator_list = (int *) malloc (mca_fcoll_two_phase_num_io_procs *
 				      sizeof(int));
     
+    if ( NULL == aggregator_list ) {
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
     
     for (i =0; i< mca_fcoll_two_phase_num_io_procs; i++){
       aggregator_list[i] = i;
     }
     
     
-    ompi_io_ompio_generate_current_file_view (fh, 
-                                              max_data, 
-                                              &iov, 
-                                              &local_count);
+    ret = ompi_io_ompio_generate_current_file_view (fh, 
+						    max_data, 
+						    &iov, 
+						    &local_count);
     
        
+    if ( OMPI_SUCCESS != ret ){
+      goto exit;
+    }
+
+
+    ret = fh->f_comm->c_coll.coll_allreduce (&max_data,
+					     &total_bytes,
+					     1,
+					     MPI_DOUBLE,
+					     MPI_SUM,
+					     fh->f_comm,
+					     fh->f_comm->c_coll.coll_allreduce_module);
+    
+    if ( OMPI_SUCCESS != ret ) {
+      goto exit;
+    }
     
 
-
-    fh->f_comm->c_coll.coll_allreduce (&max_data,
-                                       &total_bytes,
-                                       1,
-                                       MPI_DOUBLE,
-                                       MPI_SUM,
-                                       fh->f_comm,
-                                       fh->f_comm->c_coll.coll_allreduce_module);
-
-
-     if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
-
+    
+    if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
+      
       /* This datastructre translates between OMPIO->ROMIO its a little hacky!*/
       /* But helps to re-use romio's code for handling non-contiguous file-type*/
-       flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node));
-       flat_buf->type = datatype;
-       flat_buf->next = NULL;
-       flat_buf->count = 0;
+      flat_buf = (Flatlist_node *)malloc(sizeof(Flatlist_node));
+      if ( NULL == flat_buf ){
+	ret = OMPI_ERR_OUT_OF_RESOURCE;
+	goto exit;
+      }
+
+      flat_buf->type = datatype;
+      flat_buf->next = NULL;
+      flat_buf->count = 0;
        
        if(iov[0].iov_base == 0 ||
 	  (OMPI_MPI_OFFSET_TYPE)iov[local_count-1].iov_base +
@@ -218,9 +236,22 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
        flat_buf->indices = 
 	 (OMPI_MPI_OFFSET_TYPE *)malloc(local_size * 
 					sizeof(OMPI_MPI_OFFSET_TYPE));
+
+       if ( NULL == flat_buf->indices ){
+	 ret = OMPI_ERR_OUT_OF_RESOURCE;
+	 goto exit;
+
+       }
+
        flat_buf->blocklens = 
 	(OMPI_MPI_OFFSET_TYPE *)malloc(local_size * 
-				     sizeof(OMPI_MPI_OFFSET_TYPE));
+				       sizeof(OMPI_MPI_OFFSET_TYPE));
+       
+       if ( NULL == flat_buf->blocklens ){
+	 ret = OMPI_ERR_OUT_OF_RESOURCE;
+	 goto exit;
+       }
+       
        flat_buf->count = local_size;
        i=0;j=0;
        while(j < local_size){
@@ -254,11 +285,6 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
        }
 #endif
      }
-     
-     
-
-
-
 
 #if DEBUG_ON
     printf("%d: fcoll:two_phase:write_all->total_bytes:%ld, local_count: %ld\n",
@@ -287,13 +313,54 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
 
     start_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
 	(fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
+ 
+    if ( NULL == start_offsets ){
+      ret = OMPI_ERR_OUT_OF_RESOURCE;
+      goto exit; 
+    }
+
     end_offsets = (OMPI_MPI_OFFSET_TYPE *)malloc
 	(fh->f_size*sizeof(OMPI_MPI_OFFSET_TYPE));
     
-    MPI_Allgather(&start_offset, 1,MPI_OFFSET, start_offsets, 1,
-		  MPI_OFFSET, fh->f_comm);
-    MPI_Allgather(&end_offset, 1, MPI_OFFSET, end_offsets, 1,
-		  MPI_OFFSET, fh->f_comm);
+    if ( NULL == end_offsets ){
+      ret =  OMPI_ERR_OUT_OF_RESOURCE;
+      goto exit;
+    }
+
+
+    ret = fh->f_comm->c_coll.coll_allgather(&start_offset,
+					    1,
+					    MPI_LONG,
+					    start_offsets,
+					    1,
+					    MPI_LONG,
+					    fh->f_comm,
+					    fh->f_comm->c_coll.coll_allgather_module);
+
+    if ( OMPI_SUCCESS != ret ){
+      goto exit;
+    }
+
+
+    ret = fh->f_comm->c_coll.coll_allgather(&end_offset,
+					    1,
+					    MPI_LONG,
+					    end_offsets,
+					    1,
+					    MPI_LONG,
+					    fh->f_comm,
+					    fh->f_comm->c_coll.coll_allgather_module);
+
+
+    if ( OMPI_SUCCESS != ret ){
+      goto exit;
+    }
+				      
+    
+    /*MPI_Allgather(&start_offset, 1,MPI_OFFSET, start_offsets, 1,
+      MPI_OFFSET, fh->f_comm);
+      MPI_Allgather(&end_offset, 1, MPI_OFFSET, end_offsets, 1,
+      MPI_OFFSET, fh->f_comm);*/
 
     
 #if DEBUG_ON
@@ -307,10 +374,12 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
 
 
 
-    for (i=1; i<fh->f_size; i++)
-	if ((start_offsets[i] < end_offsets[i-1]) && 
-	    (start_offsets[i] <= end_offsets[i]))
-	    interleave_count++;
+    for (i=1; i<fh->f_size; i++){
+      if ((start_offsets[i] < end_offsets[i-1]) && 
+	  (start_offsets[i] <= end_offsets[i])){
+	interleave_count++;
+      }
+    }
 
 #if DEBUG_ON
     	printf("%d: fcoll:two_phase:write_all:interleave_count:%d\n",
@@ -318,16 +387,19 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
 #endif 
 	
 	
-	ompi_io_ompio_domain_partition(fh,
-				       start_offsets,
-				       end_offsets,
-				       &min_st_offset,
-				       &fd_start,
-				       &fd_end,
-				       domain_size, 
-				       &fd_size,
-				       striping_unit,
-				       mca_fcoll_two_phase_num_io_procs);
+	ret = ompi_io_ompio_domain_partition(fh,
+					     start_offsets,
+					     end_offsets,
+					     &min_st_offset,
+					     &fd_start,
+					     &fd_end,
+					     domain_size, 
+					     &fd_size,
+					     striping_unit,
+					     mca_fcoll_two_phase_num_io_procs);
+	if ( OMPI_SUCCESS != ret ){
+	  goto exit;
+	}
 
 
 #if  DEBUG_ON
@@ -338,58 +410,89 @@ mca_fcoll_two_phase_file_write_all (mca_io_ompio_file_t *fh,
 #endif
 
 
-	ompi_io_ompio_calc_my_requests (fh,
-					iov,
-					local_count,
-					min_st_offset,
-					fd_start,
-					fd_end,
-					fd_size,
-					&count_my_req_procs,
-					&count_my_req_per_proc,
-					&my_req,
-					&buf_indices,
-					striping_unit,
-					mca_fcoll_two_phase_num_io_procs,
-					aggregator_list);
-	
-	
-
-
-
-	
-	ompi_io_ompio_calc_others_requests(fh,
-					   count_my_req_procs,
-					   count_my_req_per_proc,
-					   my_req,
-					   &count_other_req_procs,
-					   &others_req);
-	
-
-	#if DEBUG_ON
-	printf("count_other_req_procs : %d\n", count_other_req_procs);
-	#endif
-
-	if(!(OMPI_SUCCESS == two_phase_exch_and_write(fh,
-						      buf,
-						      datatype,
-						      others_req,
-						      iov,
-						      local_count,
-						      min_st_offset,
-						      fd_size,
-						      fd_start,
-						      fd_end,
-						      flat_buf,
-						      buf_indices,
-						      striping_unit,
-						      aggregator_list))){
-	    perror("Error in exch and write\n");
-	    return OMPI_ERROR;
+	ret = ompi_io_ompio_calc_my_requests (fh,
+					      iov,
+					      local_count,
+					      min_st_offset,
+					      fd_start,
+					      fd_end,
+					      fd_size,
+					      &count_my_req_procs,
+					      &count_my_req_per_proc,
+					      &my_req,
+					      &buf_indices,
+					      striping_unit,
+					      mca_fcoll_two_phase_num_io_procs,
+					      aggregator_list);
+	if ( OMPI_SUCCESS != ret ){
+	  goto exit;
 	}
 	
 	
-	return OMPI_SUCCESS;
+
+	ret = ompi_io_ompio_calc_others_requests(fh,
+						 count_my_req_procs,
+						 count_my_req_per_proc,
+						 my_req,
+						 &count_other_req_procs,
+						 &others_req);
+	if (OMPI_SUCCESS != ret ){
+	  goto exit;
+	}
+	
+	
+#if DEBUG_ON
+	printf("count_other_req_procs : %d\n", count_other_req_procs);
+#endif
+	
+	ret = two_phase_exch_and_write(fh,
+				       buf,
+				       datatype,
+				       others_req,
+				       iov,
+				       local_count,
+				       min_st_offset,
+				       fd_size,
+				       fd_start,
+				       fd_end,
+				       flat_buf,
+				       buf_indices,
+				       striping_unit,
+				       aggregator_list);
+
+	if (OMPI_SUCCESS != ret){
+	  goto exit;
+	}
+	
+
+
+ exit : 
+	if (flat_buf != NULL) {
+	  if (flat_buf->blocklens != NULL) {
+	    free (flat_buf->blocklens);
+	  }
+	  
+	  if (flat_buf->indices != NULL) {
+	    free (flat_buf->indices);
+	  }
+
+	  free (flat_buf);  /* free the structure only after its members */
+	}
+
+
+
+	if (start_offsets != NULL) {
+	  free(start_offsets);
+	}
+	
+	if (end_offsets != NULL){
+	  free(end_offsets);
+	}
+	if (aggregator_list != NULL){
+	  free(aggregator_list);
+	}
+
+	return ret;
 }
 
 
@@ -411,14 +514,15 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
 
   
     int i, j, ntimes, max_ntimes, m;
-    int *curr_offlen_ptr, *count, *send_size, *recv_size;
-    int *partial_recv, *start_pos, req_len, flag;
-    int *sent_to_proc;
-    int *send_buf_idx, *curr_to_proc, *done_to_proc;
+    int *curr_offlen_ptr=NULL, *count=NULL, *send_size=NULL, *recv_size=NULL;
+    int *partial_recv=NULL, *start_pos=NULL, req_len, flag;
+    int *sent_to_proc=NULL, ret = OMPI_SUCCESS;
+    int *send_buf_idx=NULL, *curr_to_proc=NULL, *done_to_proc=NULL;
     OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off, done;
     OMPI_MPI_OFFSET_TYPE size=0, req_off, len;
     MPI_Aint buftype_extent;
     int byte_size;
+
     #if DEBUG_ON
     int ii,jj;
     #endif
@@ -458,22 +562,74 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
 				       fh->f_comm,
 				       fh->f_comm->c_coll.coll_allreduce_module);
 
-    if (ntimes) write_buf = (char *) malloc (mca_fcoll_two_phase_cycle_buffer_size);
+    if (ntimes){
+      write_buf = (char *) malloc (mca_fcoll_two_phase_cycle_buffer_size);
+      if ( NULL == write_buf ){
+	return OMPI_ERR_OUT_OF_RESOURCE;
+      }
+    }
+
     curr_offlen_ptr = (int *) calloc(fh->f_size, sizeof(int)); 
+    
+    if ( NULL == curr_offlen_ptr ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+    
     count = (int *) malloc(fh->f_size*sizeof(int));
+    
+    if ( NULL == count ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+   
     partial_recv = (int *)calloc(fh->f_size, sizeof(int));
 
+    if ( NULL == partial_recv ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     send_size = (int *) calloc(fh->f_size,sizeof(int));
+
+    if ( NULL == send_size ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     recv_size = (int *) calloc(fh->f_size,sizeof(int));
 
+    if ( NULL == recv_size ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
 
     send_buf_idx = (int *) malloc(fh->f_size*sizeof(int));
+
+    if ( NULL == send_buf_idx ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+    
     sent_to_proc = (int *) calloc(fh->f_size, sizeof(int));
+    
+    if ( NULL == sent_to_proc){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     curr_to_proc = (int *) malloc(fh->f_size*sizeof(int));
+    
+    if ( NULL == curr_to_proc ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     done_to_proc = (int *) malloc(fh->f_size*sizeof(int));
+    
+    if ( NULL == done_to_proc ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
 
     start_pos = (int *) malloc(fh->f_size*sizeof(int));
     
+    if ( NULL == start_pos ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+ 
+  
     done = 0;
     off = st_loc;
     
@@ -533,18 +689,21 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
 	    }
 	}	
 
-	two_phase_exchage_data(fh, buf, offset_len,send_size,
-			       start_pos,recv_size,off,size,
-			       count, partial_recv, sent_to_proc,
-			       contig_access_count,
-			       min_st_offset,
-			       fd_size, fd_start,
-			       fd_end, flat_buf, others_req,
-			       send_buf_idx, curr_to_proc,
-			       done_to_proc, m, buf_idx, buftype_extent,
-			       striping_unit, aggregator_list);
-
-
+	ret = two_phase_exchage_data(fh, buf, offset_len,send_size,
+				     start_pos,recv_size,off,size,
+				     count, partial_recv, sent_to_proc,
+				     contig_access_count,
+				     min_st_offset,
+				     fd_size, fd_start,
+				     fd_end, flat_buf, others_req,
+				     send_buf_idx, curr_to_proc,
+				     done_to_proc, m, buf_idx, buftype_extent,
+				     striping_unit, aggregator_list);
+	
+	if ( OMPI_SUCCESS != ret ){
+	  goto exit;
+	}
+	
 	
 	
 	flag = 0;
@@ -604,77 +763,124 @@ static int two_phase_exch_and_write(mca_io_ompio_file_t *fh,
     }
     for (i=0; i<fh->f_size; i++) count[i] = recv_size[i] = 0;
     for (m=ntimes; m<max_ntimes; m++) {
-	two_phase_exchage_data(fh, buf, offset_len,send_size,
-			       start_pos,recv_size,off,size,
-			       count, partial_recv, sent_to_proc,
-			       contig_access_count,
-			       min_st_offset,
-			       fd_size, fd_start,
-			       fd_end, flat_buf,others_req,
-			       send_buf_idx, curr_to_proc,
-			       done_to_proc, m, buf_idx, buftype_extent,
-			       striping_unit, aggregator_list);
+	ret = two_phase_exchage_data(fh, buf, offset_len,send_size,
+				     start_pos,recv_size,off,size,
+				     count, partial_recv, sent_to_proc,
+				     contig_access_count,
+				     min_st_offset,
+				     fd_size, fd_start,
+				     fd_end, flat_buf,others_req,
+				     send_buf_idx, curr_to_proc,
+				     done_to_proc, m, buf_idx, buftype_extent,
+				     striping_unit, aggregator_list);
+	if ( OMPI_SUCCESS != ret ){
+	  goto exit;
+	}
     }
     
-    if (ntimes) free(write_buf);
-    free(curr_offlen_ptr);
-    free(count);
-    free(partial_recv);
-    free(send_size);
-    free(recv_size);
-    free(sent_to_proc);
-    free(start_pos);
-    free(send_buf_idx);
-    free(curr_to_proc);
-    free(done_to_proc);
+ exit:    
+    
+    if (ntimes){
+      if ( NULL != write_buf ){
+	free(write_buf);
+      }
+    }
+    if ( NULL != curr_offlen_ptr ){
+      free(curr_offlen_ptr);
+    }
+    if ( NULL != count ){ 
+      free(count);
+    }
+    if ( NULL != partial_recv ){
+      free(partial_recv);
+    }
+    if ( NULL != send_size ){
+      free(send_size);
+    }
+    if ( NULL != recv_size ){
+      free(recv_size);
+    }
+    if ( NULL != sent_to_proc ){
+      free(sent_to_proc);
+    }
+    if ( NULL != start_pos ){
+      free(start_pos);
+    }
+    if ( NULL != send_buf_idx ){
+      free(send_buf_idx);
+    }
+    if ( NULL != curr_to_proc ){
+      free(curr_to_proc);
+    }
+    if ( NULL != done_to_proc ){
+      free(done_to_proc);
+    }
 
-    return OMPI_SUCCESS;
+    return ret;
 }
 
-static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
-				   void *buf,
-				   struct iovec *offset_length,
-				   int *send_size,int *start_pos,
-				   int *recv_size,
-				   OMPI_MPI_OFFSET_TYPE off,
-				   OMPI_MPI_OFFSET_TYPE size, int *count,
-				   int *partial_recv, int *sent_to_proc,
-				   int contig_access_count,
-				   OMPI_MPI_OFFSET_TYPE min_st_offset,
-				   OMPI_MPI_OFFSET_TYPE fd_size,
-				   OMPI_MPI_OFFSET_TYPE *fd_start,
-				   OMPI_MPI_OFFSET_TYPE *fd_end,
-				   Flatlist_node *flat_buf,
-				   mca_io_ompio_access_array_t *others_req,
-				   int *send_buf_idx, int *curr_to_proc,
-				   int *done_to_proc, int iter,
-				   int *buf_idx,MPI_Aint buftype_extent,
-				   int striping_unit, int *aggregator_list){
-    
-    int *tmp_len, sum, *srt_len,nprocs_recv, nprocs_send,  k,i,j;
-    MPI_Status *statuses;
-    MPI_Request *requests, *send_req;
-    MPI_Datatype *recv_types;
-    OMPI_MPI_OFFSET_TYPE *srt_off;
+static int two_phase_exchage_data(mca_io_ompio_file_t *fh,
+				  void *buf,
+				  struct iovec *offset_length,
+				  int *send_size,int *start_pos,
+				  int *recv_size,
+				  OMPI_MPI_OFFSET_TYPE off,
+				  OMPI_MPI_OFFSET_TYPE size, int *count,
+				  int *partial_recv, int *sent_to_proc,
+				  int contig_access_count,
+				  OMPI_MPI_OFFSET_TYPE min_st_offset,
+				  OMPI_MPI_OFFSET_TYPE fd_size,
+				  OMPI_MPI_OFFSET_TYPE *fd_start,
+				  OMPI_MPI_OFFSET_TYPE *fd_end,
+				  Flatlist_node *flat_buf,
+				  mca_io_ompio_access_array_t *others_req,
+				  int *send_buf_idx, int *curr_to_proc,
+				  int *done_to_proc, int iter,
+				  int *buf_idx,MPI_Aint buftype_extent,
+				  int striping_unit, int *aggregator_list){
+  
+    int *tmp_len=NULL, sum, *srt_len=NULL, nprocs_recv, nprocs_send,  k,i,j;
+    int ret=OMPI_SUCCESS;
+    MPI_Request *requests=NULL, *send_req=NULL;
+    MPI_Datatype *recv_types=NULL;
+    OMPI_MPI_OFFSET_TYPE *srt_off=NULL;
     char **send_buf = NULL; 
     
     
-    fh->f_comm->c_coll.coll_alltoall (recv_size,
-				      1,
-				      MPI_INT,
-				      send_size,
-				      1,
-				      MPI_INT,
-				      fh->f_comm,
-				      fh->f_comm->c_coll.coll_alltoall_module);
+    ret = fh->f_comm->c_coll.coll_alltoall (recv_size,
+					    1,
+					    MPI_INT,
+					    send_size,
+					    1,
+					    MPI_INT,
+					    fh->f_comm,
+					    fh->f_comm->c_coll.coll_alltoall_module);
+    
+    if ( OMPI_SUCCESS != ret ){
+      return ret;
+    }
 
     nprocs_recv = 0;
-    for (i=0;i<fh->f_size;i++)
-	if (recv_size[i]) nprocs_recv++;
+    for (i=0;i<fh->f_size;i++){
+      if (recv_size[i]){
+	nprocs_recv++;
+      }
+    }
+    
+    
     recv_types = (MPI_Datatype *)
 	malloc (( nprocs_recv + 1 ) * sizeof(MPI_Datatype *));
     
+    if ( NULL == recv_types ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     tmp_len = (int *) malloc(fh->f_size*sizeof(int));
+    
+    if ( NULL == tmp_len ) {
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     j = 0;
     for (i=0;i<fh->f_size;i++){
 	if (recv_size[i]) {
@@ -695,7 +901,16 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
     sum = 0;
     for (i=0;i<fh->f_size;i++) sum += count[i];
     srt_off = (OMPI_MPI_OFFSET_TYPE *) malloc((sum+1)*sizeof(OMPI_MPI_OFFSET_TYPE));
+    
+    if ( NULL == srt_off ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+    
     srt_len = (int *) malloc((sum+1)*sizeof(int));
+    
+    if ( NULL == srt_len ) {
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
 
 
     two_phase_heap_merge(others_req, count, srt_off, srt_len, start_pos, fh->f_size,fh->f_rank,  nprocs_recv, sum);
@@ -707,9 +922,15 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
             others_req[i].lens[k] = tmp_len[i];
         }
     
-    free(tmp_len); 
-    free(srt_off);
-    free(srt_len);
+    if ( NULL != tmp_len ){
+      free(tmp_len); 
+    }
+    if ( NULL != srt_off ){
+      free(srt_off);
+    }
+    if ( NULL != srt_len ){
+      free(srt_len);
+    }
     
     nprocs_send = 0;
     for (i=0; i <fh->f_size; i++) if (send_size[i]) nprocs_send++;
@@ -721,13 +942,28 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
     requests = (MPI_Request *) 	
 	malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request)); 
 
-
+    if ( NULL == requests ){
+      return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+    
     j = 0;
     for (i=0; i<fh->f_size; i++) {
 	if (recv_size[i]) {
+	  ret = MCA_PML_CALL(irecv(MPI_BOTTOM,
+				   1,
+				   recv_types[j],
+				   i,
+				   fh->f_rank+i+100*iter,
+				   fh->f_comm,
+				   requests+j));
+	  /*
 	    MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, fh->f_rank+i+100*iter,
-		      fh->f_comm, requests+j);
-	    j++;
+	    fh->f_comm, requests+j);
+	  */
+	  if ( OMPI_SUCCESS != ret ){
+	    return ret;
+	  }
+	  j++;
 	}
     }
     send_req = requests + nprocs_recv;
@@ -736,42 +972,69 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
     if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
 	j = 0;
 	for (i=0; i <fh->f_size; i++) 
-	    if (send_size[i]) {
-		MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], 
+	  if (send_size[i]) {
+	    ret = MCA_PML_CALL(isend(((char *) buf) + buf_idx[i],
+				     send_size[i],
+				     MPI_BYTE,
+				     i,
+				     fh->f_rank+i+100*iter,
+				     MCA_PML_BASE_SEND_STANDARD, 
+				     fh->f_comm,
+				     send_req+j));	
+
+	    /*		MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], 
 			  MPI_BYTE, i,  fh->f_rank+i+100*iter, fh->f_comm, 
-			  send_req+j);
-		j++;
-		buf_idx[i] += send_size[i];
+			  send_req+j);*/
+	    if ( OMPI_SUCCESS != ret ){
+	      return ret;
 	    }
+	    
+	    j++;
+	    buf_idx[i] += send_size[i];
+	  }
     }
     else if(nprocs_send && (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY))){
-	send_buf = (char **) malloc(fh->f_size*sizeof(char*));
-	for (i=0; i < fh->f_size; i++) 
-	    if (send_size[i]) 
-		send_buf[i] = (char *) malloc(send_size[i]);
-
-	two_phase_fill_send_buffer(fh, buf,flat_buf, send_buf,
-				   offset_length, send_size,
-				   send_req,sent_to_proc,
-				   contig_access_count, 
-				   min_st_offset, fd_size,
-				   fd_start, fd_end, send_buf_idx,
-				   curr_to_proc, done_to_proc,
-				   iter, buftype_extent, striping_unit,
-				   aggregator_list);
-	
+      send_buf = (char **) malloc(fh->f_size*sizeof(char*));
+      if ( NULL == send_buf ){
+	return OMPI_ERR_OUT_OF_RESOURCE;
+      }
+      for (i=0; i < fh->f_size; i++){
+	if (send_size[i]) {
+	  send_buf[i] = (char *) malloc(send_size[i]);
+	  
+	  if ( NULL == send_buf[i] ){
+	    return OMPI_ERR_OUT_OF_RESOURCE;
+	  }
+	}
+      }
+      
+      ret = two_phase_fill_send_buffer(fh, buf,flat_buf, send_buf,
+				       offset_length, send_size,
+				       send_req,sent_to_proc,
+				       contig_access_count, 
+				       min_st_offset, fd_size,
+				       fd_start, fd_end, send_buf_idx,
+				       curr_to_proc, done_to_proc,
+				       iter, buftype_extent, striping_unit,
+				       aggregator_list);
+      
+      if ( OMPI_SUCCESS != ret ){
+	return ret;
+      }
     }
 
     for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
     free(recv_types);
-    statuses = (MPI_Status *) malloc((nprocs_send+nprocs_recv+1) *
-				     sizeof(MPI_Status)); 
-
-    MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
+    ret = ompi_request_wait_all (nprocs_send+nprocs_recv,
+				 requests,
+				 MPI_STATUS_IGNORE);
     
-    free(statuses);
-    free(requests);
 
+    if ( NULL != requests ){
+      free(requests);
+    }
+    
+    return ret;
 }
 
 
@@ -825,28 +1088,28 @@ static void two_phase_exchage_data(mca_io_ompio_file_t *fh,
 
 
 
-static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
-				       void *buf,
-				       Flatlist_node *flat_buf,
-				       char **send_buf,
-				       struct iovec *offset_length,
-				       int *send_size,
-				       MPI_Request *requests,
-				       int *sent_to_proc,
-				       int contig_access_count, 
-				       OMPI_MPI_OFFSET_TYPE min_st_offset,
-				       OMPI_MPI_OFFSET_TYPE fd_size,
-				       OMPI_MPI_OFFSET_TYPE *fd_start,
-				       OMPI_MPI_OFFSET_TYPE *fd_end,
-				       int *send_buf_idx,
-				       int *curr_to_proc, 
-				       int *done_to_proc,
-				       int iter, MPI_Aint buftype_extent,
-				       int striping_unit, int *aggregator_list){
+static int two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
+				      void *buf,
+				      Flatlist_node *flat_buf,
+				      char **send_buf,
+				      struct iovec *offset_length,
+				      int *send_size,
+				      MPI_Request *requests,
+				      int *sent_to_proc,
+				      int contig_access_count, 
+				      OMPI_MPI_OFFSET_TYPE min_st_offset,
+				      OMPI_MPI_OFFSET_TYPE fd_size,
+				      OMPI_MPI_OFFSET_TYPE *fd_start,
+				      OMPI_MPI_OFFSET_TYPE *fd_end,
+				      int *send_buf_idx,
+				      int *curr_to_proc, 
+				      int *done_to_proc,
+				      int iter, MPI_Aint buftype_extent,
+				      int striping_unit, int *aggregator_list){
 
     int i, p, flat_buf_idx;
     OMPI_MPI_OFFSET_TYPE flat_buf_sz, size_in_buf, buf_incr, size;
-    int jj, n_buftypes;
+    int jj, n_buftypes, ret=OMPI_SUCCESS;
     OMPI_MPI_OFFSET_TYPE off, len, rem_len, user_buf_idx;
 
     for (i=0; i < fh->f_size; i++) {
@@ -897,8 +1160,20 @@ static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
 			TWO_PHASE_BUF_COPY
 		    }
 		    if (send_buf_idx[p] == send_size[p]) {
-			MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, 
-				  fh->f_rank+p+100*iter, fh->f_comm, requests+jj);
+
+		      ret = MCA_PML_CALL(isend(send_buf[p],
+					       send_size[p],
+					       MPI_BYTE,
+					       p,
+					       fh->f_rank+p+100*iter,
+					       MCA_PML_BASE_SEND_STANDARD, 
+					       fh->f_comm,
+					       requests+jj));	
+		      
+		      if ( OMPI_SUCCESS != ret ){
+			return ret;
+		      }
+		      /*			MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,  fh->f_rank+p+100*iter, fh->f_comm, requests+jj);*/
 			jj++;
 		    }
 		}
@@ -916,8 +1191,13 @@ static void two_phase_fill_send_buffer(mca_io_ompio_file_t *fh,
 	    rem_len -= len;
 	}
     }
-    for (i=0; i < fh->f_size; i++) 
-	if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
+    for (i=0; i < fh->f_size; i++) {
+      if (send_size[i]){
+	sent_to_proc[i] = curr_to_proc[i];
+      }
+    }
+
+    return ret;
 }
     
 
diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c
index e9084e8d57..329f2eeba4 100644
--- a/ompi/mca/io/ompio/io_ompio.c
+++ b/ompi/mca/io/ompio/io_ompio.c
@@ -958,6 +958,7 @@ int ompi_io_ompio_set_aggregator_props (mca_io_ompio_file_t *fh,
 
 
 /*Based on ROMIO's domain partitioning implementaion 
+Series of function implementations for the two-phase implementation.
 Functions to support Domain partitioning and aggregator
 selection for two_phase .
 This is commom to both two_phase_read and write. */
@@ -977,7 +978,7 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
    
 
     
-    OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
+    OMPI_MPI_OFFSET_TYPE min_st_offset, max_end_offset, *fd_start=NULL, *fd_end=NULL, fd_size;
     int i;
 
 
@@ -1000,9 +1001,18 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
     
     *fd_st_ptr = (OMPI_MPI_OFFSET_TYPE *)
 	malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); 
+
+    if ( NULL == *fd_st_ptr ) {
+	return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     *fd_end_ptr = (OMPI_MPI_OFFSET_TYPE *)
 	malloc(nprocs_for_coll*sizeof(OMPI_MPI_OFFSET_TYPE)); 
 
+    if ( NULL == *fd_end_ptr ) {
+	return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     
     fd_start = *fd_st_ptr;
     fd_end = *fd_end_ptr;
@@ -1010,7 +1020,7 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
     
     if (striping_unit > 0){
 	
-	/*Wei-keng Liao's  implementation for field domain alignment to nearest lock boundary. */
+	/* Lock Boundary based domain partitioning */
 	int rem_front, rem_back;
 	OMPI_MPI_OFFSET_TYPE end_off;
 	
@@ -1025,7 +1035,7 @@ int ompi_io_ompio_domain_partition (mca_io_ompio_file_t *fh,
 		end_off += rem_back;
         fd_end[0] = end_off - 1;
     
-	      /* align fd_end[i] to the nearest file lock boundary */
+	/* align fd_end[i] to the nearest file lock boundary */
         for (i=1; i<nprocs_for_coll; i++) {
             fd_start[i] = fd_end[i-1] + 1;
             end_off     = min_st_offset + fd_size * (i+1);
@@ -1129,26 +1139,29 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
     mca_io_ompio_access_array_t *others_req=NULL;
     
     count_others_req_per_proc = (int *)malloc(fh->f_size*sizeof(int));
+
     if ( NULL == count_others_req_per_proc ) {
 	return OMPI_ERR_OUT_OF_RESOURCE;
     }
     
     /* Change it to the ompio specific alltoall in coll module : VVN*/
-    fh->f_comm->c_coll.coll_alltoall (
-	count_my_req_per_proc,
-	1,
-	MPI_INT,
-	count_others_req_per_proc, 
-	1,
-	MPI_INT,
-	fh->f_comm, 
-	fh->f_comm->c_coll.coll_alltoall_module);
+    ret =  fh->f_comm->c_coll.coll_alltoall (count_my_req_per_proc,
+					     1,
+					     MPI_INT,
+					     count_others_req_per_proc, 
+					     1,
+					     MPI_INT,
+					     fh->f_comm, 
+					     fh->f_comm->c_coll.coll_alltoall_module);
+    if ( OMPI_SUCCESS != ret ) {
+	return ret;
+    }
     
 #if 0
     for( i = 0; i< fh->f_size; i++){
 	printf("my: %d, others: %d\n",count_my_req_per_proc[i],
 	       count_others_req_per_proc[i]);
-
+	
     }
 #endif
 
@@ -1176,6 +1189,7 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
     requests = (MPI_Request *)
 	malloc(1+2*(count_my_req_procs+count_others_req_procs)*
 	       sizeof(MPI_Request)); 
+
     if ( NULL == requests ) {
 	ret = OMPI_ERR_OUT_OF_RESOURCE;
 	goto exit;
@@ -1186,7 +1200,7 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
 	if (others_req[i].count){
             ret = MCA_PML_CALL(irecv(others_req[i].offsets,
 				     others_req[i].count,
-				     MPI_OFFSET,
+				     MPI_LONG,
 				     i,
 				     i+fh->f_rank,
 				     fh->f_comm,
@@ -1217,7 +1231,7 @@ int ompi_io_ompio_calc_others_requests(mca_io_ompio_file_t *fh,
 	if (my_req[i].count) {
 	    ret = MCA_PML_CALL(isend(my_req[i].offsets,
 				     my_req[i].count,
-				     MPI_OFFSET,
+				     MPI_LONG,
 				     i,
 				     i+fh->f_rank,
 				     MCA_PML_BASE_SEND_STANDARD, 
@@ -1290,15 +1304,23 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
     
     
     *count_my_req_per_proc_ptr = (int*)malloc(fh->f_size*sizeof(int)); 
+    
+    if ( NULL == *count_my_req_per_proc_ptr ){
+	return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
     count_my_req_per_proc = *count_my_req_per_proc_ptr;
     
     for (i=0;i<fh->f_size;i++){
 	count_my_req_per_proc[i] = 0;
     }
-    
-    
-    
+        
     buf_idx = (int *) malloc (fh->f_size * sizeof(int));
+    
+    if ( NULL == buf_idx ){ 
+	return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+    
     for (i=0; i < fh->f_size; i++) buf_idx[i] = -1;
     
     for (i=0;i<contig_access_count; i++){
@@ -1308,8 +1330,7 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
 	off = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_base;
 	fd_len = (OMPI_MPI_OFFSET_TYPE)offset_len[i].iov_len;
 	proc = ompi_io_ompio_calc_aggregator(fh, off, min_st_offset, &fd_len, fd_size, 
-					     fd_start, fd_end, striping_unit, num_aggregators,
-					     aggregator_list);
+					     fd_start, fd_end, striping_unit, num_aggregators,aggregator_list);
 	count_my_req_per_proc[proc]++;
 	rem_len = offset_len[i].iov_len - fd_len;
 	
@@ -1329,6 +1350,9 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
 /*    printf("%d: fh->f_size : %d\n", fh->f_rank,fh->f_size);*/
     *my_req_ptr =  (mca_io_ompio_access_array_t *)
 	malloc (fh->f_size * sizeof(mca_io_ompio_access_array_t));
+    if ( NULL == *my_req_ptr ) {
+	return OMPI_ERR_OUT_OF_RESOURCE;
+    }
     my_req = *my_req_ptr;
     
     count_my_req_procs = 0;
@@ -1336,8 +1360,17 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
 	if(count_my_req_per_proc[i]) {
 	    my_req[i].offsets = (OMPI_MPI_OFFSET_TYPE *)
 		malloc(count_my_req_per_proc[i] * sizeof(OMPI_MPI_OFFSET_TYPE));
+	    
+	    if ( NULL == my_req[i].offsets ) {
+		return OMPI_ERR_OUT_OF_RESOURCE;
+	    }
+
 	    my_req[i].lens = (int *)
 		malloc(count_my_req_per_proc[i] * sizeof(int));
+
+	    if ( NULL == my_req[i].lens ) {
+		return OMPI_ERR_OUT_OF_RESOURCE;
+	    }
 	    count_my_req_procs++;
 	}
 	my_req[i].count = 0; 
@@ -1404,7 +1437,7 @@ int ompi_io_ompio_calc_my_requests (mca_io_ompio_file_t *fh,
     
     *count_my_req_procs_ptr = count_my_req_procs;
     *buf_indices = buf_idx;
-
+    
     return OMPI_SUCCESS;
 }
 /*Two-phase support functions ends here!*/