1
1

Merge pull request #1248 from artpol84/openib_proc_init_race

Openib dynamic add proc race conditions
Этот коммит содержится в:
Nathan Hjelm 2015-12-22 21:48:05 -07:00
родитель 2362bf0c0c 08ad8357a8
Коммит 84d890b7e7
3 изменённых файлов: 333 добавлений и 229 удалений

Просмотреть файл

@ -210,7 +210,6 @@ static int adjust_cq(mca_btl_openib_device_t *device, const int cq)
return OPAL_ERROR;
}
OPAL_THREAD_LOCK(&device->device_lock);
if (!device->progress) {
int rc;
device->progress = true;
@ -219,7 +218,6 @@ static int adjust_cq(mca_btl_openib_device_t *device, const int cq)
return rc;
}
}
OPAL_THREAD_UNLOCK(&device->device_lock);
#endif
}
#ifdef HAVE_IBV_RESIZE_CQ
@ -406,7 +404,7 @@ static int create_srq(mca_btl_openib_module_t *openib_btl)
return OPAL_SUCCESS;
}
static int mca_btl_openib_size_queues(struct mca_btl_openib_module_t* openib_btl, size_t nprocs)
static int mca_btl_openib_size_queues_nolock(struct mca_btl_openib_module_t* openib_btl, size_t nprocs)
{
uint32_t send_cqes, recv_cqes;
int rc = OPAL_SUCCESS, qp;
@ -603,7 +601,7 @@ static int mca_btl_openib_tune_endpoint(mca_btl_openib_module_t* openib_btl,
return OPAL_SUCCESS;
}
static int prepare_device_for_use (mca_btl_openib_device_t *device)
static int prepare_device_for_use_nolock (mca_btl_openib_device_t *device)
{
mca_btl_openib_frag_init_data_t *init_data;
int rc, length;
@ -796,6 +794,156 @@ static int prepare_device_for_use (mca_btl_openib_device_t *device)
return OPAL_SUCCESS;
}
/*
 * Create and register an endpoint on ib_proc for this BTL module.
 *
 * Pairs the btl_rank'th local port on a subnet with the btl_rank'th
 * remote port on the same subnet, then finds a CPC common to both
 * sides.  Caller must hold ib_proc->proc_lock (hence the _nolock
 * suffix); the device_lock is taken internally only around the
 * endpoint-array insertion.
 *
 * @param openib_btl     local BTL module
 * @param ib_proc        peer proc structure (locked by caller)
 * @param endpoint_ptr   [out] new endpoint on success, NULL on failure
 * @param local_port_cnt number of local ports on this subnet
 * @param btl_rank       rank of this module's port within that subnet
 *
 * @return OPAL_SUCCESS, OPAL_ERR_OUT_OF_RESOURCE, or OPAL_ERROR
 */
static int init_ib_proc_nolock(mca_btl_openib_module_t* openib_btl, mca_btl_openib_proc_t* ib_proc,
                               mca_btl_base_endpoint_t **endpoint_ptr,
                               int local_port_cnt, int btl_rank)
{
    int rem_port_cnt, matching_port = -1, j, rc;
    mca_btl_base_endpoint_t *endpoint;
    opal_btl_openib_connect_base_module_t *local_cpc;
    opal_btl_openib_connect_base_module_data_t *remote_cpc_data;

    *endpoint_ptr = NULL;

    /* check if the remote proc has any ports that:
       - on the same subnet as the local proc, and
       - on that subnet, has a CPC in common with the local proc
    */
    rem_port_cnt = 0;
    BTL_VERBOSE(("got %d port_infos ", ib_proc->proc_port_count));
    for (j = 0; j < (int) ib_proc->proc_port_count; j++){
        BTL_VERBOSE(("got a subnet %016" PRIx64,
                     ib_proc->proc_ports[j].pm_port_info.subnet_id));
        if (ib_proc->proc_ports[j].pm_port_info.subnet_id ==
            openib_btl->port_info.subnet_id) {
            BTL_VERBOSE(("Got a matching subnet!"));
            if (rem_port_cnt == btl_rank) {
                /* the btl_rank'th matching remote port pairs with this module */
                matching_port = j;
            }
            rem_port_cnt++;
        }
    }

    if (0 == rem_port_cnt) {
        /* no use trying to communicate with this endpoint */
        BTL_VERBOSE(("No matching subnet id/CPC was found, moving on.. "));
        return OPAL_ERROR;
    }

    /* If this process has multiple ports on a single subnet ID,
       and the report proc also has multiple ports on this same
       subnet ID, the default connection pattern is:

       LOCAL                REMOTE PEER
       1st port on subnet X <--> 1st port on subnet X
       2nd port on subnet X <--> 2nd port on subnet X
       3rd port on subnet X <--> 3rd port on subnet X
       ...etc.

       Note that the port numbers may not be contiguous, and they
       may not be the same on either side. Hence the "1st", "2nd",
       "3rd, etc. notation, above.

       Hence, if the local "rank" of this module's port on the
       subnet ID is greater than the total number of ports on the
       peer on this same subnet, then we have no match. So skip
       this connection. */
    if (rem_port_cnt < local_port_cnt && btl_rank >= rem_port_cnt) {
        BTL_VERBOSE(("Not enough remote ports on this subnet id, moving on.. "));
        return OPAL_ERROR;
    }

    /* Now that we have verified that we're on the same subnet and
       the remote peer has enough ports, see if that specific port
       on the peer has a matching CPC. */
    assert(btl_rank <= ib_proc->proc_port_count);
    assert(matching_port != -1);
    if (OPAL_SUCCESS !=
        opal_btl_openib_connect_base_find_match(openib_btl,
                                                &(ib_proc->proc_ports[matching_port]),
                                                &local_cpc,
                                                &remote_cpc_data)) {
        return OPAL_ERROR;
    }

    /* The btl_proc datastructure is shared by all IB BTL
     * instances that are trying to reach this destination.
     * Cache the peer instance on the btl_proc.
     */
    endpoint = OBJ_NEW(mca_btl_openib_endpoint_t);
    /* BUGFIX: check for NULL before touching the object -- the previous
     * code asserted on the refcount first, dereferencing a
     * possibly-NULL pointer */
    if (NULL == endpoint) {
        return OPAL_ERR_OUT_OF_RESOURCE;
    }
    assert(((opal_object_t*)endpoint)->obj_reference_count == 1);

#if HAVE_XRC
    if (MCA_BTL_XRC_ENABLED) {
        int rem_port_cnt = 0;
        for(j = 0; j < (int) ib_proc->proc_port_count; j++) {
            if(ib_proc->proc_ports[j].pm_port_info.subnet_id ==
               openib_btl->port_info.subnet_id) {
                if (rem_port_cnt == btl_rank)
                    break;
                else
                    rem_port_cnt ++;
            }
        }

        assert(rem_port_cnt == btl_rank);
        /* Push the subnet/lid/jobid to xrc hash */
        rc = mca_btl_openib_ib_address_add_new(
                 ib_proc->proc_ports[j].pm_port_info.lid,
                 ib_proc->proc_ports[j].pm_port_info.subnet_id,
                 ib_proc->proc_opal->proc_name.jobid, endpoint);
        if (OPAL_SUCCESS != rc ) {
            /* BUGFIX: don't leak the endpoint on this error path (the
             * previous code returned without releasing it) */
            OBJ_RELEASE(endpoint);
            return OPAL_ERROR;
        }
    }
#endif
    mca_btl_openib_endpoint_init(openib_btl, endpoint,
                                 local_cpc,
                                 &(ib_proc->proc_ports[matching_port]),
                                 remote_cpc_data);

    rc = mca_btl_openib_proc_insert(ib_proc, endpoint);
    if (OPAL_SUCCESS != rc) {
        OBJ_RELEASE(endpoint);
        return OPAL_ERROR;
    }

    if(OPAL_SUCCESS != mca_btl_openib_tune_endpoint(openib_btl, endpoint)) {
        OBJ_RELEASE(endpoint);
        return OPAL_ERROR;
    }

    /* protect device because several endpoints for different ib_proc's
     * may be simultaneously initialized */
    opal_mutex_lock(&openib_btl->device->device_lock);
    endpoint->index = opal_pointer_array_add(openib_btl->device->endpoints, (void*)endpoint);
    opal_mutex_unlock(&openib_btl->device->device_lock);
    if( 0 > endpoint->index ) {
        OBJ_RELEASE(endpoint);
        return OPAL_ERROR;
    }

    /* Tell the selected CPC that it won.  NOTE: This call is
       outside of / separate from mca_btl_openib_endpoint_init()
       because this function likely needs the endpoint->index. */
    if (NULL != local_cpc->cbm_endpoint_init) {
        rc = local_cpc->cbm_endpoint_init(endpoint);
        if (OPAL_SUCCESS != rc) {
            OBJ_RELEASE(endpoint);
            return OPAL_ERROR;
        }
    }

    *endpoint_ptr = endpoint;
    return OPAL_SUCCESS;
}
/*
* add a proc to this btl module
* creates an endpoint that is setup on the
@ -810,12 +958,9 @@ int mca_btl_openib_add_procs(
{
mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*)btl;
int i,j, rc, local_procs;
int rem_subnet_id_port_cnt;
int lcl_subnet_id_port_cnt = 0;
int btl_rank = 0;
mca_btl_base_endpoint_t* endpoint;
opal_btl_openib_connect_base_module_t *local_cpc;
opal_btl_openib_connect_base_module_data_t *remote_cpc_data;
for(j=0; j < mca_btl_openib_component.ib_num_btls; j++){
if(mca_btl_openib_component.openib_btls[j]->port_info.subnet_id
@ -838,23 +983,28 @@ int mca_btl_openib_add_procs(
}
#endif
rc = prepare_device_for_use (openib_btl->device);
/* protect the device */
opal_mutex_lock(&openib_btl->device->device_lock);
rc = prepare_device_for_use_nolock (openib_btl->device);
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("could not prepare openib device for use"));
opal_mutex_unlock(&openib_btl->device->device_lock);
return rc;
}
rc = mca_btl_openib_size_queues(openib_btl, nprocs);
rc = mca_btl_openib_size_queues_nolock(openib_btl, nprocs);
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("error creating cqs"));
opal_mutex_unlock(&openib_btl->device->device_lock);
return rc;
}
opal_mutex_unlock(&openib_btl->device->device_lock);
for (i = 0, local_procs = 0 ; i < (int) nprocs; i++) {
struct opal_proc_t* proc = procs[i];
mca_btl_openib_proc_t* ib_proc;
bool found_existing = false;
int remote_matching_port;
bool is_new;
opal_output(-1, "add procs: adding proc %d", i);
@ -874,177 +1024,47 @@ int mca_btl_openib_add_procs(
}
#endif
if(NULL == (ib_proc = mca_btl_openib_proc_create(proc))) {
if(NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc, &is_new)) ) {
/* if we don't have connection info for this process, it's
* okay because some other method might be able to reach it,
* so just mark it as unreachable by us */
continue;
}
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
for (j = 0 ; j < (int) ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j];
if (endpoint->endpoint_btl == openib_btl) {
found_existing = true;
break;
found_existing = false;
if( !is_new ){
for (j = 0 ; j < (int) ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j];
if (endpoint->endpoint_btl == openib_btl) {
found_existing = true;
break;
}
}
}
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
if( !found_existing ) {
rc = init_ib_proc_nolock(openib_btl, ib_proc, &endpoint,
lcl_subnet_id_port_cnt, btl_rank);
if( OPAL_SUCCESS == rc ){
found_existing = true;
}
}
opal_mutex_unlock( &ib_proc->proc_lock );
if (found_existing) {
if (reachable) {
opal_bitmap_set_bit(reachable, i);
}
peers[i] = endpoint;
continue;
}
/* check if the remote proc has any ports that:
- on the same subnet as the local proc, and
- on that subnet, has a CPC in common with the local proc
*/
remote_matching_port = -1;
rem_subnet_id_port_cnt = 0;
BTL_VERBOSE(("got %d port_infos ", ib_proc->proc_port_count));
for (j = 0; j < (int) ib_proc->proc_port_count; j++){
BTL_VERBOSE(("got a subnet %016" PRIx64,
ib_proc->proc_ports[j].pm_port_info.subnet_id));
if (ib_proc->proc_ports[j].pm_port_info.subnet_id ==
openib_btl->port_info.subnet_id) {
BTL_VERBOSE(("Got a matching subnet!"));
if (rem_subnet_id_port_cnt == btl_rank) {
remote_matching_port = j;
}
rem_subnet_id_port_cnt++;
}
}
if (0 == rem_subnet_id_port_cnt) {
/* no use trying to communicate with this endpoint */
BTL_VERBOSE(("No matching subnet id/CPC was found, moving on.. "));
continue;
}
/* If this process has multiple ports on a single subnet ID,
and the report proc also has multiple ports on this same
subnet ID, the default connection pattern is:
LOCAL REMOTE PEER
1st port on subnet X <--> 1st port on subnet X
2nd port on subnet X <--> 2nd port on subnet X
3nd port on subnet X <--> 3nd port on subnet X
...etc.
Note that the port numbers may not be contiguous, and they
may not be the same on either side. Hence the "1st", "2nd",
"3rd, etc. notation, above.
Hence, if the local "rank" of this module's port on the
subnet ID is greater than the total number of ports on the
peer on this same subnet, then we have no match. So skip
this connection. */
if (rem_subnet_id_port_cnt < lcl_subnet_id_port_cnt &&
btl_rank >= rem_subnet_id_port_cnt) {
BTL_VERBOSE(("Not enough remote ports on this subnet id, moving on.. "));
continue;
}
/* Now that we have verified that we're on the same subnet and
the remote peer has enough ports, see if that specific port
on the peer has a matching CPC. */
assert(btl_rank <= ib_proc->proc_port_count);
assert(remote_matching_port != -1);
if (OPAL_SUCCESS !=
opal_btl_openib_connect_base_find_match(openib_btl,
&(ib_proc->proc_ports[remote_matching_port]),
&local_cpc,
&remote_cpc_data)) {
continue;
}
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
/* The btl_proc datastructure is shared by all IB BTL
* instances that are trying to reach this destination.
* Cache the peer instance on the btl_proc.
*/
endpoint = OBJ_NEW(mca_btl_openib_endpoint_t);
assert(((opal_object_t*)endpoint)->obj_reference_count == 1);
if(NULL == endpoint) {
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERR_OUT_OF_RESOURCE;
}
#if HAVE_XRC
if (MCA_BTL_XRC_ENABLED) {
int rem_port_cnt = 0;
for(j = 0; j < (int) ib_proc->proc_port_count; j++) {
if(ib_proc->proc_ports[j].pm_port_info.subnet_id ==
openib_btl->port_info.subnet_id) {
if (rem_port_cnt == btl_rank)
break;
else
rem_port_cnt ++;
}
}
assert(rem_port_cnt == btl_rank);
/* Push the subnet/lid/jobid to xrc hash */
rc = mca_btl_openib_ib_address_add_new(
ib_proc->proc_ports[j].pm_port_info.lid,
ib_proc->proc_ports[j].pm_port_info.subnet_id,
proc->proc_name.jobid, endpoint);
if (OPAL_SUCCESS != rc ) {
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERROR;
}
}
#endif
mca_btl_openib_endpoint_init(openib_btl, endpoint,
local_cpc,
&(ib_proc->proc_ports[remote_matching_port]),
remote_cpc_data);
rc = mca_btl_openib_proc_insert(ib_proc, endpoint);
if (OPAL_SUCCESS != rc) {
OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
continue;
}
if(OPAL_SUCCESS != mca_btl_openib_tune_endpoint(openib_btl, endpoint)) {
OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return OPAL_ERROR;
}
endpoint->index = opal_pointer_array_add(openib_btl->device->endpoints, (void*)endpoint);
if( 0 > endpoint->index ) {
OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
continue;
}
/* Tell the selected CPC that it won. NOTE: This call is
outside of / separate from mca_btl_openib_endpoint_init()
because this function likely needs the endpoint->index. */
if (NULL != local_cpc->cbm_endpoint_init) {
rc = local_cpc->cbm_endpoint_init(endpoint);
if (OPAL_SUCCESS != rc) {
OBJ_RELEASE(endpoint);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
continue;
}
}
opal_bitmap_set_bit(reachable, i);
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
peers[i] = endpoint;
}
opal_mutex_lock(&openib_btl->ib_lock);
openib_btl->local_procs += local_procs;
openib_btl->device->mem_reg_max = openib_btl->device->mem_reg_max_total / openib_btl->local_procs;
opal_mutex_unlock(&openib_btl->ib_lock);
return OPAL_SUCCESS;
}
@ -1052,31 +1072,71 @@ int mca_btl_openib_add_procs(
struct mca_btl_base_endpoint_t *mca_btl_openib_get_ep (struct mca_btl_base_module_t *btl, struct opal_proc_t *proc)
{
mca_btl_openib_module_t *openib_btl = (mca_btl_openib_module_t *) btl;
mca_btl_base_endpoint_t *endpoint;
mca_btl_base_endpoint_t *endpoint = NULL;
mca_btl_openib_proc_t *ib_proc;
int j, rc;
int local_port_cnt = 0, btl_rank;
bool is_new;
if (NULL == (ib_proc = mca_btl_openib_proc_create(proc))) {
// TODO: shift to the separate function
/* protect the device */
opal_mutex_lock(&openib_btl->device->device_lock);
rc = prepare_device_for_use_nolock (openib_btl->device);
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("could not prepare openib device for use"));
opal_mutex_unlock(&openib_btl->device->device_lock);
return NULL;
}
rc = mca_btl_openib_size_queues_nolock(openib_btl, 1);
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("error creating cqs"));
opal_mutex_unlock(&openib_btl->device->device_lock);
return NULL;
}
opal_mutex_unlock(&openib_btl->device->device_lock);
if (NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc, &is_new))) {
/* if we don't have connection info for this process, it's
* okay because some other method might be able to reach it,
* so just mark it as unreachable by us */
return NULL;
}
OPAL_THREAD_LOCK(&ib_proc->proc_lock);
for (size_t j = 0 ; j < ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j];
if (endpoint->endpoint_btl == openib_btl) {
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
return endpoint;
if( !is_new ){
for (size_t j = 0 ; j < ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j];
if (endpoint->endpoint_btl == openib_btl) {
goto exit;
}
}
}
OPAL_THREAD_UNLOCK(&ib_proc->proc_lock);
BTL_VERBOSE(("creating new endpoint for remote process {.jobid = 0x%x, .vpid = 0x%x}",
proc->proc_name.jobid, proc->proc_name.vpid));
endpoint = NULL;
(void) mca_btl_openib_add_procs (btl, 1, &proc, &endpoint, NULL);
for(j=0; j < mca_btl_openib_component.ib_num_btls; j++){
if(mca_btl_openib_component.openib_btls[j]->port_info.subnet_id
== openib_btl->port_info.subnet_id) {
if(openib_btl == mca_btl_openib_component.openib_btls[j]) {
btl_rank = local_port_cnt;
}
local_port_cnt++;
}
}
(void)init_ib_proc_nolock(openib_btl, ib_proc, &endpoint,
local_port_cnt, btl_rank);
exit:
opal_mutex_unlock(&ib_proc->proc_lock);
if (is_new && OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags)) {
opal_mutex_lock(&openib_btl->ib_lock);
openib_btl->local_procs += 1;
openib_btl->device->mem_reg_max = openib_btl->device->mem_reg_max_total / openib_btl->local_procs;
opal_mutex_unlock(&openib_btl->ib_lock);
}
return endpoint;
}

