1
1

restore subscribe for connect/accept

This commit was SVN r5280.
Этот коммит содержится в:
Tim Woodall 2005-04-12 21:25:51 +00:00
родитель 31768b36de
Коммит f2acb6d66f

Просмотреть файл

@ -553,68 +553,119 @@ void mca_oob_tcp_registry_callback(
int mca_oob_tcp_resolve(mca_oob_tcp_peer_t* peer)
{
mca_oob_tcp_addr_t* addr;
mca_oob_tcp_subscription_t* subscription;
ompi_list_item_t* item;
char *segment, *jobid;
char *keys[2];
int rc;
mca_oob_tcp_addr_t* addr;
mca_oob_tcp_subscription_t* subscription;
orte_gpr_value_t trig, *trigs;
orte_gpr_subscription_t sub, *subs;
ompi_list_item_t* item;
int rc;
/* if the address is already cached - simply return it */
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
addr = (mca_oob_tcp_addr_t *)ompi_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names,
/* if the address is already cached - simply return it */
OMPI_THREAD_LOCK(&mca_oob_tcp_component.tcp_lock);
addr = (mca_oob_tcp_addr_t *)ompi_hash_table_get_proc(&mca_oob_tcp_component.tcp_peer_names,
&peer->peer_name);
if(NULL != addr) {
if(NULL != addr) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
mca_oob_tcp_peer_resolved(peer, addr);
return OMPI_SUCCESS;
}
}
/* check to see if we have subscribed to this registry segment */
for( item = ompi_list_get_first(&mca_oob_tcp_component.tcp_subscriptions);
item != ompi_list_get_end(&mca_oob_tcp_component.tcp_subscriptions);
item = ompi_list_get_next(item)) {
subscription = (mca_oob_tcp_subscription_t*)item;
if(subscription->jobid == peer->peer_name.jobid) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return OMPI_SUCCESS;
}
}
/* check to see if we have subscribed to this registry segment */
for( item = ompi_list_get_first(&mca_oob_tcp_component.tcp_subscriptions);
item != ompi_list_get_end(&mca_oob_tcp_component.tcp_subscriptions);
item = ompi_list_get_next(item)) {
subscription = (mca_oob_tcp_subscription_t*)item;
if(subscription->jobid == peer->peer_name.jobid) {
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return OMPI_SUCCESS;
}
}
/* otherwise - need to subscribe to this registry segment
* record the subscription
*/
subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
subscription->jobid = peer->peer_name.jobid;
ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
if (ORTE_SUCCESS != (rc = orte_ns.get_jobid_string(&jobid, &peer->peer_name))) {
OBJ_CONSTRUCT(&sub, orte_gpr_subscription_t);
sub.addr_mode = ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&(sub.segment), peer->peer_name.jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
asprintf(&segment, "%s-%s", ORTE_JOB_SEGMENT, jobid);
keys[0] = "oob-tcp";
keys[1] = NULL;
}
sub.num_tokens= 0;
sub.tokens = NULL;
sub.num_keys = 1;
sub.keys = (char**)malloc(sizeof(char*));
if(NULL == sub.keys) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
sub.keys[0] = strdup("oob-tcp");
if (NULL == sub.keys[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
sub.cbfunc = mca_oob_tcp_registry_callback;
sub.user_tag = NULL;
#if 0
rc = orte_gpr.subscribe(
ORTE_GPR_OR,
ORTE_GPR_NOTIFY_ON_STARTUP|ORTE_GPR_NOTIFY_INCLUDE_STARTUP_DATA|
ORTE_GPR_NOTIFY_ON_SHUTDOWN|ORTE_GPR_NOTIFY_INCLUDE_SHUTDOWN_DATA|
ORTE_GPR_NOTIFY_PRE_EXISTING,
segment,
NULL,
keys,
&subscription->subid,
mca_oob_tcp_registry_callback,
NULL);
/* setup the trigger value */
OBJ_CONSTRUCT(&trig, orte_gpr_value_t);
trig.addr_mode = ORTE_GPR_TOKENS_XAND | ORTE_GPR_KEYS_OR;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&(trig.segment), peer->peer_name.jobid))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&trig);
return rc;
}
trig.num_tokens = 1;
trig.tokens = (char**)malloc(sizeof(char*));
if (NULL == trig.tokens) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
trig.tokens[0] = strdup(ORTE_JOB_GLOBALS);
if (NULL == trig.tokens[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
trig.keyvals = (orte_gpr_keyval_t**)malloc(2*sizeof(orte_gpr_keyval_t*));
if(NULL == trig.keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
trig.cnt = 2;
trig.keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
if(NULL == trig.keyvals[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
trig.keyvals[0]->key = strdup(ORTE_JOB_SLOTS_KEY);
trig.keyvals[0]->type = ORTE_NULL;
trig.keyvals[1] = OBJ_NEW(orte_gpr_keyval_t);
if(NULL == trig.keyvals[1]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
trig.keyvals[1]->key = strdup(ORTE_PROC_NUM_AT_STG1);
trig.keyvals[1]->type = ORTE_NULL;
trigs = &trig;
subs = ⊂
subscription = OBJ_NEW(mca_oob_tcp_subscription_t);
subscription->jobid = peer->peer_name.jobid;
rc = orte_gpr.subscribe(
ORTE_GPR_NOTIFY_ADD_ENTRY | ORTE_GPR_NOTIFY_VALUE_CHG |
ORTE_GPR_TRIG_CMP_LEVELS | ORTE_GPR_TRIG_ONE_SHOT,
1, &subs,
1, &trigs,
&subscription->subid);
if(rc != OMPI_SUCCESS) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&sub);
OBJ_DESTRUCT(&trig);
return rc;
}
OBJ_DESTRUCT(&sub);
OBJ_DESTRUCT(&trig); /* done with these */
free(segment);
free(jobid);
return rc;
#else
return ORTE_SUCCESS;
#endif
ompi_list_append(&mca_oob_tcp_component.tcp_subscriptions, &subscription->item);
OMPI_THREAD_UNLOCK(&mca_oob_tcp_component.tcp_lock);
return rc;
}