diff --git a/ompi/mca/btl/mx/btl_mx.c b/ompi/mca/btl/mx/btl_mx.c
index 4b23bf3869..df3f180b86 100644
--- a/ompi/mca/btl/mx/btl_mx.c
+++ b/ompi/mca/btl/mx/btl_mx.c
@@ -61,53 +61,33 @@ mca_btl_mx_module_t mca_btl_mx_module = {
 /**
  *
  */
-
-int mca_btl_mx_add_procs(
-    struct mca_btl_base_module_t* btl,
-    size_t nprocs,
-    struct ompi_proc_t **ompi_procs,
-    struct mca_btl_base_endpoint_t** peers,
-    ompi_bitmap_t* reachable)
+int mca_btl_mx_add_procs( struct mca_btl_base_module_t* btl,
+                          size_t nprocs,
+                          struct ompi_proc_t **ompi_procs,
+                          struct mca_btl_base_endpoint_t** peers,
+                          ompi_bitmap_t* reachable )
 {
     mca_btl_mx_module_t* mx_btl = (mca_btl_mx_module_t*)btl;
-    int i, rc, index;
+    int i, rc;
 
-    /* MX seems to not be very scalable if all the processes start to connect in
-     * same time to the same destinattion. We can help it here if we first compute
-     * our rank in the list, and then we setup the connections starting with
-     * the next processor in the list in a round-robin fashion.
-     */
-    for( i = 0; i < (int)nprocs; i++ ) {
-        if( ompi_procs[i] == ompi_proc_local_proc )
-            break;
-    }
-    for( i = i % nprocs, index = 0; index < (int)nprocs; index++, i = (i + 1) % nprocs ) {
+    for( i = 0; i < (int)nprocs; i++ ) {
         struct ompi_proc_t* ompi_proc = ompi_procs[i];
         mca_btl_mx_proc_t* mx_proc;
         mca_btl_base_endpoint_t* mx_endpoint;
 
-        if( ompi_procs[i] == ompi_proc_local_proc) {
-            /* Do not alllow to connect to ourselfs ... */
-            continue;
-        }
-        if( (0 == mca_btl_mx_component.mx_support_sharedmem) &&
-            (ompi_procs[i]->proc_flags & OMPI_PROC_FLAG_LOCAL) ) {
-            /* Do not use MX for any of the procs on the same node,
-             * let the SM device handle that by now
-             */
-            continue;
-        }
-
-        if(NULL == (mx_proc = mca_btl_mx_proc_create(ompi_proc))) {
-            continue;
-        }
-
-        /*
-         * Check to make sure that the peer has at least as many interface
-         * addresses exported as we are trying to use. If not, then
-         * don't bind this PTL instance to the proc.
+        /* We have special BTLs for processes on the same node, as well as for
+         * all communications inside the same process. Therefore, MX will not
+         * be used for any of them.
          */
+        if( (ompi_procs[i] == ompi_proc_local_proc) ||
+            ( (0 == mca_btl_mx_component.mx_support_sharedmem) &&
+              (ompi_procs[i]->proc_flags & OMPI_PROC_FLAG_LOCAL) ) ) {
+            continue;
+        }
+
+        if( NULL == (mx_proc = mca_btl_mx_proc_create(ompi_proc)) ) {
+            continue;
+        }
 
         OPAL_THREAD_LOCK(&mx_proc->proc_lock);
@@ -122,14 +102,13 @@ int mca_btl_mx_add_procs(
         }
         mx_endpoint->endpoint_btl = mx_btl;
 
-        rc = mca_btl_mx_proc_insert(mx_proc, mx_endpoint);
-        if(rc != OMPI_SUCCESS) {
+        rc = mca_btl_mx_proc_insert( mx_proc, mx_endpoint );
+        if( rc != OMPI_SUCCESS ) {
             OBJ_RELEASE(mx_endpoint);
             OBJ_RELEASE(mx_proc);
             OPAL_THREAD_UNLOCK(&mx_proc->proc_lock);
             continue;
         }
-        ompi_bitmap_set_bit(reachable, i);
         OPAL_THREAD_UNLOCK(&mx_proc->proc_lock);
 
         peers[i] = mx_endpoint;
@@ -601,6 +580,12 @@ int mca_btl_mx_send(
     mx_return_t mx_return;
     uint64_t total_length;
 
+    if( MCA_BTL_MX_CONNECTED != ((mca_btl_mx_endpoint_t*)endpoint)->endpoint_proc->status ) {
+        if( MCA_BTL_MX_NOT_REACHEABLE == ((mca_btl_mx_endpoint_t*)endpoint)->endpoint_proc->status )
+            return OMPI_ERROR;
+        mca_btl_mx_proc_connect( (mca_btl_mx_endpoint_t*)endpoint );
+    }
+
     frag->endpoint = endpoint;
     frag->tag = tag;
     mx_segment[0].segment_ptr = descriptor->des_src[0].seg_addr.pval;
diff --git a/ompi/mca/btl/mx/btl_mx.h b/ompi/mca/btl/mx/btl_mx.h
index 22d10afa77..a991652dc6 100644
--- a/ompi/mca/btl/mx/btl_mx.h
+++ b/ompi/mca/btl/mx/btl_mx.h
@@ -78,6 +78,7 @@ struct mca_btl_mx_component_t {
 
     int32_t mx_filter;
     int32_t mx_timeout;
+    int32_t mx_connection_retries;
 
     ompi_free_list_t mx_send_eager_frags;  /**< free list of mx eager send fragments */
     ompi_free_list_t mx_send_user_frags;   /**< free list of mx user send fragments */
diff --git a/ompi/mca/btl/mx/btl_mx_component.c b/ompi/mca/btl/mx/btl_mx_component.c
index de33d6e433..73987e536e 100644
--- a/ompi/mca/btl/mx/btl_mx_component.c
+++ b/ompi/mca/btl/mx/btl_mx_component.c
@@ -109,6 +109,9 @@ int mca_btl_mx_component_open(void)
     mca_base_param_reg_int( (mca_base_component_t*)&mca_btl_mx_component, "timeout",
                             "Timeout for connections",
                             false, false, 10000, &mca_btl_mx_component.mx_timeout );
+    mca_base_param_reg_int( (mca_base_component_t*)&mca_btl_mx_component, "retries",
+                            "Number of retries for each new connection before considering the peer as unreachable",
+                            false, false, 20, &mca_btl_mx_component.mx_connection_retries );
     mca_base_param_reg_int( (mca_base_component_t*)&mca_btl_mx_component, "filter",
                             "Unique ID for the application (used to connect to the peers)",
                             false, false, 0xdeadbeef, &mca_btl_mx_component.mx_filter );
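The btl_mx.c changes above make connection establishment lazy: instead of connecting to every peer during add_procs, mca_btl_mx_send() now triggers mca_btl_mx_proc_connect() on the first send to a peer, and fails fast once the peer has been marked MCA_BTL_MX_NOT_REACHEABLE. One caveat: the guard does not check the return value of mca_btl_mx_proc_connect(), so a connection failure is only observed on the following send. Below is a minimal, self-contained sketch of this three-state machine with a defensive return-value check added; every name in it is an illustrative stand-in, not the real OMPI/MX API.

```c
/* Sketch of the lazy-connect guard in mca_btl_mx_send(); hypothetical names. */
#include <stdio.h>

#define NOT_CONNECTED 0x0000   /* mirrors MCA_BTL_MX_NOT_CONNECTED  */
#define NOT_REACHABLE 0x0001   /* mirrors MCA_BTL_MX_NOT_REACHEABLE */
#define CONNECTED     0x0002   /* mirrors MCA_BTL_MX_CONNECTED      */

struct peer { int status; int attempts_left; };

/* Stand-in for mca_btl_mx_proc_connect(): consumes one attempt per call. */
static int peer_connect( struct peer* p )
{
    if( p->attempts_left-- > 0 ) { p->status = CONNECTED; return 0; }
    p->status = NOT_REACHABLE;
    return -1;
}

/* Stand-in for the guard added at the top of mca_btl_mx_send(). */
static int peer_send( struct peer* p )
{
    if( CONNECTED != p->status ) {
        if( NOT_REACHABLE == p->status )
            return -1;               /* peer already known to be dead      */
        if( 0 != peer_connect(p) )   /* unlike the patch, check the result */
            return -1;               /* so we never send on a failed link  */
    }
    return 0;                        /* connection is up, send the frame   */
}

int main(void)
{
    struct peer ok   = { NOT_CONNECTED, 1 };
    struct peer dead = { NOT_CONNECTED, 0 };
    printf( "send to ok:     %d\n", peer_send(&ok) );   /* 0: lazy connect */
    printf( "send to dead:   %d\n", peer_send(&dead) ); /* -1: no route    */
    printf( "resend to dead: %d\n", peer_send(&dead) ); /* -1: fails fast  */
    return 0;
}
```

The per-connection retry budget comes from the new `retries` parameter registered in btl_mx_component.c; under the usual MCA naming scheme it should be tunable at run time as something like `--mca btl_mx_retries 40`.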
diff --git a/ompi/mca/btl/mx/btl_mx_proc.c b/ompi/mca/btl/mx/btl_mx_proc.c
index c940fba0c6..40d18242f1 100644
--- a/ompi/mca/btl/mx/btl_mx_proc.c
+++ b/ompi/mca/btl/mx/btl_mx_proc.c
@@ -37,6 +37,8 @@ void mca_btl_mx_proc_construct(mca_btl_mx_proc_t* proc)
     proc->proc_addr_index = 0;
     proc->proc_endpoints = NULL;
     proc->proc_endpoint_count = 0;
+    proc->mx_peers_count = 0;
+    proc->mx_peers = NULL;
     OBJ_CONSTRUCT(&proc->proc_lock, opal_mutex_t);
     /* add to list of all proc instance */
     OPAL_THREAD_LOCK(&mca_btl_mx_component.mx_lock);
@@ -56,8 +58,13 @@ void mca_btl_mx_proc_destruct(mca_btl_mx_proc_t* proc)
     OPAL_THREAD_UNLOCK(&mca_btl_mx_component.mx_lock);
 
     /* release resources */
-    if(NULL != proc->proc_endpoints) {
+    if( NULL != proc->proc_endpoints ) {
         free(proc->proc_endpoints);
+        proc->proc_endpoints = NULL;
+    }
+    if( NULL != proc->mx_peers ) {
+        free(proc->mx_peers);
+        proc->mx_peers = NULL;
     }
 }
 
@@ -72,11 +79,9 @@ static mca_btl_mx_proc_t* mca_btl_mx_proc_lookup_ompi(ompi_proc_t* ompi_proc)
 
     OPAL_THREAD_LOCK(&mca_btl_mx_component.mx_lock);
 
-    for(mx_proc = (mca_btl_mx_proc_t*)
-            opal_list_get_first(&mca_btl_mx_component.mx_procs);
-        mx_proc != (mca_btl_mx_proc_t*)
-            opal_list_get_end(&mca_btl_mx_component.mx_procs);
-        mx_proc = (mca_btl_mx_proc_t*)opal_list_get_next(mx_proc)) {
+    for( mx_proc = (mca_btl_mx_proc_t*)opal_list_get_first(&mca_btl_mx_component.mx_procs);
+         mx_proc != (mca_btl_mx_proc_t*)opal_list_get_end(&mca_btl_mx_component.mx_procs);
+         mx_proc = (mca_btl_mx_proc_t*)opal_list_get_next(mx_proc) ) {
 
         if(mx_proc->proc_ompi == ompi_proc) {
             OPAL_THREAD_UNLOCK(&mca_btl_mx_component.mx_lock);
@@ -115,7 +120,7 @@ mca_btl_mx_proc_t* mca_btl_mx_proc_create(ompi_proc_t* ompi_proc)
 
     module_proc = OBJ_NEW(mca_btl_mx_proc_t);
 
-    module_proc->proc_ompi = ompi_proc;
+    module_proc->proc_ompi = ompi_proc;
 
     return module_proc;
 }
@@ -124,15 +129,13 @@ mca_btl_mx_proc_t* mca_btl_mx_proc_create(ompi_proc_t* ompi_proc)
 /*
  * Note that this routine must be called with the lock on the process
  * already held. Insert a btl instance into the proc array and assign
- * it an address.
+ * it an address.
  */
 int mca_btl_mx_proc_insert( mca_btl_mx_proc_t* module_proc,
                             mca_btl_mx_endpoint_t* module_endpoint )
 {
-    mx_return_t mx_status;
-    mx_endpoint_addr_t mx_remote_addr;
     mca_btl_mx_addr_t *mx_peers;
-    int num_retry = 0, rc, count, i;
+    int rc;
     size_t size;
 
     /* query for the peer address info */
@@ -151,53 +154,68 @@ int mca_btl_mx_proc_insert( mca_btl_mx_proc_t* module_proc,
         OBJ_RELEASE(module_proc);
         return OMPI_ERROR;
     }
-    count = size / sizeof(mca_btl_mx_addr_t);
+    module_proc->mx_peers_count = size / sizeof(mca_btl_mx_addr_t);
+    if( 0 == module_proc->mx_peers_count ) {  /* no available connection */
+        return OMPI_ERROR;
+    }
 
-    for( i = module_proc->proc_addr_index; i < count; i++ ) {
+    module_proc->status = MCA_BTL_MX_NOT_CONNECTED;
+    module_proc->mx_peers = mx_peers;
+
+    if( NULL == module_proc->proc_endpoints ) {
+        module_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
+            malloc(module_proc->mx_peers_count * sizeof(mca_btl_base_endpoint_t*));
+        if( NULL == module_proc->proc_endpoints ) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+    }
+    /* insert into endpoint array */
+    module_endpoint->endpoint_proc = module_proc;
+
+    return OMPI_SUCCESS;
+}
+
+int mca_btl_mx_proc_connect( mca_btl_mx_endpoint_t* module_endpoint )
+{
+    int num_retry = 0, i;
+    mx_return_t mx_status;
+    mx_endpoint_addr_t mx_remote_addr;
+    mca_btl_mx_proc_t* module_proc = module_endpoint->endpoint_proc;
+
+    for( i = module_proc->proc_addr_index; i < module_proc->mx_peers_count; i++ ) {
     retry_connect:
         mx_status = mx_connect( module_endpoint->endpoint_btl->mx_endpoint,
-                                mx_peers[i].nic_id, mx_peers[i].endpoint_id,
+                                module_proc->mx_peers[i].nic_id, module_proc->mx_peers[i].endpoint_id,
                                 mca_btl_mx_component.mx_filter,
                                 mca_btl_mx_component.mx_timeout, &mx_remote_addr );
         if( MX_SUCCESS != mx_status ) {
             if( MX_TIMEOUT == mx_status )
-                if( num_retry++ < 10 )
+                if( num_retry++ < mca_btl_mx_component.mx_connection_retries )
                     goto retry_connect;
             {
                 char peer_name[MX_MAX_HOSTNAME_LEN];
 
-                if( MX_SUCCESS != mx_nic_id_to_hostname( mx_peers[i].nic_id, peer_name ) )
-                    sprintf( peer_name, "unknown %lx nic_id", mx_peers[i].nic_id );
+                if( MX_SUCCESS != mx_nic_id_to_hostname( module_proc->mx_peers[i].nic_id, peer_name ) )
+                    sprintf( peer_name, "unknown %lx nic_id", module_proc->mx_peers[i].nic_id );
 
                 opal_output( 0, "mx_connect fail for %s(%dth remote address) with key %x (error %s)\n",
                              peer_name, i, mca_btl_mx_component.mx_filter, mx_strerror(mx_status) );
             }
             continue;
         }
-        module_endpoint->mx_peer.nic_id = mx_peers[i].nic_id;
-        module_endpoint->mx_peer.endpoint_id = mx_peers[i].endpoint_id;
-        module_endpoint->mx_peer_addr = mx_remote_addr;
-        module_proc->proc_addr_index = i;
+        module_endpoint->mx_peer.nic_id      = module_proc->mx_peers[i].nic_id;
+        module_endpoint->mx_peer.endpoint_id = module_proc->mx_peers[i].endpoint_id;
+        module_endpoint->mx_peer_addr        = mx_remote_addr;
+        module_proc->proc_addr_index         = i;
+        module_proc->status                  = MCA_BTL_MX_CONNECTED;
         break;
     }
 
-    free( mx_peers );
-
-    if( i == count ) {  /* no available connection */
+    if( i == module_proc->mx_peers_count ) {  /* no available connection */
+        module_proc->status = MCA_BTL_MX_NOT_REACHEABLE;
        return OMPI_ERROR;
     }
 
-    if( NULL == module_proc->proc_endpoints ) {
-        module_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
-            malloc(count * sizeof(mca_btl_base_endpoint_t*));
-        if(NULL == module_proc->proc_endpoints) {
-            return OMPI_ERR_OUT_OF_RESOURCE;
-        }
-    }
-
-    /* insert into endpoint array */
-    module_endpoint->endpoint_proc = module_proc;
     module_proc->proc_endpoints[module_proc->proc_endpoint_count++] = module_endpoint;
-
     return OMPI_SUCCESS;
 }
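The retry loop in mca_btl_mx_proc_connect() above is easy to misread: the bare { ... } block after the nested if is attached to nothing, so it runs for every failed mx_connect(), whether the error was a timeout that exhausted its retries or any other failure. Note also that num_retry is shared across all candidate addresses rather than reset per address. A goto-free restatement of the same control flow, with hypothetical stand-ins for the MX calls:

```c
/* Goto-free restatement of the retry loop in mca_btl_mx_proc_connect().
 * try_once() stands in for mx_connect(); all names are illustrative only. */
#include <stdbool.h>
#include <stdio.h>

enum result { OK, TIMEOUT, OTHER_ERROR };

/* Pretend only the third address answers; the first two always time out. */
static enum result try_once( int addr ) { return (addr == 2) ? OK : TIMEOUT; }

static bool connect_any( int num_addrs, int max_retries )
{
    int num_retry = 0;                        /* shared across addresses,  */
    for( int i = 0; i < num_addrs; i++ ) {    /* exactly as in the patch   */
        enum result r;
        while( TIMEOUT == (r = try_once(i)) &&
               num_retry++ < max_retries )
            ;                                 /* retry same address        */
        if( OK == r )
            return true;                      /* connected: stop searching */
        fprintf( stderr, "connect to address %d failed\n", i );
    }
    return false;                             /* peer is unreachable       */
}

int main(void)
{
    printf( "%s\n", connect_any(3, 20) ? "connected" : "unreachable" );
    return 0;
}
```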
diff --git a/ompi/mca/btl/mx/btl_mx_proc.h b/ompi/mca/btl/mx/btl_mx_proc.h
index a8f7727918..6cb1af5617 100644
--- a/ompi/mca/btl/mx/btl_mx_proc.h
+++ b/ompi/mca/btl/mx/btl_mx_proc.h
@@ -28,37 +28,47 @@
 #if defined(c_plusplus) || defined(__cplusplus)
 extern "C" {
 #endif
-OBJ_CLASS_DECLARATION(mca_btl_mx_proc_t);
+    OBJ_CLASS_DECLARATION(mca_btl_mx_proc_t);
 
-/**
- * Represents the state of a remote process and the set of addresses
- * that it exports. Also cache an instance of mca_btl_base_endpoint_t for
- * each
- * BTL instance that attempts to open a connection to the process.
- */
-struct mca_btl_mx_proc_t {
-    opal_list_item_t super;
-    /**< allow proc to be placed on a list */
+#define MCA_BTL_MX_NOT_CONNECTED  0x0000
+#define MCA_BTL_MX_NOT_REACHEABLE 0x0001
+#define MCA_BTL_MX_CONNECTED      0x0002
 
-    ompi_proc_t *proc_ompi;
-    /**< pointer to corresponding ompi_proc_t */
+    /**
+     * Represents the state of a remote process and the set of addresses
+     * that it exports. Also cache an instance of mca_btl_base_endpoint_t
+     * for each BTL instance that attempts to open a connection to the process.
+     */
+    struct mca_btl_mx_proc_t {
+        opal_list_item_t super;
+        /**< allow proc to be placed on a list */
 
-    size_t proc_addr_index;
-    /**< next remote address that will be used to establish the connection */
+        ompi_proc_t *proc_ompi;
+        /**< pointer to corresponding ompi_proc_t */
 
-    struct mca_btl_base_endpoint_t **proc_endpoints;
-    /**< array of endpoints that have been created to access this proc */
+        int status;  /**< status of the connection */
 
-    size_t proc_endpoint_count;
-    /**< number of endpoints */
+        mca_btl_mx_addr_t *mx_peers;  /**< peers addresses */
+        int mx_peers_count;
 
-    opal_mutex_t proc_lock;
-    /**< lock to protect against concurrent access to proc state */
-};
-typedef struct mca_btl_mx_proc_t mca_btl_mx_proc_t;
+        size_t proc_addr_index;
+        /**< next remote address that will be used to establish the connection */
 
-mca_btl_mx_proc_t* mca_btl_mx_proc_create(ompi_proc_t* ompi_proc);
-int mca_btl_mx_proc_insert(mca_btl_mx_proc_t*, mca_btl_base_endpoint_t*);
+        struct mca_btl_base_endpoint_t **proc_endpoints;
+        /**< array of endpoints that have been created to access this proc */
+
+        size_t proc_endpoint_count;
+        /**< number of endpoints */
+
+        opal_mutex_t proc_lock;
+        /**< lock to protect against concurrent access to proc state */
+    };
+    typedef struct mca_btl_mx_proc_t mca_btl_mx_proc_t;
+
+    mca_btl_mx_proc_t* mca_btl_mx_proc_create(ompi_proc_t* ompi_proc);
+    int mca_btl_mx_proc_insert(mca_btl_mx_proc_t*, mca_btl_base_endpoint_t*);
+    int mca_btl_mx_proc_connect( mca_btl_mx_endpoint_t* module_endpoint );
 
 #if defined(c_plusplus) || defined(__cplusplus)
 }
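The header now exposes the connection-status constants and the split mca_btl_mx_proc_insert()/mca_btl_mx_proc_connect() pair. One consequence of the split is that the proc object takes ownership of the modex address buffer: the free( mx_peers ) call disappears from the connect path, and the destructor frees proc->mx_peers instead. (On the zero-peer early return, the patch as written appears to return before storing the pointer, which would leak that buffer.) A sketch of this single-owner rule, under hypothetical names rather than the real OMPI types:

```c
/* Sketch of the new ownership rule for the peers array; hypothetical names.
 * proc_insert() stores the modex buffer on the proc object, and the
 * destructor (not the connect path) frees it exactly once. */
#include <stdio.h>
#include <stdlib.h>

struct proc {
    int *peers;       /* stands in for mca_btl_mx_addr_t *mx_peers */
    int  peers_count; /* stands in for mx_peers_count              */
};

static int proc_insert( struct proc* p, int* peers, int count )
{
    if( 0 == count ) {
        free(peers);          /* unlike the patch, release the buffer */
        return -1;            /* before reporting "no connection"     */
    }
    p->peers       = peers;   /* proc owns the buffer from here on    */
    p->peers_count = count;
    return 0;
}

static void proc_destruct( struct proc* p )
{
    free(p->peers);           /* single owner: freed only here        */
    p->peers = NULL;
    p->peers_count = 0;
}

int main(void)
{
    struct proc p = { NULL, 0 };
    int* addrs = malloc( 4 * sizeof(int) );  /* as returned by the modex */
    if( NULL == addrs ) return 1;
    if( 0 == proc_insert(&p, addrs, 4) )
        printf( "proc holds %d peer addresses\n", p.peers_count );
    proc_destruct(&p);
    return 0;
}
```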