1
1

Fix openib process accounting if procs was dynamically added.

Этот коммит содержится в:
Artem Polyakov 2015-12-24 17:22:55 +06:00
родитель 400af6c52d
Коммит 3031affdb7
3 изменённых файлов: 189 добавлений и 57 удалений

Просмотреть файл

@ -1004,7 +1004,8 @@ int mca_btl_openib_add_procs(
opal_bitmap_t* reachable) opal_bitmap_t* reachable)
{ {
mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*)btl; mca_btl_openib_module_t* openib_btl = (mca_btl_openib_module_t*)btl;
int i,j, rc, local_procs; size_t nprocs_new_loc = 0, nprocs_new = 0;
int i,j, rc;
int lcl_subnet_id_port_cnt = 0; int lcl_subnet_id_port_cnt = 0;
int btl_rank = 0; int btl_rank = 0;
volatile mca_btl_base_endpoint_t* endpoint; volatile mca_btl_base_endpoint_t* endpoint;
@ -1037,23 +1038,10 @@ int mca_btl_openib_add_procs(
return rc; return rc;
} }
rc = openib_btl_size_queues(openib_btl, nprocs); /* prepare all proc's and account them properly */
if (OPAL_SUCCESS != rc) { for (i = 0, nprocs_new_loc = 0 ; i < (int) nprocs; i++) {
BTL_ERROR(("error creating cqs"));
return rc;
}
for (i = 0, local_procs = 0 ; i < (int) nprocs; i++) {
struct opal_proc_t* proc = procs[i]; struct opal_proc_t* proc = procs[i];
mca_btl_openib_proc_t* ib_proc; mca_btl_openib_proc_t* ib_proc;
bool found_existing = false;
bool is_new;
opal_output(-1, "add procs: adding proc %d", i);
if (OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags)) {
local_procs ++;
}
#if defined(HAVE_STRUCT_IBV_DEVICE_TRANSPORT_TYPE) #if defined(HAVE_STRUCT_IBV_DEVICE_TRANSPORT_TYPE)
/* Most current iWARP adapters (June 2008) cannot handle /* Most current iWARP adapters (June 2008) cannot handle
@ -1067,7 +1055,70 @@ int mca_btl_openib_add_procs(
} }
#endif #endif
if(NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc, &is_new)) ) { if(NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc)) ) {
/* if we don't have connection info for this process, it's
* okay because some other method might be able to reach it,
* so just mark it as unreachable by us */
continue;
}
/* account this openib_btl in this proc */
rc = mca_btl_openib_proc_reg_btl(ib_proc, openib_btl);
opal_mutex_unlock( &ib_proc->proc_lock );
switch( rc ){
case OPAL_SUCCESS:
/* this is a new process to this openib btl */
nprocs_new++;
if (OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags)) {
nprocs_new_loc ++;
}
break;
case OPAL_ERR_RESOURCE_BUSY:
/* process was accounted earlier in this openib btl */
break;
default:
/* unexpected error, e.g. out of mem */
return rc;
}
}
/* account this procs if need */
rc = openib_btl_size_queues(openib_btl, nprocs_new);
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("error creating cqs"));
return rc;
}
opal_mutex_lock(&openib_btl->device->device_lock);
openib_btl->local_procs += nprocs_new_loc;
if( 0 < nprocs_new_loc ){
openib_btl->device->mem_reg_max = openib_btl->device->mem_reg_max_total / openib_btl->local_procs;
}
opal_mutex_unlock(&openib_btl->device->device_lock);
/* prepare endpoints */
for (i = 0, nprocs_new_loc = 0 ; i < (int) nprocs; i++) {
struct opal_proc_t* proc = procs[i];
mca_btl_openib_proc_t* ib_proc;
bool found_existing = false;
opal_output(-1, "add procs: adding proc %d", i);
#if defined(HAVE_STRUCT_IBV_DEVICE_TRANSPORT_TYPE)
/* Most current iWARP adapters (June 2008) cannot handle
talking to other processes on the same host (!) -- so mark
them as unreachable (need to use sm). So for the moment,
we'll just mark any local peer on an iWARP NIC as
unreachable. See trac ticket #1352. */
if (IBV_TRANSPORT_IWARP == openib_btl->device->ib_dev->transport_type &&
OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags)) {
continue;
}
#endif
if(NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc)) ) {
/* if we don't have connection info for this process, it's /* if we don't have connection info for this process, it's
* okay because some other method might be able to reach it, * okay because some other method might be able to reach it,
* so just mark it as unreachable by us */ * so just mark it as unreachable by us */
@ -1076,13 +1127,11 @@ int mca_btl_openib_add_procs(
found_existing = false; found_existing = false;
if( !is_new ){ for (j = 0 ; j < (int) ib_proc->proc_endpoint_count ; ++j) {
for (j = 0 ; j < (int) ib_proc->proc_endpoint_count ; ++j) { endpoint = ib_proc->proc_endpoints[j];
endpoint = ib_proc->proc_endpoints[j]; if (endpoint->endpoint_btl == openib_btl) {
if (endpoint->endpoint_btl == openib_btl) { found_existing = true;
found_existing = true; break;
break;
}
} }
} }
@ -1104,11 +1153,6 @@ int mca_btl_openib_add_procs(
} }
opal_mutex_lock(&openib_btl->ib_lock);
openib_btl->local_procs += local_procs;
openib_btl->device->mem_reg_max = openib_btl->device->mem_reg_max_total / openib_btl->local_procs;
opal_mutex_unlock(&openib_btl->ib_lock);
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
@ -1119,7 +1163,7 @@ struct mca_btl_base_endpoint_t *mca_btl_openib_get_ep (struct mca_btl_base_modul
mca_btl_openib_proc_t *ib_proc; mca_btl_openib_proc_t *ib_proc;
int rc; int rc;
int local_port_cnt = 0, btl_rank; int local_port_cnt = 0, btl_rank;
bool is_new; size_t nprocs_new = 0;
rc = prepare_device_for_use (openib_btl->device); rc = prepare_device_for_use (openib_btl->device);
if (OPAL_SUCCESS != rc) { if (OPAL_SUCCESS != rc) {
@ -1133,25 +1177,51 @@ struct mca_btl_base_endpoint_t *mca_btl_openib_get_ep (struct mca_btl_base_modul
return NULL; return NULL;
} }
rc = openib_btl_size_queues(openib_btl, 1); if (NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc))) {
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("error creating cqs"));
return NULL;
}
if (NULL == (ib_proc = mca_btl_openib_proc_get_locked(proc, &is_new))) {
/* if we don't have connection info for this process, it's /* if we don't have connection info for this process, it's
* okay because some other method might be able to reach it, * okay because some other method might be able to reach it,
* so just mark it as unreachable by us */ * so just mark it as unreachable by us */
return NULL; return NULL;
} }
if( !is_new ){ rc = mca_btl_openib_proc_reg_btl(ib_proc, openib_btl);
for (size_t j = 0 ; j < ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j]; switch( rc ){
if (endpoint->endpoint_btl == openib_btl) { case OPAL_SUCCESS:
goto exit; /* unlock first to avoid possible deadlocks */
} opal_mutex_unlock(&ib_proc->proc_lock);
/* this is a new process to this openib btl
* account this procs if need */
rc = openib_btl_size_queues(openib_btl, nprocs_new);
if (OPAL_SUCCESS != rc) {
BTL_ERROR(("error creating cqs"));
return NULL;
}
if( OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags) ) {
opal_mutex_lock(&openib_btl->ib_lock);
openib_btl->local_procs += 1;
openib_btl->device->mem_reg_max = openib_btl->device->mem_reg_max_total / openib_btl->local_procs;
opal_mutex_unlock(&openib_btl->ib_lock);
}
/* lock process back */
opal_mutex_lock(&ib_proc->proc_lock);
break;
case OPAL_ERR_RESOURCE_BUSY:
/* process was accounted earlier in this openib btl */
break;
default:
/* unexpected error, e.g. out of mem */
BTL_ERROR(("Unexpected OPAL error %d", rc));
return NULL;
}
for (size_t j = 0 ; j < ib_proc->proc_endpoint_count ; ++j) {
endpoint = ib_proc->proc_endpoints[j];
if (endpoint->endpoint_btl == openib_btl) {
goto exit;
} }
} }
@ -1168,14 +1238,6 @@ struct mca_btl_base_endpoint_t *mca_btl_openib_get_ep (struct mca_btl_base_modul
exit: exit:
opal_mutex_unlock(&ib_proc->proc_lock); opal_mutex_unlock(&ib_proc->proc_lock);
if ( (NULL != endpoint) && is_new &&
OPAL_PROC_ON_LOCAL_NODE(proc->proc_flags)) {
opal_mutex_lock(&openib_btl->ib_lock);
openib_btl->local_procs += 1;
openib_btl->device->mem_reg_max = openib_btl->device->mem_reg_max_total / openib_btl->local_procs;
opal_mutex_unlock(&openib_btl->ib_lock);
}
return (struct mca_btl_base_endpoint_t *)endpoint; return (struct mca_btl_base_endpoint_t *)endpoint;
} }