Просмотреть файл

@ -44,10 +44,6 @@ void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* ib_proc)
ib_proc->proc_endpoints = 0;
ib_proc->proc_endpoint_count = 0;
OBJ_CONSTRUCT(&ib_proc->proc_lock, opal_mutex_t);
/* add to list of all proc instance */
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super);
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
}
/*
@ -56,11 +52,6 @@ void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* ib_proc)
void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc)
{
/* remove from list of all proc instances */
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
opal_list_remove_item(&mca_btl_openib_component.ib_procs, &ib_proc->super);
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
/* release resources */
if(NULL != ib_proc->proc_endpoints) {
free(ib_proc->proc_endpoints);
@ -84,26 +75,38 @@ void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc)
* Look for an existing IB process instances based on the associated
* opal_proc_t instance.
*/
static mca_btl_openib_proc_t* mca_btl_openib_proc_lookup_proc(opal_proc_t* proc)
static mca_btl_openib_proc_t* ibproc_lookup_no_lock(opal_proc_t* proc)
{
mca_btl_openib_proc_t* ib_proc;
OPAL_THREAD_LOCK(&mca_btl_openib_component.ib_lock);
for(ib_proc = (mca_btl_openib_proc_t*)
opal_list_get_first(&mca_btl_openib_component.ib_procs);
ib_proc != (mca_btl_openib_proc_t*)
opal_list_get_end(&mca_btl_openib_component.ib_procs);
ib_proc = (mca_btl_openib_proc_t*)opal_list_get_next(ib_proc)) {
if(ib_proc->proc_opal == proc) {
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
return ib_proc;
}
}
OPAL_THREAD_UNLOCK(&mca_btl_openib_component.ib_lock);
return NULL;
}
/*
 * Look up the mca_btl_openib_proc_t associated with an opal proc and,
 * if found, return it with its proc_lock held (caller must unlock).
 * Returns NULL -- with no lock held -- when the proc is not in the
 * component's ib_procs list.
 */
