1
1

Fix openib race condition when direct modex is used.

The problem was in mca_btl_openib_proc_create. This function may be called
from several places simultaneously:
* from the main thread when somebody wants to do `MPI_Send()` (for example) for
the first time;
* from udcm if the counterpart peer is trying to connect and `mca_btl_openib_get_ep()`
is called.

In this case one of the threads may add an uninitialized proc structure
to the `mca_btl_openib_component.ib_procs` and the other will read it and
treat as initialized.

This commit turns ib_proc initialization into a single atomic operation.
Этот коммит содержится в:
Artem Polyakov 2015-12-20 10:16:33 +06:00
родитель db4f483653
Коммит 3c9fd567b6

Просмотреть файл

@ -44,10 +44,6 @@ void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* ib_proc)
ib_proc->proc_endpoints = 0;
ib_proc->proc_endpoint_count = 0;
OBJ_CONSTRUCT(&ib_proc->proc_lock, opal_mutex_t);
/* add to list of all proc instance */
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super);
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
}
/*
@ -56,11 +52,6 @@ void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* ib_proc)
void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc)
{
/* remove from list of all proc instances */
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
opal_list_remove_item(&mca_btl_openib_component.ib_procs, &ib_proc->super);
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
/* release resources */
if(NULL != ib_proc->proc_endpoints) {
free(ib_proc->proc_endpoints);
@ -84,26 +75,32 @@ void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc)
* Look for an existing IB process instances based on the associated
* opal_proc_t instance.
*/
static mca_btl_openib_proc_t* mca_btl_openib_proc_lookup_proc(opal_proc_t* proc)
static mca_btl_openib_proc_t* mca_btl_openib_proc_lookup_proc_nolock(opal_proc_t* proc)
{
mca_btl_openib_proc_t* ib_proc;
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
for(ib_proc = (mca_btl_openib_proc_t*)
opal_list_get_first(&mca_btl_openib_component.ib_procs);
ib_proc != (mca_btl_openib_proc_t*)
opal_list_get_end(&mca_btl_openib_component.ib_procs);
ib_proc = (mca_btl_openib_proc_t*)opal_list_get_next(ib_proc)) {
if(ib_proc->proc_opal == proc) {
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
return ib_proc;
}
}
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
return NULL;
}
static mca_btl_openib_proc_t* mca_btl_openib_proc_lookup_proc(opal_proc_t* proc)
{
mca_btl_openib_proc_t* ib_proc;
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
ib_proc = mca_btl_openib_proc_lookup_proc_nolock(proc);
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
return ib_proc;
}
static void inline unpack8(char **src, uint8_t *value)
{
/* Copy one character */
@ -122,7 +119,7 @@ static void inline unpack8(char **src, uint8_t *value)
mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
{
mca_btl_openib_proc_t* module_proc = NULL;
mca_btl_openib_proc_t *ib_proc = NULL, *ib_proc_ret = NULL;
size_t msg_size;
uint32_t size;
int rc, i, j;
@ -133,18 +130,26 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
/* Check if we have already created a IB proc
* structure for this ompi process */
module_proc = mca_btl_openib_proc_lookup_proc(proc);
if (NULL != module_proc) {
ib_proc = mca_btl_openib_proc_lookup_proc(proc);
if (NULL != ib_proc) {
/* Gotcha! */
return module_proc;
return ib_proc;
}
/* Oops! First time, gotta create a new IB proc
/* All initialization has to be an atomic operation. we do the following assumption:
* - we let all concurent threads to try to do the initialization;
* - when one has finished it locks ib_lock and checks if corresponding
* process is still missing;
* - if so - new proc is added, otherwise - initialized proc struct is released.
*/
/* First time, gotta create a new IB proc
* out of the opal_proc ... */
module_proc = OBJ_NEW(mca_btl_openib_proc_t);
ib_proc = OBJ_NEW(mca_btl_openib_proc_t);
/* Initialize number of peer */
module_proc->proc_endpoint_count = 0;
module_proc->proc_opal = proc;
ib_proc->proc_endpoint_count = 0;
ib_proc->proc_opal = proc;
/* query for the peer address info */
OPAL_MODEX_RECV(rc, &mca_btl_openib_component.super.btl_version,
@ -153,11 +158,10 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
BTL_VERBOSE(("[%s:%d] opal_modex_recv failed for peer %s",
__FILE__, __LINE__,
OPAL_NAME_PRINT(proc->proc_name)));
OBJ_RELEASE(module_proc);
return NULL;
goto err_exit;
}
if (0 == msg_size) {
return NULL;
goto err_exit;
}
/* Message was packed in btl_openib_component.c; the format is
@ -166,22 +170,22 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
/* Unpack the number of modules in the message */
offset = (char *) message;
unpack8(&offset, &(module_proc->proc_port_count));
BTL_VERBOSE(("unpack: %d btls", module_proc->proc_port_count));
if (module_proc->proc_port_count > 0) {
module_proc->proc_ports = (mca_btl_openib_proc_modex_t *)
unpack8(&offset, &(ib_proc->proc_port_count));
BTL_VERBOSE(("unpack: %d btls", ib_proc->proc_port_count));
if (ib_proc->proc_port_count > 0) {
ib_proc->proc_ports = (mca_btl_openib_proc_modex_t *)
malloc(sizeof(mca_btl_openib_proc_modex_t) *
module_proc->proc_port_count);
ib_proc->proc_port_count);
} else {
module_proc->proc_ports = NULL;
ib_proc->proc_ports = NULL;
}
/* Loop over unpacking all the ports */
for (i = 0; i < module_proc->proc_port_count; i++) {
for (i = 0; i < ib_proc->proc_port_count; i++) {
/* Unpack the modex comment message struct */
size = modex_message_size;
memcpy(&(module_proc->proc_ports[i].pm_port_info), offset, size);
memcpy(&(ib_proc->proc_ports[i].pm_port_info), offset, size);
#if !defined(WORDS_BIGENDIAN) && OPAL_ENABLE_HETEROGENEOUS_SUPPORT
MCA_BTL_OPENIB_MODEX_MSG_NTOH(module_proc->proc_ports[i].pm_port_info);
#endif
@ -190,22 +194,22 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
i, (int)(offset-((char*)message))));
/* Unpack the number of CPCs that follow */
unpack8(&offset, &(module_proc->proc_ports[i].pm_cpc_data_count));
unpack8(&offset, &(ib_proc->proc_ports[i].pm_cpc_data_count));
BTL_VERBOSE(("unpacked btl %d: number of cpcs to follow %d (offset now %d)",
i, module_proc->proc_ports[i].pm_cpc_data_count,
i, ib_proc->proc_ports[i].pm_cpc_data_count,
(int)(offset-((char*)message))));
module_proc->proc_ports[i].pm_cpc_data = (opal_btl_openib_connect_base_module_data_t *)
calloc(module_proc->proc_ports[i].pm_cpc_data_count,
ib_proc->proc_ports[i].pm_cpc_data = (opal_btl_openib_connect_base_module_data_t *)
calloc(ib_proc->proc_ports[i].pm_cpc_data_count,
sizeof(opal_btl_openib_connect_base_module_data_t));
if (NULL == module_proc->proc_ports[i].pm_cpc_data) {
return NULL;
if (NULL == ib_proc->proc_ports[i].pm_cpc_data) {
goto err_exit;
}
/* Unpack the CPCs */
for (j = 0; j < module_proc->proc_ports[i].pm_cpc_data_count; ++j) {
for (j = 0; j < ib_proc->proc_ports[i].pm_cpc_data_count; ++j) {
uint8_t u8;
opal_btl_openib_connect_base_module_data_t *cpcd;
cpcd = module_proc->proc_ports[i].pm_cpc_data + j;
cpcd = ib_proc->proc_ports[i].pm_cpc_data + j;
unpack8(&offset, &u8);
BTL_VERBOSE(("unpacked btl %d: cpc %d: index %d (offset now %d)",
i, j, u8, (int)(offset-(char*)message)));
@ -224,7 +228,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
cpcd->cbm_modex_message = malloc(cpcd->cbm_modex_message_len);
if (NULL == cpcd->cbm_modex_message) {
BTL_ERROR(("Failed to malloc"));
return NULL;
goto err_exit;
}
memcpy(cpcd->cbm_modex_message, offset,
cpcd->cbm_modex_message_len);
@ -238,20 +242,42 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
}
}
if (0 == module_proc->proc_port_count) {
module_proc->proc_endpoints = NULL;
if (0 == ib_proc->proc_port_count) {
ib_proc->proc_endpoints = NULL;
} else {
module_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
malloc(module_proc->proc_port_count *
ib_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
malloc(ib_proc->proc_port_count *
sizeof(mca_btl_base_endpoint_t*));
}
if (NULL == module_proc->proc_endpoints) {
OBJ_RELEASE(module_proc);
return NULL;
if (NULL == ib_proc->proc_endpoints) {
goto err_exit;
}
BTL_VERBOSE(("unpacking done!"));
return module_proc;
/* Finally add this process to the initialized procs list */
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
ib_proc_ret = mca_btl_openib_proc_lookup_proc_nolock(proc);
if (NULL == ib_proc_ret) {
/* if process can't be found in this list - insert it */
opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super);
ib_proc_ret = ib_proc;
} else {
/* otherwise - release module_proc */
OBJ_RELEASE(ib_proc);
}
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
return ib_proc_ret;
err_exit:
fprintf(stderr,"%d: error exit from mca_btl_openib_proc_create\n", OPAL_PROC_MY_NAME.vpid);
if( NULL != ib_proc ){
OBJ_RELEASE(ib_proc);
}
return NULL;
}
int mca_btl_openib_proc_remove(opal_proc_t *proc,