Просмотреть файл

@ -29,6 +29,23 @@
#include "connect/base.h" #include "connect/base.h"
#include "connect/connect.h" #include "connect/connect.h"
static void mca_btl_openib_proc_btl_construct(mca_btl_openib_proc_btlptr_t* elem);
static void mca_btl_openib_proc_btl_destruct(mca_btl_openib_proc_btlptr_t* elem);
OBJ_CLASS_INSTANCE(mca_btl_openib_proc_btlptr_t,
opal_list_item_t, mca_btl_openib_proc_btl_construct,
mca_btl_openib_proc_btl_destruct);
static void mca_btl_openib_proc_btl_construct(mca_btl_openib_proc_btlptr_t* elem)
{
elem->openib_btl = NULL;
}
static void mca_btl_openib_proc_btl_destruct(mca_btl_openib_proc_btlptr_t* elem)
{
elem->openib_btl = NULL;
}
static void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* proc); static void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* proc);
static void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* proc); static void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* proc);
@ -44,6 +61,7 @@ void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* ib_proc)
ib_proc->proc_endpoints = 0; ib_proc->proc_endpoints = 0;
ib_proc->proc_endpoint_count = 0; ib_proc->proc_endpoint_count = 0;
OBJ_CONSTRUCT(&ib_proc->proc_lock, opal_mutex_t); OBJ_CONSTRUCT(&ib_proc->proc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&ib_proc->openib_btls, opal_list_t);
} }
/* /*
@ -52,6 +70,8 @@ void mca_btl_openib_proc_construct(mca_btl_openib_proc_t* ib_proc)
void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc) void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc)
{ {
mca_btl_openib_proc_btlptr_t* elem;
/* release resources */ /* release resources */
if(NULL != ib_proc->proc_endpoints) { if(NULL != ib_proc->proc_endpoints) {
free(ib_proc->proc_endpoints); free(ib_proc->proc_endpoints);
@ -68,6 +88,13 @@ void mca_btl_openib_proc_destruct(mca_btl_openib_proc_t* ib_proc)
free(ib_proc->proc_ports); free(ib_proc->proc_ports);
} }
OBJ_DESTRUCT(&ib_proc->proc_lock); OBJ_DESTRUCT(&ib_proc->proc_lock);
elem = (mca_btl_openib_proc_btlptr_t*)opal_list_remove_first(&ib_proc->openib_btls);
while( NULL != elem ){
OBJ_RELEASE(elem);
elem = (mca_btl_openib_proc_btlptr_t*)opal_list_remove_first(&ib_proc->openib_btls);
}
OBJ_DESTRUCT(&ib_proc->openib_btls);
} }
@ -123,7 +150,7 @@ static void inline unpack8(char **src, uint8_t *value)
* associated w/ a given destination on this datastructure. * associated w/ a given destination on this datastructure.
*/ */
mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *is_new) mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc)
{ {
mca_btl_openib_proc_t *ib_proc = NULL, *ib_proc_ret = NULL; mca_btl_openib_proc_t *ib_proc = NULL, *ib_proc_ret = NULL;
size_t msg_size; size_t msg_size;
@ -133,7 +160,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *i
char *offset; char *offset;
int modex_message_size; int modex_message_size;
mca_btl_openib_modex_message_t dummy; mca_btl_openib_modex_message_t dummy;
*is_new = false; bool is_new = false;
/* Check if we have already created a IB proc /* Check if we have already created a IB proc
* structure for this ompi process */ * structure for this ompi process */
@ -252,7 +279,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *i
if (0 == ib_proc->proc_port_count) { if (0 == ib_proc->proc_port_count) {
ib_proc->proc_endpoints = NULL; ib_proc->proc_endpoints = NULL;
} else { } else {
ib_proc->proc_endpoints = (mca_btl_base_endpoint_t**) ib_proc->proc_endpoints = (volatile mca_btl_base_endpoint_t**)
malloc(ib_proc->proc_port_count * malloc(ib_proc->proc_port_count *
sizeof(mca_btl_base_endpoint_t*)); sizeof(mca_btl_base_endpoint_t*));
} }
@ -273,7 +300,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *i
opal_mutex_lock(&ib_proc->proc_lock); opal_mutex_lock(&ib_proc->proc_lock);
opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super); opal_list_append(&mca_btl_openib_component.ib_procs, &ib_proc->super);
ib_proc_ret = ib_proc; ib_proc_ret = ib_proc;
*is_new = true; is_new = true;
} else { } else {
/* otherwise - release module_proc */ /* otherwise - release module_proc */
OBJ_RELEASE(ib_proc); OBJ_RELEASE(ib_proc);
@ -282,7 +309,7 @@ mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *i
/* if we haven't insert the process - lock it here so we /* if we haven't insert the process - lock it here so we
* won't lock mca_btl_openib_component.ib_lock */ * won't lock mca_btl_openib_component.ib_lock */
if( !(*is_new) ){ if( !is_new ){
opal_mutex_lock(&ib_proc_ret->proc_lock); opal_mutex_lock(&ib_proc_ret->proc_lock);
} }
@ -354,3 +381,27 @@ int mca_btl_openib_proc_insert(mca_btl_openib_proc_t* module_proc,
module_proc->proc_endpoints[module_proc->proc_endpoint_count++] = module_endpoint; module_proc->proc_endpoints[module_proc->proc_endpoint_count++] = module_endpoint;
return OPAL_SUCCESS; return OPAL_SUCCESS;
} }
int mca_btl_openib_proc_reg_btl(mca_btl_openib_proc_t* ib_proc,
mca_btl_openib_module_t* openib_btl)
{
mca_btl_openib_proc_btlptr_t* elem;
for(elem = (mca_btl_openib_proc_btlptr_t*)opal_list_get_first(&ib_proc->openib_btls);
elem != (mca_btl_openib_proc_btlptr_t*)opal_list_get_end(&ib_proc->openib_btls);
elem = (mca_btl_openib_proc_btlptr_t*)opal_list_get_next(elem)) {
if(elem->openib_btl == openib_btl) {
/* this is normal return meaning that this BTL has already touched this ib_proc */
return OPAL_ERR_RESOURCE_BUSY;
}
}
elem = OBJ_NEW(mca_btl_openib_proc_btlptr_t);
if( NULL == elem ){
return OPAL_ERR_OUT_OF_RESOURCE;
}
elem->openib_btl = openib_btl;
opal_list_append(&ib_proc->openib_btls, &elem->super);
return OPAL_SUCCESS;
}