static mca_btl_openib_proc_t* ibproc_lookup_and_lock(opal_proc_t* proc)
{
mca_btl_openib_proc_t* ib_proc;
/* get the process from the list */
opal_mutex_lock(&mca_btl_openib_component.ib_lock);
ib_proc = ibproc_lookup_no_lock(proc);
opal_mutex_unlock(&mca_btl_openib_component.ib_lock);
if( NULL != ib_proc ){
/* if we were able to find it - lock it.
 * NOTE: we want to lock it outside of list locked region
 * (avoids nesting proc_lock inside ib_lock).
 * NOTE(review): this assumes ib_proc cannot be destructed between
 * releasing ib_lock and acquiring proc_lock -- confirm against the
 * proc teardown path */
opal_mutex_lock(&ib_proc->proc_lock);
}
return ib_proc;
}
static void inline unpack8(char **src, uint8_t *value)
{
/* Copy one character */
@ -120,9 +123,9 @@ static void inline unpack8(char **src, uint8_t *value)
* associated w/ a given destination on this datastructure.
*/
mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *is_new)
{
mca_btl_openib_proc_t* module_proc = NULL;
mca_btl_openib_proc_t *ib_proc = NULL, *ib_proc_ret = NULL;
size_t msg_size;
uint32_t size;
int rc, i, j;
@ -130,21 +133,30 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
char *offset;
int modex_message_size;
mca_btl_openib_modex_message_t dummy;
*is_new = false;
/* Check if we have already created a IB proc
* structure for this ompi process */
module_proc = mca_btl_openib_proc_lookup_proc(proc);
if (NULL != module_proc) {
ib_proc = ibproc_lookup_and_lock(proc);
if (NULL != ib_proc) {
/* Gotcha! */
return module_proc;
return ib_proc;
}
/* Oops! First time, gotta create a new IB proc
/* All initialization has to be an atomic operation. we do the following assumption:
* - we let all concurent threads to try to do the initialization;
* - when one has finished it locks ib_lock and checks if corresponding
* process is still missing;
* - if so - new proc is added, otherwise - initialized proc struct is released.
*/
/* First time, gotta create a new IB proc
* out of the opal_proc ... */
module_proc = OBJ_NEW(mca_btl_openib_proc_t);
ib_proc = OBJ_NEW(mca_btl_openib_proc_t);
/* Initialize number of peer */
module_proc->proc_endpoint_count = 0;
module_proc->proc_opal = proc;
ib_proc->proc_endpoint_count = 0;
ib_proc->proc_opal = proc;
/* query for the peer address info */
OPAL_MODEX_RECV(rc, &mca_btl_openib_component.super.btl_version,
@ -153,11 +165,10 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
BTL_VERBOSE(("[%s:%d] opal_modex_recv failed for peer %s",
__FILE__, __LINE__,
OPAL_NAME_PRINT(proc->proc_name)));
OBJ_RELEASE(module_proc);
return NULL;
goto err_exit;
}
if (0 == msg_size) {
return NULL;
goto err_exit;
}
/* Message was packed in btl_openib_component.c; the format is
@ -166,22 +177,22 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
/* Unpack the number of modules in the message */
offset = (char *) message;
unpack8(&offset, &(module_proc->proc_port_count));
BTL_VERBOSE(("unpack: %d btls", module_proc->proc_port_count));
if (module_proc->proc_port_count > 0) {
module_proc->proc_ports = (mca_btl_openib_proc_modex_t *)
unpack8(&offset, &(ib_proc->proc_port_count));
BTL_VERBOSE(("unpack: %d btls", ib_proc->proc_port_count));
if (ib_proc->proc_port_count > 0) {
ib_proc->proc_ports = (mca_btl_openib_proc_modex_t *)
malloc(sizeof(mca_btl_openib_proc_modex_t) *
module_proc->proc_port_count);
ib_proc->proc_port_count);
} else {
module_proc->proc_ports = NULL;
ib_proc->proc_ports = NULL;
}
/* Loop over unpacking all the ports */
for (i = 0; i < module_proc->proc_port_count; i++) {
for (i = 0; i < ib_proc->proc_port_count; i++) {
/* Unpack the modex comment message struct */
size = modex_message_size;
memcpy(&(module_proc->proc_ports[i].pm_port_info), offset, size);
memcpy(&(ib_proc->proc_ports[i].pm_port_info), offset, size);
#if !defined(WORDS_BIGENDIAN) && OPAL_ENABLE_HETEROGENEOUS_SUPPORT
MCA_BTL_OPENIB_MODEX_MSG_NTOH(module_proc->proc_ports[i].pm_port_info);
#endif
@ -190,22 +201,22 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
i, (int)(offset-((char*)message))));
/* Unpack the number of CPCs that follow */
unpack8(&offset, &(module_proc->proc_ports[i].pm_cpc_data_count));
unpack8(&offset, &(ib_proc->proc_ports[i].pm_cpc_data_count));
BTL_VERBOSE(("unpacked btl %d: number of cpcs to follow %d (offset now %d)",
i, module_proc->proc_ports[i].pm_cpc_data_count,
i, ib_proc->proc_ports[i].pm_cpc_data_count,
(int)(offset-((char*)message))));
module_proc->proc_ports[i].pm_cpc_data = (opal_btl_openib_connect_base_module_data_t *)
calloc(module_proc->proc_ports[i].pm_cpc_data_count,
ib_proc->proc_ports[i].pm_cpc_data = (opal_btl_openib_connect_base_module_data_t *)
calloc(ib_proc->proc_ports[i].pm_cpc_data_count,
sizeof(opal_btl_openib_connect_base_module_data_t));
if (NULL == module_proc->proc_ports[i].pm_cpc_data) {
return NULL;
if (NULL == ib_proc->proc_ports[i].pm_cpc_data) {
goto err_exit;
}
/* Unpack the CPCs */
for (j = 0; j < module_proc->proc_ports[i].pm_cpc_data_count; ++j) {
for (j = 0; j < ib_proc->proc_ports[i].pm_cpc_data_count; ++j) {
uint8_t u8;
opal_btl_openib_connect_base_module_data_t *cpcd;
cpcd = module_proc->proc_ports[i].pm_cpc_data + j;
cpcd = ib_proc->proc_ports[i].pm_cpc_data + j;
unpack8(&offset, &u8);
BTL_VERBOSE(("unpacked btl %d: cpc %d: index %d (offset now %d)",
i, j, u8, (int)(offset-(char*)message)));
@ -224,7 +235,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
cpcd->cbm_modex_message = malloc(cpcd->cbm_modex_message_len);
if (NULL == cpcd->cbm_modex_message) {
BTL_ERROR(("Failed to malloc"));
return NULL;
goto err_exit;
}
memcpy(cpcd->cbm_modex_message, offset,
cpcd->cbm_modex_message_len);
@ -238,20 +249,52 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc)
}
}
if (0 == module_proc->proc_port_count) {
module_proc->proc_endpoints = NULL;
if (0 == ib_proc->proc_port_count) {
ib_proc->proc_endpoints = NULL;
} else {
module_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
malloc(module_proc->proc_port_count *
ib_proc->proc_endpoints = (mca_btl_base_endpoint_t**)
malloc(ib_proc->proc_port_count *
sizeof(mca_btl_base_endpoint_t*));
}
if (NULL == module_proc->proc_endpoints) {
OBJ_RELEASE(module_proc);
return NULL;
if (NULL == ib_proc->proc_endpoints) {
goto err_exit;
}
BTL_VERBOSE(("unpacking done!"));
return module_proc;
/* Finally add this process to the initialized procs list */
opal_mutex_lock(&mca_btl_openib_component.ib_lock);
ib_proc_ret = ibproc_lookup_no_lock(proc);
if (NULL == ib_proc_ret) {
/* if process can't be found in this list - insert it locked
* it is safe to lock ib_proc here because this thread is
* the only one who knows about it so far */
opal_mutex_lock(&ib_proc->proc_lock);
opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super);
ib_proc_ret = ib_proc;
*is_new = true;
} else {
/* otherwise - release module_proc */
OBJ_RELEASE(ib_proc);
}
opal_mutex_unlock(&mca_btl_openib_component.ib_lock);
/* if we haven't insert the process - lock it here so we
* won't lock mca_btl_openib_component.ib_lock */
if( !(*is_new) ){
opal_mutex_lock(&ib_proc_ret->proc_lock);
}
return ib_proc_ret;
err_exit:
fprintf(stderr,"%d: error exit from mca_btl_openib_proc_create\n", OPAL_PROC_MY_NAME.vpid);
if( NULL != ib_proc ){
OBJ_RELEASE(ib_proc);
}
return NULL;
}
int mca_btl_openib_proc_remove(opal_proc_t *proc,
@ -262,7 +305,7 @@ int mca_btl_openib_proc_remove(opal_proc_t *proc,
/* Remove endpoint from the openib BTL version of the proc as
well */
ib_proc = mca_btl_openib_proc_lookup_proc(proc);
ib_proc = ibproc_lookup_and_lock(proc);
if (NULL != ib_proc) {
for (i = 0; i < ib_proc->proc_endpoint_count; ++i) {
if (ib_proc->proc_endpoints[i] == endpoint) {
@ -270,6 +313,7 @@ int mca_btl_openib_proc_remove(opal_proc_t *proc,
if (i == ib_proc->proc_endpoint_count - 1) {
--ib_proc->proc_endpoint_count;
}
opal_mutex_unlock(&ib_proc->proc_lock);
return OPAL_SUCCESS;
}
}

Просмотреть файл

@ -72,10 +72,10 @@ struct mca_btl_openib_proc_t {
uint8_t proc_port_count;
/** array of endpoints that have been created to access this proc */
struct mca_btl_base_endpoint_t **proc_endpoints;
volatile struct mca_btl_base_endpoint_t **proc_endpoints;
/** number of endpoints (length of proc_endpoints array) */
size_t proc_endpoint_count;
volatile size_t proc_endpoint_count;
/** lock to protect against concurrent access to proc state */
opal_mutex_t proc_lock;
@ -84,7 +84,7 @@ typedef struct mca_btl_openib_proc_t mca_btl_openib_proc_t;
OBJ_CLASS_DECLARATION(mca_btl_openib_proc_t);
mca_btl_openib_proc_t* mca_btl_openib_proc_create(opal_proc_t* proc);
mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *is_new);
int mca_btl_openib_proc_insert(mca_btl_openib_proc_t*, mca_btl_base_endpoint_t*);
int mca_btl_openib_proc_remove(opal_proc_t* proc,
mca_btl_base_endpoint_t* module_endpoint);