Improve the efficiency by making the check for uniqueness of incoming hnp contact info much faster by including the hnp_uri in the job_family tracker object. Replace the global buffer storage with a quick routine to build the buffer from the jobfams array
This commit was SVN r23443.
Этот коммит содержится в:
родитель
837fb29fab
Коммит
f3e13b9766
@ -75,11 +75,13 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data)
|
|||||||
orte_vpid_t num_procs;
|
orte_vpid_t num_procs;
|
||||||
char *rml_uri;
|
char *rml_uri;
|
||||||
orte_process_name_t name;
|
orte_process_name_t name;
|
||||||
|
bool got_name;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
/* unpack the data for each entry */
|
/* unpack the data for each entry */
|
||||||
num_procs = 0;
|
num_procs = 0;
|
||||||
name.jobid = ORTE_JOBID_INVALID;
|
name.jobid = ORTE_JOBID_INVALID;
|
||||||
|
got_name = false;
|
||||||
cnt = 1;
|
cnt = 1;
|
||||||
while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &rml_uri, &cnt, OPAL_STRING))) {
|
while (ORTE_SUCCESS == (rc = opal_dss.unpack(data, &rml_uri, &cnt, OPAL_STRING))) {
|
||||||
|
|
||||||
@ -95,23 +97,24 @@ int orte_rml_base_update_contact_info(opal_buffer_t* data)
|
|||||||
free(rml_uri);
|
free(rml_uri);
|
||||||
return(rc);
|
return(rc);
|
||||||
}
|
}
|
||||||
/* update the routing table */
|
if (!got_name) {
|
||||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
|
/* we only get an update from a single jobid - the command
|
||||||
ORTE_ERROR_LOG(rc);
|
* that creates these doesn't cross jobid boundaries - so
|
||||||
free(rml_uri);
|
* record it here
|
||||||
return rc;
|
*/
|
||||||
}
|
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(rml_uri, &name, NULL))) {
|
||||||
/* if this is for a different job family */
|
ORTE_ERROR_LOG(rc);
|
||||||
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
free(rml_uri);
|
||||||
if (!orte_routed.route_is_defined(&name)) {
|
return rc;
|
||||||
/* update the route to this proc */
|
}
|
||||||
|
got_name = true;
|
||||||
|
/* if this is for a different job family, update the route to this proc */
|
||||||
|
if (ORTE_JOB_FAMILY(name.jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||||
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&name, &name))) {
|
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&name, &name))) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
free(rml_uri);
|
free(rml_uri);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
/* and store the uri */
|
|
||||||
opal_dss.pack(&orte_remote_hnps, &rml_uri, 1, OPAL_STRING);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
free(rml_uri);
|
free(rml_uri);
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
#include "opal/mca/mca.h"
|
#include "opal/mca/mca.h"
|
||||||
|
|
||||||
|
#include "opal/class/opal_pointer_array.h"
|
||||||
#include "opal/dss/dss_types.h"
|
#include "opal/dss/dss_types.h"
|
||||||
#include "opal/threads/threads.h"
|
#include "opal/threads/threads.h"
|
||||||
|
|
||||||
@ -39,10 +40,12 @@ ORTE_DECLSPEC extern opal_list_t orte_routed_base_components;
|
|||||||
ORTE_DECLSPEC extern opal_mutex_t orte_routed_base_lock;
|
ORTE_DECLSPEC extern opal_mutex_t orte_routed_base_lock;
|
||||||
ORTE_DECLSPEC extern opal_condition_t orte_routed_base_cond;
|
ORTE_DECLSPEC extern opal_condition_t orte_routed_base_cond;
|
||||||
ORTE_DECLSPEC extern bool orte_routed_base_wait_sync;
|
ORTE_DECLSPEC extern bool orte_routed_base_wait_sync;
|
||||||
|
ORTE_DECLSPEC extern opal_pointer_array_t orte_routed_jobfams;
|
||||||
|
|
||||||
ORTE_DECLSPEC extern int orte_routed_base_register_sync(bool setup);
|
ORTE_DECLSPEC int orte_routed_base_register_sync(bool setup);
|
||||||
ORTE_DECLSPEC extern int orte_routed_base_process_callback(orte_jobid_t job,
|
ORTE_DECLSPEC int orte_routed_base_process_callback(orte_jobid_t job,
|
||||||
opal_buffer_t *buffer);
|
opal_buffer_t *buffer);
|
||||||
|
ORTE_DECLSPEC void orte_routed_base_update_hnps(opal_buffer_t *buf);
|
||||||
|
|
||||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||||
|
|
||||||
|
@ -20,6 +20,8 @@
|
|||||||
#include "opal/util/output.h"
|
#include "opal/util/output.h"
|
||||||
#include "opal/mca/base/mca_base_component_repository.h"
|
#include "opal/mca/base/mca_base_component_repository.h"
|
||||||
|
|
||||||
|
#include "orte/mca/errmgr/errmgr.h"
|
||||||
|
#include "orte/mca/rml/base/rml_contact.h"
|
||||||
#include "orte/util/proc_info.h"
|
#include "orte/util/proc_info.h"
|
||||||
#include "orte/runtime/orte_globals.h"
|
#include "orte/runtime/orte_globals.h"
|
||||||
|
|
||||||
@ -56,8 +58,20 @@ static void destruct(orte_routed_tree_t *rt)
|
|||||||
OBJ_CLASS_INSTANCE(orte_routed_tree_t, opal_list_item_t,
|
OBJ_CLASS_INSTANCE(orte_routed_tree_t, opal_list_item_t,
|
||||||
construct, destruct);
|
construct, destruct);
|
||||||
|
|
||||||
|
static void jfamconst(orte_routed_jobfam_t *ptr)
|
||||||
|
{
|
||||||
|
ptr->route.jobid = ORTE_JOBID_INVALID;
|
||||||
|
ptr->route.vpid = ORTE_VPID_INVALID;
|
||||||
|
ptr->hnp_uri = NULL;
|
||||||
|
}
|
||||||
|
static void jfamdest(orte_routed_jobfam_t *ptr)
|
||||||
|
{
|
||||||
|
if (NULL != ptr->hnp_uri) {
|
||||||
|
free(ptr->hnp_uri);
|
||||||
|
}
|
||||||
|
}
|
||||||
OBJ_CLASS_INSTANCE(orte_routed_jobfam_t, opal_object_t,
|
OBJ_CLASS_INSTANCE(orte_routed_jobfam_t, opal_object_t,
|
||||||
NULL, NULL);
|
jfamconst, jfamdest);
|
||||||
|
|
||||||
int orte_routed_base_output = -1;
|
int orte_routed_base_output = -1;
|
||||||
orte_routed_module_t orte_routed = {0};
|
orte_routed_module_t orte_routed = {0};
|
||||||
@ -65,6 +79,7 @@ opal_list_t orte_routed_base_components;
|
|||||||
opal_mutex_t orte_routed_base_lock;
|
opal_mutex_t orte_routed_base_lock;
|
||||||
opal_condition_t orte_routed_base_cond;
|
opal_condition_t orte_routed_base_cond;
|
||||||
bool orte_routed_base_wait_sync;
|
bool orte_routed_base_wait_sync;
|
||||||
|
opal_pointer_array_t orte_routed_jobfams;
|
||||||
|
|
||||||
static orte_routed_component_t *active_component = NULL;
|
static orte_routed_component_t *active_component = NULL;
|
||||||
static bool component_open_called = false;
|
static bool component_open_called = false;
|
||||||
@ -75,6 +90,7 @@ int
|
|||||||
orte_routed_base_open(void)
|
orte_routed_base_open(void)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
|
orte_routed_jobfam_t *jfam;
|
||||||
|
|
||||||
if (opened) {
|
if (opened) {
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
@ -89,11 +105,19 @@ orte_routed_base_open(void)
|
|||||||
|
|
||||||
/* Initialize globals */
|
/* Initialize globals */
|
||||||
OBJ_CONSTRUCT(&orte_routed_base_components, opal_list_t);
|
OBJ_CONSTRUCT(&orte_routed_base_components, opal_list_t);
|
||||||
|
|
||||||
/* Initialize storage of remote hnp uris */
|
/* Initialize storage of remote hnp uris */
|
||||||
OBJ_CONSTRUCT(&orte_remote_hnps, opal_buffer_t);
|
OBJ_CONSTRUCT(&orte_routed_jobfams, opal_pointer_array_t);
|
||||||
|
opal_pointer_array_init(&orte_routed_jobfams, 8, INT_MAX, 8);
|
||||||
/* prime it with our HNP uri */
|
/* prime it with our HNP uri */
|
||||||
opal_dss.pack(&orte_remote_hnps, &orte_process_info.my_hnp_uri, 1, OPAL_STRING);
|
jfam = OBJ_NEW(orte_routed_jobfam_t);
|
||||||
|
jfam->route.jobid = ORTE_PROC_MY_HNP->jobid;
|
||||||
|
jfam->route.vpid = ORTE_PROC_MY_HNP->vpid;
|
||||||
|
jfam->job_family = ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid);
|
||||||
|
if (NULL != orte_process_info.my_hnp_uri) {
|
||||||
|
jfam->hnp_uri = strdup(orte_process_info.my_hnp_uri);
|
||||||
|
}
|
||||||
|
opal_pointer_array_add(&orte_routed_jobfams, jfam);
|
||||||
|
|
||||||
/* Open up all available components */
|
/* Open up all available components */
|
||||||
ret = mca_base_components_open("routed",
|
ret = mca_base_components_open("routed",
|
||||||
@ -151,19 +175,27 @@ orte_routed_base_select(void)
|
|||||||
int
|
int
|
||||||
orte_routed_base_close(void)
|
orte_routed_base_close(void)
|
||||||
{
|
{
|
||||||
|
int i;
|
||||||
|
orte_routed_jobfam_t *jfam;
|
||||||
|
|
||||||
/* finalize the selected component */
|
/* finalize the selected component */
|
||||||
if (NULL != orte_routed.finalize) {
|
if (NULL != orte_routed.finalize) {
|
||||||
orte_routed.finalize();
|
orte_routed.finalize();
|
||||||
}
|
}
|
||||||
|
|
||||||
OBJ_DESTRUCT(&orte_remote_hnps);
|
|
||||||
|
|
||||||
/* shutdown any remaining opened components */
|
/* shutdown any remaining opened components */
|
||||||
if (component_open_called) {
|
if (component_open_called) {
|
||||||
mca_base_components_close(orte_routed_base_output,
|
mca_base_components_close(orte_routed_base_output,
|
||||||
&orte_routed_base_components, NULL);
|
&orte_routed_base_components, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
|
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
|
OBJ_RELEASE(jfam);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
OBJ_DESTRUCT(&orte_routed_jobfams);
|
||||||
|
|
||||||
OBJ_DESTRUCT(&orte_routed_base_components);
|
OBJ_DESTRUCT(&orte_routed_base_components);
|
||||||
OBJ_DESTRUCT(&orte_routed_base_lock);
|
OBJ_DESTRUCT(&orte_routed_base_lock);
|
||||||
OBJ_DESTRUCT(&orte_routed_base_cond);
|
OBJ_DESTRUCT(&orte_routed_base_cond);
|
||||||
@ -174,4 +206,52 @@ orte_routed_base_close(void)
|
|||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void orte_routed_base_update_hnps(opal_buffer_t *buf)
|
||||||
|
{
|
||||||
|
int n, rc;
|
||||||
|
char *uri;
|
||||||
|
orte_process_name_t name;
|
||||||
|
orte_routed_jobfam_t *jfam;
|
||||||
|
uint16_t jobfamily;
|
||||||
|
|
||||||
|
n = 1;
|
||||||
|
while (ORTE_SUCCESS == opal_dss.unpack(buf, &uri, &n, OPAL_STRING)) {
|
||||||
|
/*extract the name */
|
||||||
|
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(uri, &name, NULL))) {
|
||||||
|
ORTE_ERROR_LOG(rc);
|
||||||
|
free(uri);
|
||||||
|
n=1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
jobfamily = ORTE_JOB_FAMILY(name.jobid);
|
||||||
|
/* see if we already have this connection */
|
||||||
|
for (n=0; n < orte_routed_jobfams.size; n++) {
|
||||||
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams,n))) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (jobfamily == jfam->job_family) {
|
||||||
|
/* update uri */
|
||||||
|
if (NULL != jfam->hnp_uri) {
|
||||||
|
free(jfam->hnp_uri);
|
||||||
|
}
|
||||||
|
jfam->hnp_uri = strdup(uri);
|
||||||
|
OPAL_OUTPUT_VERBOSE((10, orte_routed_base_output,
|
||||||
|
"%s adding remote HNP %s\n\t%s",
|
||||||
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
|
ORTE_NAME_PRINT(&name), uri));
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* nope - create it */
|
||||||
|
jfam = OBJ_NEW(orte_routed_jobfam_t);
|
||||||
|
jfam->job_family = jobfamily;
|
||||||
|
jfam->route.jobid = name.jobid;
|
||||||
|
jfam->route.vpid = name.vpid;
|
||||||
|
jfam->hnp_uri = strdup(uri);
|
||||||
|
done:
|
||||||
|
free(uri);
|
||||||
|
n=1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||||
|
@ -76,7 +76,6 @@ orte_routed_module_t orte_routed_binomial_module = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/* local globals */
|
/* local globals */
|
||||||
static opal_pointer_array_t jobfams;
|
|
||||||
static opal_condition_t cond;
|
static opal_condition_t cond;
|
||||||
static opal_mutex_t lock;
|
static opal_mutex_t lock;
|
||||||
static orte_process_name_t *lifeline=NULL;
|
static orte_process_name_t *lifeline=NULL;
|
||||||
@ -89,8 +88,6 @@ static bool ack_recvd;
|
|||||||
|
|
||||||
static int init(void)
|
static int init(void)
|
||||||
{
|
{
|
||||||
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
|
|
||||||
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
|
|
||||||
|
|
||||||
/* setup the global condition and lock */
|
/* setup the global condition and lock */
|
||||||
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
||||||
@ -108,9 +105,8 @@ static int init(void)
|
|||||||
|
|
||||||
static int finalize(void)
|
static int finalize(void)
|
||||||
{
|
{
|
||||||
int rc, i;
|
int rc;
|
||||||
opal_list_item_t *item;
|
opal_list_item_t *item;
|
||||||
orte_routed_jobfam_t *jfam;
|
|
||||||
|
|
||||||
/* if I am an application process, indicate that I am
|
/* if I am an application process, indicate that I am
|
||||||
* truly finalizing prior to departure
|
* truly finalizing prior to departure
|
||||||
@ -123,14 +119,7 @@ static int finalize(void)
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i=0; i < jobfams.size; i++) {
|
|
||||||
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
|
||||||
OBJ_RELEASE(jfam);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&jobfams);
|
|
||||||
|
|
||||||
/* destruct the global condition and lock */
|
/* destruct the global condition and lock */
|
||||||
OBJ_DESTRUCT(&cond);
|
OBJ_DESTRUCT(&cond);
|
||||||
OBJ_DESTRUCT(&lock);
|
OBJ_DESTRUCT(&lock);
|
||||||
@ -188,8 +177,8 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
|
|
||||||
/* see if this job family is present */
|
/* see if this job family is present */
|
||||||
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -197,7 +186,7 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
"%s routed_binomial: deleting route to %s",
|
"%s routed_binomial: deleting route to %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
||||||
opal_pointer_array_set_item(&jobfams, i, NULL);
|
opal_pointer_array_set_item(&orte_routed_jobfams, i, NULL);
|
||||||
OBJ_RELEASE(jfam);
|
OBJ_RELEASE(jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -268,8 +257,8 @@ static int update_route(orte_process_name_t *target,
|
|||||||
|
|
||||||
/* see if this target is already present */
|
/* see if this target is already present */
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -293,7 +282,7 @@ static int update_route(orte_process_name_t *target,
|
|||||||
jfam->job_family = jfamily;
|
jfam->job_family = jfamily;
|
||||||
jfam->route.jobid = route->jobid;
|
jfam->route.jobid = route->jobid;
|
||||||
jfam->route.vpid = route->vpid;
|
jfam->route.vpid = route->vpid;
|
||||||
opal_pointer_array_add(&jobfams, jfam);
|
opal_pointer_array_add(&orte_routed_jobfams, jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -360,8 +349,8 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
|||||||
* this job family, so look it up
|
* this job family, so look it up
|
||||||
*/
|
*/
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -590,12 +579,9 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
* the server - we need to pass the routing info to our HNP
|
* the server - we need to pass the routing info to our HNP
|
||||||
*/
|
*/
|
||||||
if (NULL != ndat) {
|
if (NULL != ndat) {
|
||||||
int rc, n;
|
int rc;
|
||||||
opal_buffer_t xfer;
|
opal_buffer_t xfer;
|
||||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||||
ptrdiff_t unpack_rel;
|
|
||||||
bool found;
|
|
||||||
char *uri, *hnps;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||||
"%s routed_binomial: init routes w/non-NULL data",
|
"%s routed_binomial: init routes w/non-NULL data",
|
||||||
@ -627,40 +613,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
opal_dss.copy_payload(&xfer, ndat);
|
opal_dss.copy_payload(&xfer, ndat);
|
||||||
|
|
||||||
/* save any new connections for use in subsequent connect_accept calls */
|
/* save any new connections for use in subsequent connect_accept calls */
|
||||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
orte_routed_base_update_hnps(ndat);
|
||||||
found = false;
|
|
||||||
n = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
|
||||||
/* check if we already have the incoming one */
|
|
||||||
if (0 == strcmp(uri, hnps)) {
|
|
||||||
found = true;
|
|
||||||
free(hnps);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
free(hnps);
|
|
||||||
}
|
|
||||||
if (!found) {
|
|
||||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
|
||||||
}
|
|
||||||
free(uri);
|
|
||||||
found = false;
|
|
||||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
|
||||||
opal_buffer_t dng;
|
|
||||||
char *dmn;
|
|
||||||
int grr;
|
|
||||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
|
||||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
|
||||||
grr = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
|
||||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
|
||||||
free(dmn);
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&dng);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||||
@ -785,8 +738,8 @@ static int route_lost(const orte_process_name_t *route)
|
|||||||
if ((ORTE_JOB_FAMILY(route->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) &&
|
if ((ORTE_JOB_FAMILY(route->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) &&
|
||||||
ORTE_PROC_IS_HNP) {
|
ORTE_PROC_IS_HNP) {
|
||||||
jfamily = ORTE_JOB_FAMILY(route->jobid);
|
jfamily = ORTE_JOB_FAMILY(route->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -794,7 +747,7 @@ static int route_lost(const orte_process_name_t *route)
|
|||||||
"%s routed_binomial: route to %s lost",
|
"%s routed_binomial: route to %s lost",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_JOB_FAMILY_PRINT(route->jobid)));
|
ORTE_JOB_FAMILY_PRINT(route->jobid)));
|
||||||
opal_pointer_array_set_item(&jobfams, i, NULL);
|
opal_pointer_array_set_item(&orte_routed_jobfams, i, NULL);
|
||||||
OBJ_RELEASE(jfam);
|
OBJ_RELEASE(jfam);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -847,8 +800,8 @@ static bool route_is_defined(const orte_process_name_t *target)
|
|||||||
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
if (ORTE_JOB_FAMILY(target->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
|
||||||
if (ORTE_PROC_IS_HNP) {
|
if (ORTE_PROC_IS_HNP) {
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -1027,7 +980,9 @@ static orte_vpid_t get_routing_tree(opal_list_t *children)
|
|||||||
static int get_wireup_info(opal_buffer_t *buf)
|
static int get_wireup_info(opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
|
int i;
|
||||||
|
orte_routed_jobfam_t *jfam;
|
||||||
|
|
||||||
if (ORTE_PROC_IS_HNP) {
|
if (ORTE_PROC_IS_HNP) {
|
||||||
/* if we are not using static ports, then we need to share the
|
/* if we are not using static ports, then we need to share the
|
||||||
* comm info - otherwise, just return
|
* comm info - otherwise, just return
|
||||||
@ -1047,10 +1002,12 @@ static int get_wireup_info(opal_buffer_t *buf)
|
|||||||
* know about, if any
|
* know about, if any
|
||||||
*/
|
*/
|
||||||
if (ORTE_PROC_IS_APP) {
|
if (ORTE_PROC_IS_APP) {
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
ORTE_ERROR_LOG(rc);
|
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
|
opal_dss.pack(buf, &(jfam->hnp_uri), 1, OPAL_STRING);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return rc;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
|
@ -79,7 +79,6 @@ orte_routed_module_t orte_routed_cm_module = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/* local globals */
|
/* local globals */
|
||||||
static opal_pointer_array_t jobfams;
|
|
||||||
static opal_condition_t cond;
|
static opal_condition_t cond;
|
||||||
static opal_mutex_t lock;
|
static opal_mutex_t lock;
|
||||||
static orte_process_name_t *lifeline=NULL;
|
static orte_process_name_t *lifeline=NULL;
|
||||||
@ -89,9 +88,6 @@ static bool ack_recvd;
|
|||||||
|
|
||||||
static int init(void)
|
static int init(void)
|
||||||
{
|
{
|
||||||
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
|
|
||||||
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
|
|
||||||
|
|
||||||
/* setup the global condition and lock */
|
/* setup the global condition and lock */
|
||||||
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
||||||
OBJ_CONSTRUCT(&lock, opal_mutex_t);
|
OBJ_CONSTRUCT(&lock, opal_mutex_t);
|
||||||
@ -103,8 +99,7 @@ static int init(void)
|
|||||||
|
|
||||||
static int finalize(void)
|
static int finalize(void)
|
||||||
{
|
{
|
||||||
int rc, i;
|
int rc;
|
||||||
orte_routed_jobfam_t *jfam;
|
|
||||||
|
|
||||||
/* if I am a tool without a daemon, just cleanout
|
/* if I am a tool without a daemon, just cleanout
|
||||||
* the basics and leave
|
* the basics and leave
|
||||||
@ -124,13 +119,6 @@ static int finalize(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
for (i=0; i < jobfams.size; i++) {
|
|
||||||
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
|
||||||
OBJ_RELEASE(jfam);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&jobfams);
|
|
||||||
|
|
||||||
/* destruct the global condition and lock */
|
/* destruct the global condition and lock */
|
||||||
OBJ_DESTRUCT(&cond);
|
OBJ_DESTRUCT(&cond);
|
||||||
OBJ_DESTRUCT(&lock);
|
OBJ_DESTRUCT(&lock);
|
||||||
@ -171,8 +159,8 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
|
|
||||||
/* see if this job family is present */
|
/* see if this job family is present */
|
||||||
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -180,7 +168,7 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
"%s routed_binomial: deleting route to %s",
|
"%s routed_binomial: deleting route to %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
||||||
opal_pointer_array_set_item(&jobfams, i, NULL);
|
opal_pointer_array_set_item(&orte_routed_jobfams, i, NULL);
|
||||||
OBJ_RELEASE(jfam);
|
OBJ_RELEASE(jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -252,8 +240,8 @@ static int update_route(orte_process_name_t *target,
|
|||||||
|
|
||||||
/* see if this target is already present */
|
/* see if this target is already present */
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -277,7 +265,7 @@ static int update_route(orte_process_name_t *target,
|
|||||||
jfam->job_family = jfamily;
|
jfam->job_family = jfamily;
|
||||||
jfam->route.jobid = route->jobid;
|
jfam->route.jobid = route->jobid;
|
||||||
jfam->route.vpid = route->vpid;
|
jfam->route.vpid = route->vpid;
|
||||||
opal_pointer_array_add(&jobfams, jfam);
|
opal_pointer_array_add(&orte_routed_jobfams, jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -339,8 +327,8 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
|||||||
* this job family, so look it up
|
* this job family, so look it up
|
||||||
*/
|
*/
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -617,12 +605,9 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
* the server - we need to pass the routing info to our HNP
|
* the server - we need to pass the routing info to our HNP
|
||||||
*/
|
*/
|
||||||
if (NULL != ndat) {
|
if (NULL != ndat) {
|
||||||
int rc, n;
|
int rc;
|
||||||
opal_buffer_t xfer;
|
opal_buffer_t xfer;
|
||||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||||
ptrdiff_t unpack_rel;
|
|
||||||
bool found;
|
|
||||||
char *uri, *hnps;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||||
"%s routed_cm: init routes w/non-NULL data",
|
"%s routed_cm: init routes w/non-NULL data",
|
||||||
@ -653,40 +638,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
opal_dss.copy_payload(&xfer, ndat);
|
opal_dss.copy_payload(&xfer, ndat);
|
||||||
|
|
||||||
/* save any new connections for use in subsequent connect_accept calls */
|
/* save any new connections for use in subsequent connect_accept calls */
|
||||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
orte_routed_base_update_hnps(ndat);
|
||||||
found = false;
|
|
||||||
n = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
|
||||||
/* check if we already have the incoming one */
|
|
||||||
if (0 == strcmp(uri, hnps)) {
|
|
||||||
found = true;
|
|
||||||
free(hnps);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
free(hnps);
|
|
||||||
}
|
|
||||||
if (!found) {
|
|
||||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
|
||||||
}
|
|
||||||
free(uri);
|
|
||||||
found = false;
|
|
||||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
|
||||||
opal_buffer_t dng;
|
|
||||||
char *dmn;
|
|
||||||
int grr;
|
|
||||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
|
||||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
|
||||||
grr = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
|
||||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
|
||||||
free(dmn);
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&dng);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||||
@ -921,7 +873,9 @@ static orte_vpid_t get_routing_tree(opal_list_t *children)
|
|||||||
static int get_wireup_info(opal_buffer_t *buf)
|
static int get_wireup_info(opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
|
int i;
|
||||||
|
orte_routed_jobfam_t *jfam;
|
||||||
|
|
||||||
if (ORTE_PROC_IS_HNP) {
|
if (ORTE_PROC_IS_HNP) {
|
||||||
/* if we are not using static ports, then we need to share the
|
/* if we are not using static ports, then we need to share the
|
||||||
* comm info - otherwise, just return
|
* comm info - otherwise, just return
|
||||||
@ -941,10 +895,12 @@ static int get_wireup_info(opal_buffer_t *buf)
|
|||||||
* know about, if any
|
* know about, if any
|
||||||
*/
|
*/
|
||||||
if (ORTE_PROC_IS_APP) {
|
if (ORTE_PROC_IS_APP) {
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
ORTE_ERROR_LOG(rc);
|
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
|
opal_dss.pack(buf, &(jfam->hnp_uri), 1, OPAL_STRING);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return rc;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
|
@ -75,7 +75,6 @@ orte_routed_module_t orte_routed_linear_module = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/* local globals */
|
/* local globals */
|
||||||
static opal_pointer_array_t jobfams;
|
|
||||||
static opal_condition_t cond;
|
static opal_condition_t cond;
|
||||||
static opal_mutex_t lock;
|
static opal_mutex_t lock;
|
||||||
static orte_process_name_t *lifeline=NULL;
|
static orte_process_name_t *lifeline=NULL;
|
||||||
@ -86,9 +85,6 @@ static bool ack_recvd;
|
|||||||
|
|
||||||
static int init(void)
|
static int init(void)
|
||||||
{
|
{
|
||||||
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
|
|
||||||
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
|
|
||||||
|
|
||||||
/* setup the global condition and lock */
|
/* setup the global condition and lock */
|
||||||
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
||||||
OBJ_CONSTRUCT(&lock, opal_mutex_t);
|
OBJ_CONSTRUCT(&lock, opal_mutex_t);
|
||||||
@ -100,8 +96,7 @@ static int init(void)
|
|||||||
|
|
||||||
static int finalize(void)
|
static int finalize(void)
|
||||||
{
|
{
|
||||||
int rc, i;
|
int rc;
|
||||||
orte_routed_jobfam_t *jfam;
|
|
||||||
|
|
||||||
/* if I am an application process, indicate that I am
|
/* if I am an application process, indicate that I am
|
||||||
* truly finalizing prior to departure
|
* truly finalizing prior to departure
|
||||||
@ -115,13 +110,6 @@ static int finalize(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i=0; i < jobfams.size; i++) {
|
|
||||||
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
|
||||||
OBJ_RELEASE(jfam);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&jobfams);
|
|
||||||
|
|
||||||
/* destruct the global condition and lock */
|
/* destruct the global condition and lock */
|
||||||
OBJ_DESTRUCT(&cond);
|
OBJ_DESTRUCT(&cond);
|
||||||
OBJ_DESTRUCT(&lock);
|
OBJ_DESTRUCT(&lock);
|
||||||
@ -172,8 +160,8 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
|
|
||||||
/* see if this job family is present */
|
/* see if this job family is present */
|
||||||
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -181,7 +169,7 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
"%s routed_binomial: deleting route to %s",
|
"%s routed_binomial: deleting route to %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
||||||
opal_pointer_array_set_item(&jobfams, i, NULL);
|
opal_pointer_array_set_item(&orte_routed_jobfams, i, NULL);
|
||||||
OBJ_RELEASE(jfam);
|
OBJ_RELEASE(jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -252,8 +240,8 @@ static int update_route(orte_process_name_t *target,
|
|||||||
|
|
||||||
/* see if this target is already present */
|
/* see if this target is already present */
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -277,7 +265,7 @@ static int update_route(orte_process_name_t *target,
|
|||||||
jfam->job_family = jfamily;
|
jfam->job_family = jfamily;
|
||||||
jfam->route.jobid = route->jobid;
|
jfam->route.jobid = route->jobid;
|
||||||
jfam->route.vpid = route->vpid;
|
jfam->route.vpid = route->vpid;
|
||||||
opal_pointer_array_add(&jobfams, jfam);
|
opal_pointer_array_add(&orte_routed_jobfams, jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -330,8 +318,8 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
|||||||
* this job family, so look it up
|
* this job family, so look it up
|
||||||
*/
|
*/
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -547,12 +535,9 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
* the server - we need to pass the routing info to our HNP
|
* the server - we need to pass the routing info to our HNP
|
||||||
*/
|
*/
|
||||||
if (NULL != ndat) {
|
if (NULL != ndat) {
|
||||||
int rc, n;
|
int rc;
|
||||||
opal_buffer_t xfer;
|
opal_buffer_t xfer;
|
||||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||||
ptrdiff_t unpack_rel;
|
|
||||||
bool found;
|
|
||||||
char *uri, *hnps;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||||
"%s routed_linear: init routes w/non-NULL data",
|
"%s routed_linear: init routes w/non-NULL data",
|
||||||
@ -583,40 +568,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
opal_dss.copy_payload(&xfer, ndat);
|
opal_dss.copy_payload(&xfer, ndat);
|
||||||
|
|
||||||
/* save any new connections for use in subsequent connect_accept calls */
|
/* save any new connections for use in subsequent connect_accept calls */
|
||||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
orte_routed_base_update_hnps(ndat);
|
||||||
found = false;
|
|
||||||
n = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
|
||||||
/* check if we already have the incoming one */
|
|
||||||
if (0 == strcmp(uri, hnps)) {
|
|
||||||
found = true;
|
|
||||||
free(hnps);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
free(hnps);
|
|
||||||
}
|
|
||||||
if (!found) {
|
|
||||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
|
||||||
}
|
|
||||||
free(uri);
|
|
||||||
found = false;
|
|
||||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
|
||||||
opal_buffer_t dng;
|
|
||||||
char *dmn;
|
|
||||||
int grr;
|
|
||||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
|
||||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
|
||||||
grr = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
|
||||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
|
||||||
free(dmn);
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&dng);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||||
@ -823,7 +775,9 @@ static orte_vpid_t get_routing_tree(opal_list_t *children)
|
|||||||
static int get_wireup_info(opal_buffer_t *buf)
|
static int get_wireup_info(opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
|
int i;
|
||||||
|
orte_routed_jobfam_t *jfam;
|
||||||
|
|
||||||
if (ORTE_PROC_IS_HNP) {
|
if (ORTE_PROC_IS_HNP) {
|
||||||
/* if we are not using static ports, then we need to share the
|
/* if we are not using static ports, then we need to share the
|
||||||
* comm info - otherwise, just return
|
* comm info - otherwise, just return
|
||||||
@ -843,10 +797,12 @@ static int get_wireup_info(opal_buffer_t *buf)
|
|||||||
* know about, if any
|
* know about, if any
|
||||||
*/
|
*/
|
||||||
if (ORTE_PROC_IS_APP) {
|
if (ORTE_PROC_IS_APP) {
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
ORTE_ERROR_LOG(rc);
|
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
|
opal_dss.pack(buf, &(jfam->hnp_uri), 1, OPAL_STRING);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return rc;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
|
@ -76,7 +76,6 @@ orte_routed_module_t orte_routed_radix_module = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/* local globals */
|
/* local globals */
|
||||||
static opal_pointer_array_t jobfams;
|
|
||||||
static opal_condition_t cond;
|
static opal_condition_t cond;
|
||||||
static opal_mutex_t lock;
|
static opal_mutex_t lock;
|
||||||
static orte_process_name_t *lifeline=NULL;
|
static orte_process_name_t *lifeline=NULL;
|
||||||
@ -89,9 +88,6 @@ static bool ack_recvd;
|
|||||||
|
|
||||||
static int init(void)
|
static int init(void)
|
||||||
{
|
{
|
||||||
OBJ_CONSTRUCT(&jobfams, opal_pointer_array_t);
|
|
||||||
opal_pointer_array_init(&jobfams, 16, UINT16_MAX, 32);
|
|
||||||
|
|
||||||
/* setup the global condition and lock */
|
/* setup the global condition and lock */
|
||||||
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
OBJ_CONSTRUCT(&cond, opal_condition_t);
|
||||||
OBJ_CONSTRUCT(&lock, opal_mutex_t);
|
OBJ_CONSTRUCT(&lock, opal_mutex_t);
|
||||||
@ -108,9 +104,8 @@ static int init(void)
|
|||||||
|
|
||||||
static int finalize(void)
|
static int finalize(void)
|
||||||
{
|
{
|
||||||
int rc, i;
|
int rc;
|
||||||
opal_list_item_t *item;
|
opal_list_item_t *item;
|
||||||
orte_routed_jobfam_t *jfam;
|
|
||||||
|
|
||||||
/* if I am an application process, indicate that I am
|
/* if I am an application process, indicate that I am
|
||||||
* truly finalizing prior to departure
|
* truly finalizing prior to departure
|
||||||
@ -124,13 +119,6 @@ static int finalize(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i=0; i < jobfams.size; i++) {
|
|
||||||
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
|
||||||
OBJ_RELEASE(jfam);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&jobfams);
|
|
||||||
|
|
||||||
/* destruct the global condition and lock */
|
/* destruct the global condition and lock */
|
||||||
OBJ_DESTRUCT(&cond);
|
OBJ_DESTRUCT(&cond);
|
||||||
OBJ_DESTRUCT(&lock);
|
OBJ_DESTRUCT(&lock);
|
||||||
@ -188,8 +176,8 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
|
|
||||||
/* see if this job family is present */
|
/* see if this job family is present */
|
||||||
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
jfamily = ORTE_JOB_FAMILY(proc->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -197,7 +185,7 @@ static int delete_route(orte_process_name_t *proc)
|
|||||||
"%s routed_binomial: deleting route to %s",
|
"%s routed_binomial: deleting route to %s",
|
||||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||||
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
ORTE_JOB_FAMILY_PRINT(proc->jobid)));
|
||||||
opal_pointer_array_set_item(&jobfams, i, NULL);
|
opal_pointer_array_set_item(&orte_routed_jobfams, i, NULL);
|
||||||
OBJ_RELEASE(jfam);
|
OBJ_RELEASE(jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -268,8 +256,8 @@ static int update_route(orte_process_name_t *target,
|
|||||||
|
|
||||||
/* see if this target is already present */
|
/* see if this target is already present */
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -293,7 +281,7 @@ static int update_route(orte_process_name_t *target,
|
|||||||
jfam->job_family = jfamily;
|
jfam->job_family = jfamily;
|
||||||
jfam->route.jobid = route->jobid;
|
jfam->route.jobid = route->jobid;
|
||||||
jfam->route.vpid = route->vpid;
|
jfam->route.vpid = route->vpid;
|
||||||
opal_pointer_array_add(&jobfams, jfam);
|
opal_pointer_array_add(&orte_routed_jobfams, jfam);
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -354,8 +342,8 @@ static orte_process_name_t get_route(orte_process_name_t *target)
|
|||||||
* this job family, so look it up
|
* this job family, so look it up
|
||||||
*/
|
*/
|
||||||
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
jfamily = ORTE_JOB_FAMILY(target->jobid);
|
||||||
for (i=0; i < jobfams.size; i++) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&jobfams, i))) {
|
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (jfam->job_family == jfamily) {
|
if (jfam->job_family == jfamily) {
|
||||||
@ -579,12 +567,9 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
* the server - we need to pass the routing info to our HNP
|
* the server - we need to pass the routing info to our HNP
|
||||||
*/
|
*/
|
||||||
if (NULL != ndat) {
|
if (NULL != ndat) {
|
||||||
int rc, n;
|
int rc;
|
||||||
opal_buffer_t xfer;
|
opal_buffer_t xfer;
|
||||||
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
orte_rml_cmd_flag_t cmd=ORTE_RML_UPDATE_CMD;
|
||||||
ptrdiff_t unpack_rel;
|
|
||||||
bool found;
|
|
||||||
char *uri, *hnps;
|
|
||||||
|
|
||||||
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_output,
|
||||||
"%s routed_radix: init routes w/non-NULL data",
|
"%s routed_radix: init routes w/non-NULL data",
|
||||||
@ -615,40 +600,7 @@ static int init_routes(orte_jobid_t job, opal_buffer_t *ndat)
|
|||||||
opal_dss.copy_payload(&xfer, ndat);
|
opal_dss.copy_payload(&xfer, ndat);
|
||||||
|
|
||||||
/* save any new connections for use in subsequent connect_accept calls */
|
/* save any new connections for use in subsequent connect_accept calls */
|
||||||
unpack_rel = orte_remote_hnps.unpack_ptr - orte_remote_hnps.base_ptr;
|
orte_routed_base_update_hnps(ndat);
|
||||||
found = false;
|
|
||||||
n = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(ndat, &uri, &n, OPAL_STRING)) {
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&orte_remote_hnps, &hnps, &n, OPAL_STRING)) {
|
|
||||||
/* check if we already have the incoming one */
|
|
||||||
if (0 == strcmp(uri, hnps)) {
|
|
||||||
found = true;
|
|
||||||
free(hnps);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
free(hnps);
|
|
||||||
}
|
|
||||||
if (!found) {
|
|
||||||
opal_dss.pack(&orte_remote_hnps, &uri, 1, OPAL_STRING);
|
|
||||||
}
|
|
||||||
free(uri);
|
|
||||||
found = false;
|
|
||||||
orte_remote_hnps.unpack_ptr = orte_remote_hnps.base_ptr + unpack_rel;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (9 < opal_output_get_verbosity(orte_routed_base_output)) {
|
|
||||||
opal_buffer_t dng;
|
|
||||||
char *dmn;
|
|
||||||
int grr;
|
|
||||||
OBJ_CONSTRUCT(&dng, opal_buffer_t);
|
|
||||||
opal_dss.copy_payload(&dng, &orte_remote_hnps);
|
|
||||||
grr = 1;
|
|
||||||
while (ORTE_SUCCESS == opal_dss.unpack(&dng, &dmn, &grr, OPAL_STRING)) {
|
|
||||||
opal_output(0, "%s REMOTE: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), dmn);
|
|
||||||
free(dmn);
|
|
||||||
}
|
|
||||||
OBJ_DESTRUCT(&dng);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &xfer,
|
||||||
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
ORTE_RML_TAG_RML_INFO_UPDATE, 0))) {
|
||||||
@ -962,7 +914,9 @@ static orte_vpid_t get_routing_tree(opal_list_t *children)
|
|||||||
static int get_wireup_info(opal_buffer_t *buf)
|
static int get_wireup_info(opal_buffer_t *buf)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
|
int i;
|
||||||
|
orte_routed_jobfam_t *jfam;
|
||||||
|
|
||||||
if (ORTE_PROC_IS_HNP) {
|
if (ORTE_PROC_IS_HNP) {
|
||||||
/* if we are not using static ports, then we need to share the
|
/* if we are not using static ports, then we need to share the
|
||||||
* comm info - otherwise, just return
|
* comm info - otherwise, just return
|
||||||
@ -982,10 +936,12 @@ static int get_wireup_info(opal_buffer_t *buf)
|
|||||||
* know about, if any
|
* know about, if any
|
||||||
*/
|
*/
|
||||||
if (ORTE_PROC_IS_APP) {
|
if (ORTE_PROC_IS_APP) {
|
||||||
if (ORTE_SUCCESS != (rc = opal_dss.copy_payload(buf, &orte_remote_hnps))) {
|
for (i=0; i < orte_routed_jobfams.size; i++) {
|
||||||
ORTE_ERROR_LOG(rc);
|
if (NULL != (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
|
||||||
|
opal_dss.pack(buf, &(jfam->hnp_uri), 1, OPAL_STRING);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return rc;
|
return ORTE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ORTE_SUCCESS;
|
return ORTE_SUCCESS;
|
||||||
|
@ -43,6 +43,7 @@ typedef struct {
|
|||||||
opal_object_t super;
|
opal_object_t super;
|
||||||
uint16_t job_family;
|
uint16_t job_family;
|
||||||
orte_process_name_t route;
|
orte_process_name_t route;
|
||||||
|
char *hnp_uri;
|
||||||
} orte_routed_jobfam_t;
|
} orte_routed_jobfam_t;
|
||||||
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_routed_jobfam_t);
|
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_routed_jobfam_t);
|
||||||
|
|
||||||
|
@ -185,9 +185,6 @@ orte_default_comm_fn_t orte_comm;
|
|||||||
bool orte_report_child_jobs_separately;
|
bool orte_report_child_jobs_separately;
|
||||||
struct timeval orte_child_time_to_exit;
|
struct timeval orte_child_time_to_exit;
|
||||||
|
|
||||||
/* record uri's of remote hnps */
|
|
||||||
opal_buffer_t orte_remote_hnps;
|
|
||||||
|
|
||||||
#endif /* !ORTE_DISABLE_FULL_RTE */
|
#endif /* !ORTE_DISABLE_FULL_RTE */
|
||||||
|
|
||||||
int orte_debug_output = -1;
|
int orte_debug_output = -1;
|
||||||
|
@ -708,9 +708,6 @@ ORTE_DECLSPEC int orte_global_comm(orte_process_name_t *recipient,
|
|||||||
ORTE_DECLSPEC extern bool orte_report_child_jobs_separately;
|
ORTE_DECLSPEC extern bool orte_report_child_jobs_separately;
|
||||||
ORTE_DECLSPEC extern struct timeval orte_child_time_to_exit;
|
ORTE_DECLSPEC extern struct timeval orte_child_time_to_exit;
|
||||||
|
|
||||||
/* record uri's of remote hnps */
|
|
||||||
ORTE_DECLSPEC extern opal_buffer_t orte_remote_hnps;
|
|
||||||
|
|
||||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||||
|
|
||||||
END_C_DECLS
|
END_C_DECLS
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user