Просмотреть файл

@ -52,6 +52,19 @@ typedef struct mca_btl_openib_proc_modex_t {
uint8_t pm_cpc_data_count; uint8_t pm_cpc_data_count;
} mca_btl_openib_proc_modex_t; } mca_btl_openib_proc_modex_t;
/**
* The list element to hold pointers to openin_btls that are using this
* ib_proc.
*/
struct mca_btl_openib_proc_btlptr_t {
opal_list_item_t super;
mca_btl_openib_module_t* openib_btl;
};
typedef struct mca_btl_openib_proc_btlptr_t mca_btl_openib_proc_btlptr_t;
OBJ_CLASS_DECLARATION(mca_btl_openib_proc_btlptr_t);
/** /**
* Represents the state of a remote process and the set of addresses * Represents the state of a remote process and the set of addresses
* that it exports. Also cache an instance of mca_btl_base_endpoint_t for * that it exports. Also cache an instance of mca_btl_base_endpoint_t for
@ -71,6 +84,9 @@ struct mca_btl_openib_proc_t {
/** length of proc_ports array */ /** length of proc_ports array */
uint8_t proc_port_count; uint8_t proc_port_count;
/** list of openib_btl's that touched this proc **/
opal_list_t openib_btls;
/** array of endpoints that have been created to access this proc */ /** array of endpoints that have been created to access this proc */
volatile struct mca_btl_base_endpoint_t **proc_endpoints; volatile struct mca_btl_base_endpoint_t **proc_endpoints;
@ -84,10 +100,13 @@ typedef struct mca_btl_openib_proc_t mca_btl_openib_proc_t;
OBJ_CLASS_DECLARATION(mca_btl_openib_proc_t); OBJ_CLASS_DECLARATION(mca_btl_openib_proc_t);
mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc, bool *is_new); mca_btl_openib_proc_t* mca_btl_openib_proc_get_locked(opal_proc_t* proc);
int mca_btl_openib_proc_insert(mca_btl_openib_proc_t*, mca_btl_base_endpoint_t*); int mca_btl_openib_proc_insert(mca_btl_openib_proc_t*, mca_btl_base_endpoint_t*);
int mca_btl_openib_proc_remove(opal_proc_t* proc, int mca_btl_openib_proc_remove(opal_proc_t* proc,
mca_btl_base_endpoint_t* module_endpoint); mca_btl_base_endpoint_t* module_endpoint);
int mca_btl_openib_proc_reg_btl(mca_btl_openib_proc_t* ib_proc,
mca_btl_openib_module_t* openib_btl);
END_C_DECLS END_C_DECLS