From ff7e670c6a063bf761fceac016264466fc2c0e73 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Wed, 20 Sep 2006 07:51:30 +0000 Subject: [PATCH] Sends and receives are now fully implemented. Correctly handle the reference counts on groups. Correct the rearrangement of the communicators. Other minors improvements. This commit was SVN r11722. --- ompi/debuggers/mpi_interface.h | 10 +- ompi/debuggers/ompi_dll.c | 167 +++++++++++++++++++++------------ ompi/debuggers/ompi_dll_defs.h | 11 +++ 3 files changed, 122 insertions(+), 66 deletions(-) diff --git a/ompi/debuggers/mpi_interface.h b/ompi/debuggers/mpi_interface.h index cbd70e1849..a91d267a5f 100644 --- a/ompi/debuggers/mpi_interface.h +++ b/ompi/debuggers/mpi_interface.h @@ -188,11 +188,11 @@ typedef long mqs_tword_t; /* Something long enough for a word */ /* A structure for (target) architectural information */ typedef struct { - int short_size; /* sizeof (short) */ - int int_size; /* sizeof (int) */ - int long_size; /* sizeof (long) */ - int long_long_size; /* sizeof (long long) */ - int pointer_size; /* sizeof (void *) */ + int short_size; /* sizeof (short) */ + int int_size; /* sizeof (int) */ + int long_size; /* sizeof (long) */ + int long_long_size; /* sizeof (long long) */ + int pointer_size; /* sizeof (void *) */ } mqs_target_type_sizes; /* Result codes. diff --git a/ompi/debuggers/ompi_dll.c b/ompi/debuggers/ompi_dll.c index 436f7fdfc5..5eff9ad3ee 100644 --- a/ompi/debuggers/ompi_dll.c +++ b/ompi/debuggers/ompi_dll.c @@ -78,6 +78,7 @@ #endif /* defined(HAVE_STDLIB_H) */ #include "ompi/request/request.h" +#include "ompi/mca/pml/base/pml_base_request.h" /* End of inclusion @@ -237,6 +238,7 @@ enum { */ static mqs_taddr_t fetch_pointer (mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info); static mqs_tword_t fetch_int (mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info); +static mqs_tword_t fetch_bool(mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info); /* Internal structure we hold for each communicator */ typedef struct communicator_t @@ -297,7 +299,7 @@ static group_t * find_or_create_group (mqs_process *proc, np = fetch_int( proc, table + i_info->ompi_group_t.offset.grp_proc_count, p_info ); - if( np <= 0 ) { + if( np < 0 ) { printf( "Get a size for the communicator = %d\n", np ); return NULL; /* Makes no sense ! */ } @@ -306,8 +308,6 @@ static group_t * find_or_create_group (mqs_process *proc, g = comm->group; if (g && g->table_base == table) { g->ref_count++; /* Someone else is interested */ - printf( "%s:%d increase ref_count for the group %p to %d\n", __FILE__, __LINE__, - (void*)g, (int)g->ref_count ); return g; } } @@ -335,17 +335,12 @@ static group_t * find_or_create_group (mqs_process *proc, g->entries = np; g->ref_count = 1; - printf( "%s:%d increase ref_count for the group %p to %d\n", __FILE__, __LINE__, - (void*)g, (int)g->ref_count ); - printf( "group with size %d refcount %d\n", g->entries, g->ref_count ); return g; } /* find_or_create_group */ /***********************************************************************/ static void group_decref (group_t * group) { - printf( "%s:%d decrease ref_count for the group %p to %d\n", __FILE__, __LINE__, - (void*)group, (int)(group->ref_count - 1) ); if (--(group->ref_count) == 0) { mqs_free (group->local_to_global); mqs_free (group); @@ -502,6 +497,7 @@ int mqs_image_has_queues (mqs_image *image, char **message) i_info->mca_pml_base_request_t.offset.req_comm = mqs_field_offset(qh_type, "req_comm"); i_info->mca_pml_base_request_t.offset.req_proc = mqs_field_offset(qh_type, "req_proc"); i_info->mca_pml_base_request_t.offset.req_sequence = mqs_field_offset(qh_type, "req_sequence"); + i_info->mca_pml_base_request_t.offset.req_type = mqs_field_offset(qh_type, "req_type"); } { mqs_type* qh_type = mqs_find_type( image, "mca_pml_base_send_request_t", mqs_lang_c ); @@ -560,6 +556,19 @@ int mqs_image_has_queues (mqs_image *image, char **message) i_info->ompi_group_t.offset.grp_my_rank = mqs_field_offset(qh_type, "grp_my_rank"); i_info->ompi_group_t.offset.grp_flags = mqs_field_offset(qh_type, "grp_flags" ); } + { + mqs_type* qh_type = mqs_find_type( image, "ompi_status_public_t", mqs_lang_c ); + if( !qh_type ) { + missing_in_action = "ompi_status_public_t"; + goto type_missing; + } + i_info->ompi_status_public_t.size = mqs_sizeof(qh_type); + i_info->ompi_status_public_t.offset.MPI_SOURCE = mqs_field_offset(qh_type, "MPI_SOURCE"); + i_info->ompi_status_public_t.offset.MPI_TAG = mqs_field_offset(qh_type, "MPI_TAG"); + i_info->ompi_status_public_t.offset.MPI_ERROR = mqs_field_offset(qh_type, "MPI_ERROR" ); + i_info->ompi_status_public_t.offset._count = mqs_field_offset(qh_type, "_count" ); + i_info->ompi_status_public_t.offset._cancelled = mqs_field_offset(qh_type, "_cancelled" ); + } /* All the types are here. Let's succesfully return. */ return mqs_ok; @@ -637,6 +646,7 @@ int mqs_process_has_queues (mqs_process *proc, char **msg) */ static int communicators_changed (mqs_process *proc) { +#if 0 /* TODO: how do we figure out which communicators have changed ? */ mpi_process_info *p_info = (mpi_process_info *)mqs_get_process_info (proc); mqs_image * image = mqs_get_image (proc); mpi_image_info *i_info = (mpi_image_info *)mqs_get_image_info (image); @@ -649,6 +659,8 @@ static int communicators_changed (mqs_process *proc) p_info->communicator_sequence = new_seq; return res; +#endif + return 1; } /* mqs_communicators_changed */ /*********************************************************************** @@ -703,7 +715,6 @@ static int rebuild_communicator_list (mqs_process *proc) comm_size = fetch_int( proc, p_info->commlist_base + i_info->ompi_pointer_array_t.offset.size, p_info ); - printf( "=> %d active communicators\n", (int)comm_size ); /* Now get the pointer to the first communicator pointer */ comm_addr_base = fetch_pointer( proc, comm_addr_base, p_info ); @@ -733,9 +744,8 @@ static int rebuild_communicator_list (mqs_process *proc) old = (communicator_t *)mqs_malloc (sizeof (communicator_t)); /* Save the results */ old->next = p_info->communicator_list; - old->comm_ptr = comm_ptr; p_info->communicator_list = old; - old->group = NULL; + old->comm_ptr = comm_ptr; old->recv_context = remote_comm.unique_id; /* Now get the information about the group */ @@ -748,9 +758,7 @@ static int rebuild_communicator_list (mqs_process *proc) strncpy(old->comm_info.name, remote_comm.name, 64); old->comm_info.unique_id = remote_comm.unique_id; old->comm_info.local_rank = remote_comm.local_rank; - if( NULL == old->group ) { - printf( "Found empty group\n" ); - } else { + if( NULL != old->group ) { old->comm_info.size = old->group->entries; } old->present = TRUE; @@ -763,7 +771,6 @@ static int rebuild_communicator_list (mqs_process *proc) for (; *commp; commp = &(*commp)->next) { communicator_t *comm = *commp; - if (comm->present) { comm->present = FALSE; commcount++; @@ -771,6 +778,7 @@ static int rebuild_communicator_list (mqs_process *proc) *commp = comm->next; /* Remove from the list */ group_decref (comm->group); /* Group is no longer referenced from here */ mqs_free (comm); + if( *commp == NULL ) break; } } @@ -786,7 +794,7 @@ static int rebuild_communicator_list (mqs_process *proc) /* Do the sort */ qsort (comm_array, commcount, sizeof (communicator_t *), compare_comms); - /* Re build the list */ + /* Rebuild the list */ p_info->communicator_list = NULL; for (i=0; icurrent_item = active_allocation; - ompi_free_list_t_dump_position( position ); + /*ompi_free_list_t_dump_position( position );*/ return mqs_ok; } -#include /* TODO: remove me when the sleep is not required */ - /** * Return the current position and move the internal counter to the next element. */ @@ -1028,7 +1034,7 @@ static int ompi_free_list_t_next_item( mqs_process *proc, mpi_process_info *p_in position->current_item += position->fl_elem_size; if( position->current_item >= position->upper_bound ) { - printf( "Reach the end of one of the ompi_free_list_t allocations. Go to the next one\n" ); + /*printf( "Reach the end of one of the ompi_free_list_t allocations. Go to the next one\n" );*/ /* we should go to the next allocation */ next_item_opal_list_t( proc, p_info, &position->opal_list_t_pos, &active_allocation ); @@ -1054,32 +1060,38 @@ static int ompi_free_list_t_next_item( mqs_process *proc, mpi_process_info *p_in position->upper_bound = position->fl_num_per_alloc * position->fl_elem_size + active_allocation; position->current_item = active_allocation; - ompi_free_list_t_dump_position( position ); + /*ompi_free_list_t_dump_position( position );*/ } - printf( "Free list actual position %p next element at %p\n", (void*)*active_item, - (void*)position->current_item ); + /*printf( "Free list actual position %p next element at %p\n", (void*)*active_item, + (void*)position->current_item );*/ return mqs_ok; } static void dump_request( mqs_taddr_t current_item, mqs_pending_operation *res ) { - printf( "\n=====================================\n" ); - printf( "Request 0x%llx contain \n", (long long)current_item ); - printf( "\tres->status = %d\n", res->status ); - printf( "\tres->desired_local_rank = %ld\n", (long)res->desired_local_rank ); - printf( "\tres->desired_global_rank = %ld\n", (long)res->desired_global_rank ); - printf( "\tres->tag_wild = %ld\n", (long)res->tag_wild ); - printf( "\tres->desired_tag = %ld\n", (long)res->desired_tag ); - printf( "\tres->system_buffer = %s\n", (TRUE == res->system_buffer ? "TRUE" : "FALSE") ); - printf( "\tres->buffer = 0x%llx\n", (long long)res->buffer ); - printf( "\tres->desired_length = %ld\n", (long)res->desired_length ); - printf( "\tres->actual_length = %ld\n", (long)res->actual_length ); - printf( "\tres->actual_tag = %ld\n", (long)res->actual_tag ); - printf( "\tres->actual_local_rank = %ld\n", (long)res->actual_local_rank ); - printf( "\tres->actual_global_rank = %ld\n", (long)res->actual_global_rank ); - printf( "=====================================\n\n" ); + printf( "\n+===============================================+\n" ); + printf( "|Request 0x%llx contain \n", (long long)current_item ); + printf( "| res->status = %d\n", res->status ); + printf( "| res->desired_local_rank = %ld\n", (long)res->desired_local_rank ); + printf( "| res->desired_global_rank = %ld\n", (long)res->desired_global_rank ); + printf( "| res->tag_wild = %ld\n", (long)res->tag_wild ); + printf( "| res->desired_tag = %ld\n", (long)res->desired_tag ); + printf( "| res->system_buffer = %s\n", (TRUE == res->system_buffer ? "TRUE" : "FALSE") ); + printf( "| res->buffer = 0x%llx\n", (long long)res->buffer ); + printf( "| res->desired_length = %ld\n", (long)res->desired_length ); + if( res->status != mqs_st_pending ) { + printf( "| res->actual_length = %ld\n", (long)res->actual_length ); + printf( "| res->actual_tag = %ld\n", (long)res->actual_tag ); + printf( "| res->actual_local_rank = %ld\n", (long)res->actual_local_rank ); + printf( "| res->actual_global_rank = %ld\n", (long)res->actual_global_rank ); + } + printf( "+===============================================+\n\n" ); } +/** + * TODO: ompi_request_completed can be used to detect any changes in the request handles. + */ + /** * Handle the send queue as well as the receive queue. The unexpected queue * is a whole different story ... @@ -1104,35 +1116,57 @@ static int fetch_request( mqs_process *proc, mpi_process_info *p_info, if( p_info->current_communicator->comm_ptr == req_comm ) break; } + res->extra_text[0][0] = 0; res->extra_text[1][0] = 0; res->extra_text[2][0] = 0; + res->extra_text[3][0] = 0; res->extra_text[4][0] = 0; + req_type = fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_type, p_info ); if( OMPI_REQUEST_PML == req_type ) { - req_complete = fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_complete, p_info ); + req_type = + fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_type, + p_info); + req_complete = fetch_bool( proc, current_item + i_info->ompi_request_t.offset.req_complete, p_info ); res->status = (req_complete == 0 ? mqs_st_pending : mqs_st_complete); + res->desired_local_rank = fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_peer, p_info ); res->desired_global_rank = res->desired_local_rank; res->desired_tag = fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_tag, p_info ); - res->tag_wild = res->desired_tag; + res->tag_wild = (MPI_ANY_TAG == res->desired_tag ? TRUE : FALSE); - req_buffer = fetch_pointer( proc, current_item + i_info->mca_pml_base_request_t.offset.req_addr, + res->buffer = fetch_pointer( proc, current_item + i_info->mca_pml_base_request_t.offset.req_addr, p_info ); - res->buffer = - fetch_pointer( proc, current_item + i_info->mca_pml_base_send_request_t.offset.req_addr, - p_info ); - if( req_buffer == res->buffer ) { - res->system_buffer = TRUE; + res->system_buffer = FALSE; + if( MCA_PML_REQUEST_SEND == req_type ) { + snprintf( (char *)res->extra_text[0], 64, "Non-blocking send 0x%llx", (long long)current_item ); + req_buffer = + fetch_pointer( proc, current_item + i_info->mca_pml_base_send_request_t.offset.req_addr, + p_info ); + res->system_buffer = ( req_buffer == res->buffer ? FALSE : TRUE ); + res->desired_length = + fetch_int( proc, + current_item + i_info->mca_pml_base_send_request_t.offset.req_bytes_packed, p_info ); + } else if( MCA_PML_REQUEST_RECV == req_type ) { + snprintf( (char *)res->extra_text[0], 64, "Non-blocking recv 0x%llx", (long long)current_item ); } else { - res->system_buffer = FALSE; + snprintf( (char *)res->extra_text[0], 64, "Unknown type of request 0x%llx", (long long)current_item ); } - res->desired_length = fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_count, p_info ); + res->desired_length = + fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_count, p_info ); - res->actual_length = 1; - res->actual_tag = 0x1111; - res->actual_local_rank = -1; - res->actual_global_rank = -1; - - dump_request( current_item, res ); + if( mqs_st_pending != res->status ) { /* The real data from the status */ + res->actual_length = + fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_status + + i_info->ompi_status_public_t.offset._count, p_info ); + res->actual_tag = + fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_status + + i_info->ompi_status_public_t.offset.MPI_TAG, p_info ); + res->actual_local_rank = + fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_status + + i_info->ompi_status_public_t.offset.MPI_SOURCE, p_info ); + res->actual_global_rank = res->actual_local_rank; /* TODO: what's the global rank ? */ + } + /*dump_request( current_item, res );*/ } return mqs_ok; } @@ -1294,12 +1328,10 @@ int mqs_setup_operation_iterator (mqs_process *proc, int op) switch (op) { case mqs_pending_sends: - printf( "prepare the send queue\n" ); ompi_free_list_t_init_parser( proc, p_info, &p_info->next_msg, p_info->send_queue_base ); return mqs_ok; case mqs_pending_receives: - printf( "prepare the receive queue\n" ); ompi_free_list_t_init_parser( proc, p_info, &p_info->next_msg, p_info->recv_queue_base ); return mqs_ok; @@ -1323,13 +1355,12 @@ int mqs_next_operation (mqs_process *proc, mqs_pending_operation *op) switch (p_info->what) { case mqs_pending_receives: - printf( "Go for the next receive\n" ); return fetch_request( proc, p_info, op, TRUE ); case mqs_unexpected_messages: - printf( "Go or the next send\n" ); - return fetch_request( proc, p_info, op, FALSE ); - case mqs_pending_sends: /* TODO: not handled yet */ + return err_bad_request; + case mqs_pending_sends: + return fetch_request( proc, p_info, op, FALSE ); default: return err_bad_request; } } /* mqs_next_operation */ @@ -1346,7 +1377,8 @@ void mqs_destroy_process_info (mqs_process_info *mp_info) while (comm) { communicator_t *next = comm->next; - group_decref (comm->group); /* Group is no longer referenced from here */ + if( NULL != comm->group ) + group_decref (comm->group); /* Group is no longer referenced from here */ mqs_free (comm); comm = next; @@ -1393,6 +1425,19 @@ static mqs_tword_t fetch_int (mqs_process * proc, mqs_taddr_t addr, mpi_process_ return res; } /* fetch_int */ +/***********************************************************************/ +static mqs_tword_t fetch_bool(mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info) +{ + int isize = 1; + char buffer; /* ASSUME an integer fits in 8 bytes */ + mqs_tword_t res = 0; + + if (mqs_ok == mqs_fetch_data (proc, addr, isize, &buffer)) + res = (mqs_tword_t)buffer; + + return res; +} /* fetch_bool */ + /***********************************************************************/ /* Convert an error code into a printable string */ char * mqs_dll_error_string (int errcode) diff --git a/ompi/debuggers/ompi_dll_defs.h b/ompi/debuggers/ompi_dll_defs.h index 1bdfc2a715..be098ccbfe 100644 --- a/ompi/debuggers/ompi_dll_defs.h +++ b/ompi/debuggers/ompi_dll_defs.h @@ -86,6 +86,7 @@ typedef struct int req_comm; int req_proc; int req_sequence; + int req_type; } offset; } mca_pml_base_request_t; struct { @@ -128,6 +129,16 @@ typedef struct int c_local_group; } offset; } ompi_communicator_t; + struct { + int size; + struct { + int MPI_SOURCE; + int MPI_TAG; + int MPI_ERROR; + int _count; + int _cancelled; + } offset; + } ompi_status_public_t; /* Fields in MPID_QHDR */ int unexpected_offs;