1
1

Perform endpoint initialization atomically.

Этот коммит содержится в:
Artem Polyakov 2015-12-21 09:34:11 +06:00
родитель afaf9c9ea6
Коммит 0f77bc7ea7
3 изменённых файлов: 51 добавлений и 33 удалений

Просмотреть файл

@ -796,11 +796,11 @@ static int prepare_device_for_use (mca_btl_openib_device_t *device)
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
static int init_ib_proc(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc_t* ib_proc, static int init_ib_proc_nolock(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc_t* ib_proc,
mca_btl_base_endpoint_t **endpoint_ptr, mca_btl_base_endpoint_t **endpoint_ptr,
int local_port_cnt, int btl_rank, bool *is_reachable) int local_port_cnt, int btl_rank, bool *is_reachable)
{ {
int rem_port_cnt, matching_port = -1, i, j, rc; int rem_port_cnt, matching_port = -1, j, rc;
mca_btl_base_endpoint_t *endpoint; mca_btl_base_endpoint_t *endpoint;
opal_btl_openib_connect_base_module_t *local_cpc; opal_btl_openib_connect_base_module_t *local_cpc;
opal_btl_openib_connect_base_module_data_t *remote_cpc_data; opal_btl_openib_connect_base_module_data_t *remote_cpc_data;
@ -871,8 +871,6 @@ static int init_ib_proc(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc
return OPAL_ERROR; return OPAL_ERROR;
} }
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
/* The btl_proc datastructure is shared by all IB BTL /* The btl_proc datastructure is shared by all IB BTL
* instances that are trying to reach this destination. * instances that are trying to reach this destination.
* Cache the peer instance on the btl_proc. * Cache the peer instance on the btl_proc.
@ -880,7 +878,6 @@ static int init_ib_proc(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc
endpoint = OBJ_NEW(mca_btl_openib_endpoint_t); endpoint = OBJ_NEW(mca_btl_openib_endpoint_t);
assert(((opal_object_t*)endpoint)->obj_reference_count == 1); assert(((opal_object_t*)endpoint)->obj_reference_count == 1);
if(NULL == endpoint) { if(NULL == endpoint) {
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERR_OUT_OF_RESOURCE; return OPAL_ERR_OUT_OF_RESOURCE;
} }
@ -905,7 +902,6 @@ static int init_ib_proc(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc
ib_proc->proc_ports[j].pm_port_info.subnet_id, ib_proc->proc_ports[j].pm_port_info.subnet_id,
ib_proc->proc_opal->proc_name.jobid, endpoint); ib_proc->proc_opal->proc_name.jobid, endpoint);
if (OPAL_SUCCESS != rc ) { if (OPAL_SUCCESS != rc ) {
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERROR; return OPAL_ERROR;
} }
} }
@ -918,20 +914,17 @@ static int init_ib_proc(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc
rc = mca_btl_openib_proc_insert(ib_proc, endpoint); rc = mca_btl_openib_proc_insert(ib_proc, endpoint);
if (OPAL_SUCCESS != rc) { if (OPAL_SUCCESS != rc) {
OBJ_RELEASE(endpoint); OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERROR; return OPAL_ERROR;
} }
if(OPAL_SUCCESS != mca_btl_openib_tune_endpoint(openib_btl, endpoint)) { if(OPAL_SUCCESS != mca_btl_openib_tune_endpoint(openib_btl, endpoint)) {
OBJ_RELEASE(endpoint); OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERROR; return OPAL_ERROR;
} }
endpoint->index = opal_pointer_array_add(openib_btl->device->endpoints, (void*)endpoint); endpoint->index = opal_pointer_array_add(openib_btl->device->endpoints, (void*)endpoint);
if( 0 > endpoint->index ) { if( 0 > endpoint->index ) {
OBJ_RELEASE(endpoint); OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERROR; return OPAL_ERROR;
} }
@ -942,13 +935,10 @@ static int init_ib_proc(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc
rc = local_cpc->cbm_endpoint_init(endpoint); rc = local_cpc->cbm_endpoint_init(endpoint);
if (OPAL_SUCCESS != rc) { if (OPAL_SUCCESS != rc) {
OBJ_RELEASE(endpoint); OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERROR; return OPAL_ERROR;
} }
} }
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
*is_reachable = true; *is_reachable = true;
*endpoint_ptr = endpoint; *endpoint_ptr = endpoint;
@ -1011,6 +1001,7 @@ int mca_btl_openib_add_procs(
mca_btl_openib_proc_t* ib_proc; mca_btl_openib_proc_t* ib_proc;
bool found_existing = false; bool found_existing = false;
bool is_reachable; bool is_reachable;
bool is_new;
opal_output(-1, "add procs: adding proc %d", i); opal_output(-1, "add procs: adding proc %d", i);
@ -1030,14 +1021,13 @@ int mca_btl_openib_add_procs(
} }
#endif #endif
if(NULL == (ib_proc = mca_btl_openib_proc_create(proc))) { if(NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc))) {
/* if we don't have connection info for this process, it's /* if we don't have connection info for this process, it's
* okay because some other method might be able to reach it, * okay because some other method might be able to reach it,
* so just mark it as unreachable by us */ * so just mark it as unreachable by us */
continue; continue;
} }
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
for (j = 0 ; j < (int) ib_proc->proc_endpoint_count ; ++j) { for (j = 0 ; j < (int) ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j]; endpoint = ib_proc->proc_endpoints[j];
if (endpoint->endpoint_btl == openib_btl) { if (endpoint->endpoint_btl == openib_btl) {
@ -1045,18 +1035,19 @@ int mca_btl_openib_add_procs(
break; break;
} }
} }
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
if (found_existing) { if (found_existing) {
if (reachable) { if (reachable) {
opal_bitmap_set_bit(reachable, i); opal_bitmap_set_bit(reachable, i);
} }
peers[i] = endpoint; peers[i] = endpoint;
OPAL_THREAD_UNLOCK( &ib_proc->proc_lock );
continue; continue;
} }
rc = init_ib_proc(openib_btl, ib_proc, &endpoint, lcl_subnet_id_port_cnt, rc = init_ib_proc_nolock(openib_btl, ib_proc, &endpoint, lcl_subnet_id_port_cnt,
btl_rank, &is_reachable); btl_rank, &is_reachable);
OPAL_THREAD_UNLOCK( &ib_proc->proc_lock );
if( OPAL_SUCCESS == rc ){ if( OPAL_SUCCESS == rc ){
peers[i] = endpoint; peers[i] = endpoint;
if( is_reachable && NULL != reachable ){ if( is_reachable && NULL != reachable ){
@ -1076,15 +1067,18 @@ struct mca_btl_base_endpoint_t *mca_btl_openib_get_ep (struct mca_btl_base_modul
mca_btl_openib_module_t *openib_btl = (mca_btl_openib_module_t *) btl; mca_btl_openib_module_t *openib_btl = (mca_btl_openib_module_t *) btl;
mca_btl_base_endpoint_t *endpoint; mca_btl_base_endpoint_t *endpoint;
mca_btl_openib_proc_t *ib_proc; mca_btl_openib_proc_t *ib_proc;
int j;
int local_port_cnt = 0, btl_rank;
bool is_reachable;
if (NULL == (ib_proc = mca_btl_openib_proc_create(proc))) { if (NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc))) {
/* if we don't have connection info for this process, it's /* if we don't have connection info for this process, it's
* okay because some other method might be able to reach it, * okay because some other method might be able to reach it,
* so just mark it as unreachable by us */ * so just mark it as unreachable by us */
return NULL; return NULL;
} }
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
for (size_t j = 0 ; j < ib_proc->proc_endpoint_count ; ++j) { for (size_t j = 0 ; j < ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j]; endpoint = ib_proc->proc_endpoints[j];
if (endpoint->endpoint_btl == openib_btl) { if (endpoint->endpoint_btl == openib_btl) {
@ -1092,13 +1086,19 @@ struct mca_btl_base_endpoint_t *mca_btl_openib_get_ep (struct mca_btl_base_modul
return endpoint; return endpoint;
} }
} }
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
BTL_VERBOSE(("creating new endpoint for remote process {.jobid = 0x%x, .vpid = 0x%x}", for(j=0; j < mca_btl_openib_component.ib_num_btls; j++){
proc->proc_name.jobid, proc->proc_name.vpid)); if(mca_btl_openib_component.openib_btls[j]->port_info.subnet_id
== openib_btl->port_info.subnet_id) {
if(openib_btl == mca_btl_openib_component.openib_btls[j]) {
btl_rank = local_port_cnt;
}
local_port_cnt++;
}
}
endpoint = NULL; (void)init_ib_proc_nolock(openib_btl, ib_proc, &endpoint,
(void) mca_btl_openib_add_procs (btl, 1, &proc, &endpoint, NULL); local_port_cnt, btl_rank, &is_reachable);
return endpoint; return endpoint;
} }

Просмотреть файл

@ -75,7 +75,7 @@ void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc)
* Look for an existing IB process instances based on the associated * Look for an existing IB process instances based on the associated
* opal_proc_t instance. * opal_proc_t instance.
*/ */
static mca_btl_openib_proc_t* mca_btl_openib_proc_lookup_proc_nolock(opal_proc_t* proc) static mca_btl_openib_proc_t* ibproc_lookup_no_lock(opal_proc_t* proc)
{ {
mca_btl_openib_proc_t* ib_proc; mca_btl_openib_proc_t* ib_proc;
@ -91,13 +91,19 @@ static mca_btl_openib_proc_t* mca_btl_openib_proc_lookup_proc_nolock(opal_proc_t
return NULL; return NULL;
} }
static mca_btl_openib_proc_t* mca_btl_openib_proc_lookup_proc(opal_proc_t* proc) static mca_btl_openib_proc_t* ibproc_lookup_and_lock(opal_proc_t* proc)
{ {
mca_btl_openib_proc_t* ib_proc; mca_btl_openib_proc_t* ib_proc;
/* get the process from the list */
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock); OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
ib_proc = mca_btl_openib_proc_lookup_proc_nolock(proc); ib_proc = ibproc_lookup_no_lock(proc);
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock); OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
if( NULL != ib_proc ){
/* if we were able to find it - lock it.
* NOTE: we want to lock it outside of list locked region */
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
}
return ib_proc; return ib_proc;
} }
@ -117,7 +123,7 @@ static void inline unpack8(char **src, uint8_t *value)
* associated w/ a given destination on this datastructure. * associated w/ a given destination on this datastructure.
*/ */
mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc) mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc)
{ {
mca_btl_openib_proc_t *ib_proc = NULL, *ib_proc_ret = NULL; mca_btl_openib_proc_t *ib_proc = NULL, *ib_proc_ret = NULL;
size_t msg_size; size_t msg_size;
@ -127,10 +133,11 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
char *offset; char *offset;
int modex_message_size; int modex_message_size;
mca_btl_openib_modex_message_t dummy; mca_btl_openib_modex_message_t dummy;
bool found = false;
/* Check if we have already created a IB proc /* Check if we have already created a IB proc
* structure for this ompi process */ * structure for this ompi process */
ib_proc = mca_btl_openib_proc_lookup_proc(proc); ib_proc = ibproc_lookup_and_lock(proc);
if (NULL != ib_proc) { if (NULL != ib_proc) {
/* Gotcha! */ /* Gotcha! */
return ib_proc; return ib_proc;
@ -258,17 +265,28 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
/* Finally add this process to the initialized procs list */ /* Finally add this process to the initialized procs list */
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock); OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
ib_proc_ret = mca_btl_openib_proc_lookup_proc_nolock(proc); ib_proc_ret = ibproc_lookup_no_lock(proc);
if (NULL == ib_proc_ret) { if (NULL == ib_proc_ret) {
/* if process can't be found in this list - insert it */ /* if process can't be found in this list - insert it locked
* it is safe to lock ib_proc here because this thread is
* the only one who knows about it so far */
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super); opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super);
ib_proc_ret = ib_proc; ib_proc_ret = ib_proc;
found = true;
} else { } else {
/* otherwise - release module_proc */ /* otherwise - release module_proc */
OBJ_RELEASE(ib_proc); OBJ_RELEASE(ib_proc);
found = false;
} }
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock); OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
/* if we haven't insert the process - lock it here so we
* won't lock mca_btl_openib_component.ib_lock */
if( !found ){
OPAL_THREAD_LOCK(&ib_proc_ret->proc_lock);
}
return ib_proc_ret; return ib_proc_ret;
err_exit: err_exit:
@ -288,7 +306,7 @@ int mca_btl_openib_proc_remove(opal_proc_t *proc,
/* Remove endpoint from the openib BTL version of the proc as /* Remove endpoint from the openib BTL version of the proc as
well */ well */
ib_proc = mca_btl_openib_proc_lookup_proc(proc); ib_proc = ibproc_lookup_and_lock(proc);
if (NULL != ib_proc) { if (NULL != ib_proc) {
for (i = 0; i < ib_proc->proc_endpoint_count; ++i) { for (i = 0; i < ib_proc->proc_endpoint_count; ++i) {
if (ib_proc->proc_endpoints[i] == endpoint) { if (ib_proc->proc_endpoints[i] == endpoint) {

Просмотреть файл

@ -84,7 +84,7 @@ typedef struct mca_btl_openib_proc_t mca_btl_openib_proc_t;
OBJ_CLASS_DECLARATION(mca_btl_openib_proc_t); OBJ_CLASS_DECLARATION(mca_btl_openib_proc_t);
mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc); mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc);
int mca_btl_openib_proc_insert(mca_btl_openib_proc_t*, mca_btl_base_endpoint_t*); int mca_btl_openib_proc_insert(mca_btl_openib_proc_t*, mca_btl_base_endpoint_t*);
int mca_btl_openib_proc_remove(opal_proc_t* proc, int mca_btl_openib_proc_remove(opal_proc_t* proc,
mca_btl_base_endpoint_t* module_endpoint); mca_btl_base_endpoint_t* module_endpoint);