1
1

Sends and receives are now fully implemented. Correctly handle the reference counts

on groups. Correct the rearrangement of the communicators. Other minor improvements.

This commit was SVN r11722.
Этот коммит содержится в:
George Bosilca 2006-09-20 07:51:30 +00:00
родитель 10a230373b
Коммит ff7e670c6a
3 изменённых файлов: 122 добавлений и 66 удалений

Просмотреть файл

@ -188,11 +188,11 @@ typedef long mqs_tword_t; /* Something long enough for a word */
/* A structure for (target) architectural information */
typedef struct
{
int short_size; /* sizeof (short) */
int int_size; /* sizeof (int) */
int long_size; /* sizeof (long) */
int long_long_size; /* sizeof (long long) */
int pointer_size; /* sizeof (void *) */
int short_size; /* sizeof (short) */
int int_size; /* sizeof (int) */
int long_size; /* sizeof (long) */
int long_long_size; /* sizeof (long long) */
int pointer_size; /* sizeof (void *) */
} mqs_target_type_sizes;
/* Result codes.

Просмотреть файл

@ -78,6 +78,7 @@
#endif /* defined(HAVE_STDLIB_H) */
#include "ompi/request/request.h"
#include "ompi/mca/pml/base/pml_base_request.h"
/*
End of inclusion
@ -237,6 +238,7 @@ enum {
*/
static mqs_taddr_t fetch_pointer (mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info);
static mqs_tword_t fetch_int (mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info);
static mqs_tword_t fetch_bool(mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info);
/* Internal structure we hold for each communicator */
typedef struct communicator_t
@ -297,7 +299,7 @@ static group_t * find_or_create_group (mqs_process *proc,
np = fetch_int( proc,
table + i_info->ompi_group_t.offset.grp_proc_count,
p_info );
if( np <= 0 ) {
if( np < 0 ) {
printf( "Get a size for the communicator = %d\n", np );
return NULL; /* Makes no sense ! */
}
@ -306,8 +308,6 @@ static group_t * find_or_create_group (mqs_process *proc,
g = comm->group;
if (g && g->table_base == table) {
g->ref_count++; /* Someone else is interested */
printf( "%s:%d increase ref_count for the group %p to %d\n", __FILE__, __LINE__,
(void*)g, (int)g->ref_count );
return g;
}
}
@ -335,17 +335,12 @@ static group_t * find_or_create_group (mqs_process *proc,
g->entries = np;
g->ref_count = 1;
printf( "%s:%d increase ref_count for the group %p to %d\n", __FILE__, __LINE__,
(void*)g, (int)g->ref_count );
printf( "group with size %d refcount %d\n", g->entries, g->ref_count );
return g;
} /* find_or_create_group */
/***********************************************************************/
static void group_decref (group_t * group)
{
printf( "%s:%d decrease ref_count for the group %p to %d\n", __FILE__, __LINE__,
(void*)group, (int)(group->ref_count - 1) );
if (--(group->ref_count) == 0) {
mqs_free (group->local_to_global);
mqs_free (group);
@ -502,6 +497,7 @@ int mqs_image_has_queues (mqs_image *image, char **message)
i_info->mca_pml_base_request_t.offset.req_comm = mqs_field_offset(qh_type, "req_comm");
i_info->mca_pml_base_request_t.offset.req_proc = mqs_field_offset(qh_type, "req_proc");
i_info->mca_pml_base_request_t.offset.req_sequence = mqs_field_offset(qh_type, "req_sequence");
i_info->mca_pml_base_request_t.offset.req_type = mqs_field_offset(qh_type, "req_type");
}
{
mqs_type* qh_type = mqs_find_type( image, "mca_pml_base_send_request_t", mqs_lang_c );
@ -560,6 +556,19 @@ int mqs_image_has_queues (mqs_image *image, char **message)
i_info->ompi_group_t.offset.grp_my_rank = mqs_field_offset(qh_type, "grp_my_rank");
i_info->ompi_group_t.offset.grp_flags = mqs_field_offset(qh_type, "grp_flags" );
}
{
mqs_type* qh_type = mqs_find_type( image, "ompi_status_public_t", mqs_lang_c );
if( !qh_type ) {
missing_in_action = "ompi_status_public_t";
goto type_missing;
}
i_info->ompi_status_public_t.size = mqs_sizeof(qh_type);
i_info->ompi_status_public_t.offset.MPI_SOURCE = mqs_field_offset(qh_type, "MPI_SOURCE");
i_info->ompi_status_public_t.offset.MPI_TAG = mqs_field_offset(qh_type, "MPI_TAG");
i_info->ompi_status_public_t.offset.MPI_ERROR = mqs_field_offset(qh_type, "MPI_ERROR" );
i_info->ompi_status_public_t.offset._count = mqs_field_offset(qh_type, "_count" );
i_info->ompi_status_public_t.offset._cancelled = mqs_field_offset(qh_type, "_cancelled" );
}
/* All the types are here. Let's successfully return. */
return mqs_ok;
@ -637,6 +646,7 @@ int mqs_process_has_queues (mqs_process *proc, char **msg)
*/
static int communicators_changed (mqs_process *proc)
{
#if 0 /* TODO: how do we figure out which communicators have changed ? */
mpi_process_info *p_info = (mpi_process_info *)mqs_get_process_info (proc);
mqs_image * image = mqs_get_image (proc);
mpi_image_info *i_info = (mpi_image_info *)mqs_get_image_info (image);
@ -649,6 +659,8 @@ static int communicators_changed (mqs_process *proc)
p_info->communicator_sequence = new_seq;
return res;
#endif
return 1;
} /* mqs_communicators_changed */
/***********************************************************************
@ -703,7 +715,6 @@ static int rebuild_communicator_list (mqs_process *proc)
comm_size = fetch_int( proc,
p_info->commlist_base + i_info->ompi_pointer_array_t.offset.size,
p_info );
printf( "=> %d active communicators\n", (int)comm_size );
/* Now get the pointer to the first communicator pointer */
comm_addr_base = fetch_pointer( proc, comm_addr_base, p_info );
@ -733,9 +744,8 @@ static int rebuild_communicator_list (mqs_process *proc)
old = (communicator_t *)mqs_malloc (sizeof (communicator_t));
/* Save the results */
old->next = p_info->communicator_list;
old->comm_ptr = comm_ptr;
p_info->communicator_list = old;
old->group = NULL;
old->comm_ptr = comm_ptr;
old->recv_context = remote_comm.unique_id;
/* Now get the information about the group */
@ -748,9 +758,7 @@ static int rebuild_communicator_list (mqs_process *proc)
strncpy(old->comm_info.name, remote_comm.name, 64);
old->comm_info.unique_id = remote_comm.unique_id;
old->comm_info.local_rank = remote_comm.local_rank;
if( NULL == old->group ) {
printf( "Found empty group\n" );
} else {
if( NULL != old->group ) {
old->comm_info.size = old->group->entries;
}
old->present = TRUE;
@ -763,7 +771,6 @@ static int rebuild_communicator_list (mqs_process *proc)
for (; *commp; commp = &(*commp)->next) {
communicator_t *comm = *commp;
if (comm->present) {
comm->present = FALSE;
commcount++;
@ -771,6 +778,7 @@ static int rebuild_communicator_list (mqs_process *proc)
*commp = comm->next; /* Remove from the list */
group_decref (comm->group); /* Group is no longer referenced from here */
mqs_free (comm);
if( *commp == NULL ) break;
}
}
@ -786,7 +794,7 @@ static int rebuild_communicator_list (mqs_process *proc)
/* Do the sort */
qsort (comm_array, commcount, sizeof (communicator_t *), compare_comms);
/* Re build the list */
/* Rebuild the list */
p_info->communicator_list = NULL;
for (i=0; i<commcount; i++) {
comm = comm_array[i];
@ -1006,12 +1014,10 @@ static int ompi_free_list_t_init_parser( mqs_process *proc, mpi_process_info *p_
}
position->current_item = active_allocation;
ompi_free_list_t_dump_position( position );
/*ompi_free_list_t_dump_position( position );*/
return mqs_ok;
}
#include <unistd.h> /* TODO: remove me when the sleep is not required */
/**
* Return the current position and move the internal counter to the next element.
*/
@ -1028,7 +1034,7 @@ static int ompi_free_list_t_next_item( mqs_process *proc, mpi_process_info *p_in
position->current_item += position->fl_elem_size;
if( position->current_item >= position->upper_bound ) {
printf( "Reach the end of one of the ompi_free_list_t allocations. Go to the next one\n" );
/*printf( "Reach the end of one of the ompi_free_list_t allocations. Go to the next one\n" );*/
/* we should go to the next allocation */
next_item_opal_list_t( proc, p_info,
&position->opal_list_t_pos, &active_allocation );
@ -1054,32 +1060,38 @@ static int ompi_free_list_t_next_item( mqs_process *proc, mpi_process_info *p_in
position->upper_bound =
position->fl_num_per_alloc * position->fl_elem_size + active_allocation;
position->current_item = active_allocation;
ompi_free_list_t_dump_position( position );
/*ompi_free_list_t_dump_position( position );*/
}
printf( "Free list actual position %p next element at %p\n", (void*)*active_item,
(void*)position->current_item );
/*printf( "Free list actual position %p next element at %p\n", (void*)*active_item,
(void*)position->current_item );*/
return mqs_ok;
}
static void dump_request( mqs_taddr_t current_item, mqs_pending_operation *res )
{
printf( "\n=====================================\n" );
printf( "Request 0x%llx contain \n", (long long)current_item );
printf( "\tres->status = %d\n", res->status );
printf( "\tres->desired_local_rank = %ld\n", (long)res->desired_local_rank );
printf( "\tres->desired_global_rank = %ld\n", (long)res->desired_global_rank );
printf( "\tres->tag_wild = %ld\n", (long)res->tag_wild );
printf( "\tres->desired_tag = %ld\n", (long)res->desired_tag );
printf( "\tres->system_buffer = %s\n", (TRUE == res->system_buffer ? "TRUE" : "FALSE") );
printf( "\tres->buffer = 0x%llx\n", (long long)res->buffer );
printf( "\tres->desired_length = %ld\n", (long)res->desired_length );
printf( "\tres->actual_length = %ld\n", (long)res->actual_length );
printf( "\tres->actual_tag = %ld\n", (long)res->actual_tag );
printf( "\tres->actual_local_rank = %ld\n", (long)res->actual_local_rank );
printf( "\tres->actual_global_rank = %ld\n", (long)res->actual_global_rank );
printf( "=====================================\n\n" );
printf( "\n+===============================================+\n" );
printf( "|Request 0x%llx contain \n", (long long)current_item );
printf( "| res->status = %d\n", res->status );
printf( "| res->desired_local_rank = %ld\n", (long)res->desired_local_rank );
printf( "| res->desired_global_rank = %ld\n", (long)res->desired_global_rank );
printf( "| res->tag_wild = %ld\n", (long)res->tag_wild );
printf( "| res->desired_tag = %ld\n", (long)res->desired_tag );
printf( "| res->system_buffer = %s\n", (TRUE == res->system_buffer ? "TRUE" : "FALSE") );
printf( "| res->buffer = 0x%llx\n", (long long)res->buffer );
printf( "| res->desired_length = %ld\n", (long)res->desired_length );
if( res->status != mqs_st_pending ) {
printf( "| res->actual_length = %ld\n", (long)res->actual_length );
printf( "| res->actual_tag = %ld\n", (long)res->actual_tag );
printf( "| res->actual_local_rank = %ld\n", (long)res->actual_local_rank );
printf( "| res->actual_global_rank = %ld\n", (long)res->actual_global_rank );
}
printf( "+===============================================+\n\n" );
}
/**
* TODO: ompi_request_completed can be used to detect any changes in the request handles.
*/
/**
* Handle the send queue as well as the receive queue. The unexpected queue
* is a whole different story ...
@ -1104,35 +1116,57 @@ static int fetch_request( mqs_process *proc, mpi_process_info *p_info,
if( p_info->current_communicator->comm_ptr == req_comm ) break;
}
res->extra_text[0][0] = 0; res->extra_text[1][0] = 0; res->extra_text[2][0] = 0;
res->extra_text[3][0] = 0; res->extra_text[4][0] = 0;
req_type = fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_type, p_info );
if( OMPI_REQUEST_PML == req_type ) {
req_complete = fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_complete, p_info );
req_type =
fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_type,
p_info);
req_complete = fetch_bool( proc, current_item + i_info->ompi_request_t.offset.req_complete, p_info );
res->status = (req_complete == 0 ? mqs_st_pending : mqs_st_complete);
res->desired_local_rank =
fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_peer, p_info );
res->desired_global_rank = res->desired_local_rank;
res->desired_tag =
fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_tag, p_info );
res->tag_wild = res->desired_tag;
res->tag_wild = (MPI_ANY_TAG == res->desired_tag ? TRUE : FALSE);
req_buffer = fetch_pointer( proc, current_item + i_info->mca_pml_base_request_t.offset.req_addr,
res->buffer = fetch_pointer( proc, current_item + i_info->mca_pml_base_request_t.offset.req_addr,
p_info );
res->buffer =
fetch_pointer( proc, current_item + i_info->mca_pml_base_send_request_t.offset.req_addr,
p_info );
if( req_buffer == res->buffer ) {
res->system_buffer = TRUE;
res->system_buffer = FALSE;
if( MCA_PML_REQUEST_SEND == req_type ) {
snprintf( (char *)res->extra_text[0], 64, "Non-blocking send 0x%llx", (long long)current_item );
req_buffer =
fetch_pointer( proc, current_item + i_info->mca_pml_base_send_request_t.offset.req_addr,
p_info );
res->system_buffer = ( req_buffer == res->buffer ? FALSE : TRUE );
res->desired_length =
fetch_int( proc,
current_item + i_info->mca_pml_base_send_request_t.offset.req_bytes_packed, p_info );
} else if( MCA_PML_REQUEST_RECV == req_type ) {
snprintf( (char *)res->extra_text[0], 64, "Non-blocking recv 0x%llx", (long long)current_item );
} else {
res->system_buffer = FALSE;
snprintf( (char *)res->extra_text[0], 64, "Unknown type of request 0x%llx", (long long)current_item );
}
res->desired_length = fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_count, p_info );
res->desired_length =
fetch_int( proc, current_item + i_info->mca_pml_base_request_t.offset.req_count, p_info );
res->actual_length = 1;
res->actual_tag = 0x1111;
res->actual_local_rank = -1;
res->actual_global_rank = -1;
dump_request( current_item, res );
if( mqs_st_pending != res->status ) { /* The real data from the status */
res->actual_length =
fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_status +
i_info->ompi_status_public_t.offset._count, p_info );
res->actual_tag =
fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_status +
i_info->ompi_status_public_t.offset.MPI_TAG, p_info );
res->actual_local_rank =
fetch_int( proc, current_item + i_info->ompi_request_t.offset.req_status +
i_info->ompi_status_public_t.offset.MPI_SOURCE, p_info );
res->actual_global_rank = res->actual_local_rank; /* TODO: what's the global rank ? */
}
/*dump_request( current_item, res );*/
}
return mqs_ok;
}
@ -1294,12 +1328,10 @@ int mqs_setup_operation_iterator (mqs_process *proc, int op)
switch (op) {
case mqs_pending_sends:
printf( "prepare the send queue\n" );
ompi_free_list_t_init_parser( proc, p_info, &p_info->next_msg, p_info->send_queue_base );
return mqs_ok;
case mqs_pending_receives:
printf( "prepare the receive queue\n" );
ompi_free_list_t_init_parser( proc, p_info, &p_info->next_msg, p_info->recv_queue_base );
return mqs_ok;
@ -1323,13 +1355,12 @@ int mqs_next_operation (mqs_process *proc, mqs_pending_operation *op)
switch (p_info->what) {
case mqs_pending_receives:
printf( "Go for the next receive\n" );
return fetch_request( proc, p_info, op, TRUE );
case mqs_unexpected_messages:
printf( "Go or the next send\n" );
return fetch_request( proc, p_info, op, FALSE );
case mqs_pending_sends:
/* TODO: not handled yet */
return err_bad_request;
case mqs_pending_sends:
return fetch_request( proc, p_info, op, FALSE );
default: return err_bad_request;
}
} /* mqs_next_operation */
@ -1346,7 +1377,8 @@ void mqs_destroy_process_info (mqs_process_info *mp_info)
while (comm) {
communicator_t *next = comm->next;
group_decref (comm->group); /* Group is no longer referenced from here */
if( NULL != comm->group )
group_decref (comm->group); /* Group is no longer referenced from here */
mqs_free (comm);
comm = next;
@ -1393,6 +1425,19 @@ static mqs_tword_t fetch_int (mqs_process * proc, mqs_taddr_t addr, mpi_process_
return res;
} /* fetch_int */
/***********************************************************************/
/* Fetch a single-byte boolean flag from the target process's memory.
 * Returns the byte value widened to mqs_tword_t, or 0 if the read fails. */
static mqs_tword_t fetch_bool(mqs_process * proc, mqs_taddr_t addr, mpi_process_info *p_info)
{
int isize = 1;
char buffer; /* booleans are read as a single byte from the target */
mqs_tword_t res = 0;
if (mqs_ok == mqs_fetch_data (proc, addr, isize, &buffer))
res = (mqs_tword_t)buffer;
return res;
} /* fetch_bool */
/***********************************************************************/
/* Convert an error code into a printable string */
char * mqs_dll_error_string (int errcode)

Просмотреть файл

@ -86,6 +86,7 @@ typedef struct
int req_comm;
int req_proc;
int req_sequence;
int req_type;
} offset;
} mca_pml_base_request_t;
struct {
@ -128,6 +129,16 @@ typedef struct
int c_local_group;
} offset;
} ompi_communicator_t;
struct {
int size;
struct {
int MPI_SOURCE;
int MPI_TAG;
int MPI_ERROR;
int _count;
int _cancelled;
} offset;
} ompi_status_public_t;
/* Fields in MPID_QHDR */
int unexpected_offs;