1
1

Continue enabling connect_accept across large numbers of independent jobs: replace the hash tables with pointer arrays for storing routes to remote HNPs, which gives us flexibility we will need in the future.

This commit was SVN r23439.
Этот коммит содержится в:
Ralph Castain 2010-07-20 04:47:31 +00:00
родитель 248320b91a
Коммит f85e69b64b
3 изменённых файлов: 230 добавлений и 174 удалений

Просмотреть файл

@ -79,7 +79,7 @@ orte_routed_module_t orte_routed_cm_module = {
};
/* local globals */
static opal_hash_table_t jobfam_list;
static opal_pointer_array_t jobfams;
static opal_condition_t cond;
static opal_mutex_t lock;
static orte_process_name_t *lifeline=NULL;
@ -89,8 +89,8 @@ static bool ack_recvd;
static int init(void)
{
OBJ_CONSTRUCT(&jobfam_list, opal_hash_table_t);
opal_hash_table_init(&jobfam_list, 128);
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
/* setup the global condition and lock */
OBJ_CONSTRUCT(&cond, opal_condition_t);
@ -103,7 +103,8 @@ static int init(void)
static int finalize(void)
{
int rc;
int rc, i;
orte_routed_jobfam_t *jfam;
/* if I am a tool without a daemon, just cleanout
* the basics and leave
@ -123,7 +124,13 @@ static int finalize(void)
}
cleanup:
OBJ_DESTRUCT(&jobfam_list);
for (i=0; i < jobfams.size; i++) {
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
OBJ_RELEASE(jfam);
}
}
OBJ_DESTRUCT(&jobfams);
/* destruct the global condition and lock */
OBJ_DESTRUCT(&cond);
OBJ_DESTRUCT(&lock);
@ -135,8 +142,9 @@ cleanup:
static int delete_route(orte_process_name_t *proc)
{
int rc;
orte_process_name_t *route_copy;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (proc->jobid == ORTE_JOBID_INVALID ||
proc->vpid == ORTE_VPID_INVALID) {
@ -161,23 +169,22 @@ static int delete_route(orte_process_name_t *proc)
*/
if (ORTE_JOB_FAMILY(proc->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
/* see if this proc is present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* proc is present - remove the data */
free(route_copy);
rc = opal_hash_table_remove_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid));
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
/* see if this job family is present */
jfamily = ORTE_JOB_FAMILY(proc->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_binomial: deleting route to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
opal_pointer_array_set_item(&jobfams, i, NULL);
OBJ_RELEASE(jfam);
return ORTE_SUCCESS;
}
}
/* not present - nothing to do */
return ORTE_SUCCESS;
}
@ -188,15 +195,16 @@ static int delete_route(orte_process_name_t *proc)
*/
/* remove any entries in the RML for this process */
rc = orte_rml.purge(proc);
orte_rml.purge(proc);
return ORTE_SUCCESS;
}
static int update_route(orte_process_name_t *target,
orte_process_name_t *route)
{
int rc;
orte_process_name_t *route_copy;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (target->jobid == ORTE_JOBID_INVALID ||
target->vpid == ORTE_VPID_INVALID) {
@ -242,34 +250,35 @@ static int update_route(orte_process_name_t *target,
ORTE_JOBID_PRINT(target->jobid),
ORTE_NAME_PRINT(route)));
/* see if this target is already present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* target already present - update the route info
* in case it has changed
*/
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
/* see if this target is already present */
jfamily = ORTE_JOB_FAMILY(target->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_cm: updating route to %s via %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid),
ORTE_NAME_PRINT(route)));
jfam->route.jobid = route->jobid;
jfam->route.vpid = route->vpid;
return ORTE_SUCCESS;
}
}
/* not there, so add the route FOR THE JOB FAMILY*/
route_copy = (orte_process_name_t *) malloc(sizeof(orte_process_name_t));
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_cm: adding route to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid)));
jfam = OBJ_NEW(orte_routed_jobfam_t);
jfam->job_family = jfamily;
jfam->route.jobid = route->jobid;
jfam->route.vpid = route->vpid;
opal_pointer_array_add(&jobfams, jfam);
return ORTE_SUCCESS;
}
/* THIS CAME FROM OUR OWN JOB FAMILY... */
@ -283,10 +292,11 @@ static int update_route(orte_process_name_t *target,
static orte_process_name_t get_route(orte_process_name_t *target)
{
orte_process_name_t *ret, daemon;
int rc;
int32_t i;
orte_job_t *jdata;
orte_proc_t *proc;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (target->jobid == ORTE_JOBID_INVALID ||
target->vpid == ORTE_VPID_INVALID) {
@ -328,11 +338,19 @@ static orte_process_name_t get_route(orte_process_name_t *target)
/* if I am the HNP, then I stored a route to
* this job family, so look it up
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), (void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
goto found;
jfamily = ORTE_JOB_FAMILY(target->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_cm: route to %s found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid)));
ret = &jfam->route;
goto found;
}
}
/* not found - so we have no route */
ret = ORTE_NAME_INVALID;

Просмотреть файл

@ -75,7 +75,7 @@ orte_routed_module_t orte_routed_linear_module = {
};
/* local globals */
static opal_hash_table_t jobfam_list;
static opal_pointer_array_t jobfams;
static opal_condition_t cond;
static opal_mutex_t lock;
static orte_process_name_t *lifeline=NULL;
@ -86,9 +86,9 @@ static bool ack_recvd;
static int init(void)
{
OBJ_CONSTRUCT(&jobfam_list, opal_hash_table_t);
opal_hash_table_init(&jobfam_list, 128);
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
/* setup the global condition and lock */
OBJ_CONSTRUCT(&cond, opal_condition_t);
OBJ_CONSTRUCT(&lock, opal_mutex_t);
@ -100,7 +100,8 @@ static int init(void)
static int finalize(void)
{
int rc;
int rc, i;
orte_routed_jobfam_t *jfam;
/* if I am an application process, indicate that I am
* truly finalizing prior to departure
@ -114,7 +115,13 @@ static int finalize(void)
}
}
OBJ_DESTRUCT(&jobfam_list);
for (i=0; i < jobfams.size; i++) {
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
OBJ_RELEASE(jfam);
}
}
OBJ_DESTRUCT(&jobfams);
/* destruct the global condition and lock */
OBJ_DESTRUCT(&cond);
OBJ_DESTRUCT(&lock);
@ -126,8 +133,9 @@ static int finalize(void)
static int delete_route(orte_process_name_t *proc)
{
int rc;
orte_process_name_t *route_copy;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (proc->jobid == ORTE_JOBID_INVALID ||
proc->vpid == ORTE_VPID_INVALID) {
@ -162,23 +170,22 @@ static int delete_route(orte_process_name_t *proc)
return ORTE_SUCCESS;
}
/* see if this proc is present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* proc is present - remove the data */
free(route_copy);
rc = opal_hash_table_remove_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid));
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
/* see if this job family is present */
jfamily = ORTE_JOB_FAMILY(proc->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_binomial: deleting route to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
opal_pointer_array_set_item(&jobfams, i, NULL);
OBJ_RELEASE(jfam);
return ORTE_SUCCESS;
}
}
/* not present - nothing to do */
return ORTE_SUCCESS;
}
@ -194,8 +201,9 @@ static int delete_route(orte_process_name_t *proc)
static int update_route(orte_process_name_t *target,
orte_process_name_t *route)
{
int rc;
orte_process_name_t * route_copy;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (target->jobid == ORTE_JOBID_INVALID ||
target->vpid == ORTE_VPID_INVALID) {
@ -242,34 +250,35 @@ static int update_route(orte_process_name_t *target,
ORTE_JOBID_PRINT(target->jobid),
ORTE_NAME_PRINT(route)));
/* see if this target is already present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* target already present - update the route info
* in case it has changed
*/
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
/* see if this target is already present */
jfamily = ORTE_JOB_FAMILY(target->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_linear: updating route to %s via %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid),
ORTE_NAME_PRINT(route)));
jfam->route.jobid = route->jobid;
jfam->route.vpid = route->vpid;
return ORTE_SUCCESS;
}
}
/* not there, so add the route FOR THE JOB FAMILY*/
route_copy = (orte_process_name_t *) malloc(sizeof(orte_process_name_t));
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_linear: adding route to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid)));
jfam = OBJ_NEW(orte_routed_jobfam_t);
jfam->job_family = jfamily;
jfam->route.jobid = route->jobid;
jfam->route.vpid = route->vpid;
opal_pointer_array_add(&jobfams, jfam);
return ORTE_SUCCESS;
}
/* THIS CAME FROM OUR OWN JOB FAMILY... */
@ -283,7 +292,9 @@ static int update_route(orte_process_name_t *target,
static orte_process_name_t get_route(orte_process_name_t *target)
{
orte_process_name_t *ret, daemon;
int rc;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
/* if it is me, then the route is just direct */
if (OPAL_EQUAL == opal_dss.compare(ORTE_PROC_MY_NAME, target, ORTE_NAME)) {
@ -318,11 +329,19 @@ static orte_process_name_t get_route(orte_process_name_t *target)
/* if I am the HNP or a tool, then I stored a route to
* this job family, so look it up
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), (void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
goto found;
jfamily = ORTE_JOB_FAMILY(target->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_binomial: route to %s found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid)));
ret = &jfam->route;
goto found;
}
}
/* not found - so we have no route */
ret = ORTE_NAME_INVALID;

Просмотреть файл

@ -76,7 +76,7 @@ orte_routed_module_t orte_routed_radix_module = {
};
/* local globals */
static opal_hash_table_t jobfam_list;
static opal_pointer_array_t jobfams;
static opal_condition_t cond;
static opal_mutex_t lock;
static orte_process_name_t *lifeline=NULL;
@ -89,8 +89,8 @@ static bool ack_recvd;
static int init(void)
{
OBJ_CONSTRUCT(&jobfam_list, opal_hash_table_t);
opal_hash_table_init(&jobfam_list, 128);
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
/* setup the global condition and lock */
OBJ_CONSTRUCT(&cond, opal_condition_t);
@ -108,9 +108,10 @@ static int init(void)
static int finalize(void)
{
int rc;
int rc, i;
opal_list_item_t *item;
orte_routed_jobfam_t *jfam;
/* if I am an application process, indicate that I am
* truly finalizing prior to departure
*/
@ -123,7 +124,13 @@ static int finalize(void)
}
}
OBJ_DESTRUCT(&jobfam_list);
for (i=0; i < jobfams.size; i++) {
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
OBJ_RELEASE(jfam);
}
}
OBJ_DESTRUCT(&jobfams);
/* destruct the global condition and lock */
OBJ_DESTRUCT(&cond);
OBJ_DESTRUCT(&lock);
@ -142,9 +149,10 @@ static int finalize(void)
static int delete_route(orte_process_name_t *proc)
{
int rc;
orte_process_name_t *route_copy;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (proc->jobid == ORTE_JOBID_INVALID ||
proc->vpid == ORTE_VPID_INVALID) {
return ORTE_ERR_BAD_PARAM;
@ -178,23 +186,22 @@ static int delete_route(orte_process_name_t *proc)
return ORTE_SUCCESS;
}
/* see if this proc is present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* proc is present - remove the data */
free(route_copy);
rc = opal_hash_table_remove_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(proc->jobid));
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
}
/* see if this job family is present */
jfamily = ORTE_JOB_FAMILY(proc->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_binomial: deleting route to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
opal_pointer_array_set_item(&jobfams, i, NULL);
OBJ_RELEASE(jfam);
return ORTE_SUCCESS;
}
}
/* not present - nothing to do */
return ORTE_SUCCESS;
}
@ -210,8 +217,9 @@ static int delete_route(orte_process_name_t *proc)
static int update_route(orte_process_name_t *target,
orte_process_name_t *route)
{
int rc;
orte_process_name_t *route_copy;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (target->jobid == ORTE_JOBID_INVALID ||
target->vpid == ORTE_VPID_INVALID) {
@ -258,34 +266,35 @@ static int update_route(orte_process_name_t *target,
ORTE_JOBID_PRINT(target->jobid),
ORTE_NAME_PRINT(route)));
/* see if this target is already present - it will have a wildcard vpid,
* so we have to look for it with that condition
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid),
(void**)&route_copy);
if (ORTE_SUCCESS == rc && NULL != route_copy) {
/* target already present - update the route info
* in case it has changed
*/
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
/* see if this target is already present */
jfamily = ORTE_JOB_FAMILY(target->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_radix: updating route to %s via %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid),
ORTE_NAME_PRINT(route)));
jfam->route.jobid = route->jobid;
jfam->route.vpid = route->vpid;
return ORTE_SUCCESS;
}
}
/* not there, so add the route FOR THE JOB FAMILY*/
route_copy = (orte_process_name_t *) malloc(sizeof(orte_process_name_t));
*route_copy = *route;
rc = opal_hash_table_set_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), route_copy);
if (ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
return rc;
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_radix: adding route to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid)));
jfam = OBJ_NEW(orte_routed_jobfam_t);
jfam->job_family = jfamily;
jfam->route.jobid = route->jobid;
jfam->route.vpid = route->vpid;
opal_pointer_array_add(&jobfams, jfam);
return ORTE_SUCCESS;
}
/* THIS CAME FROM OUR OWN JOB FAMILY... */
@ -301,7 +310,9 @@ static orte_process_name_t get_route(orte_process_name_t *target)
orte_process_name_t *ret, daemon;
opal_list_item_t *item;
orte_routed_tree_t *child;
int rc;
int i;
orte_routed_jobfam_t *jfam;
uint16_t jfamily;
if (target->jobid == ORTE_JOBID_INVALID ||
target->vpid == ORTE_VPID_INVALID) {
@ -342,11 +353,19 @@ static orte_process_name_t get_route(orte_process_name_t *target)
/* if I am the HNP or a tool, then I stored a route to
* this job family, so look it up
*/
rc = opal_hash_table_get_value_uint32(&jobfam_list,
ORTE_JOB_FAMILY(target->jobid), (void**)&ret);
if (ORTE_SUCCESS == rc) {
/* got a good result - return it */
goto found;
jfamily = ORTE_JOB_FAMILY(target->jobid);
for (i=0; i < jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_output,
"%s routed_binomial: route to %s found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid)));
ret = &jfam->route;
goto found;
}
}
/* not found - so we have no route */
ret = ORTE_NAME_INVALID;