1
1
This commit was SVN r5793.
Этот коммит содержится в:
Thara Angskun 2005-05-20 09:18:28 +00:00
родитель bb3009e6f3
Коммит 42d6434337

Просмотреть файл

@ -42,6 +42,7 @@
extern char **environ;
orte_jobid_t global_jobid;
/*
@ -52,7 +53,6 @@ static int pls_poe_terminate_job(orte_jobid_t jobid);
static int pls_poe_terminate_proc(const orte_process_name_t *name);
static int pls_poe_finalize(void);
orte_pls_base_module_1_0_0_t orte_pls_poe_module = {
pls_poe_launch,
pls_poe_terminate_job,
@ -262,9 +262,6 @@ int pls_poe_launch_interactive_orted(orte_jobid_t jobid)
ompi_output(0, "orte_pls_poe: execv failed with errno=%d\n", errno);
exit(-1);
} else {
/*
waitpid(pid,&status,0);
*/
}
cleanup:
@ -281,54 +278,134 @@ cleanup:
int orte_pls_poe_wait_proc(pid_t pid, int status, void* cbdata)
{
int i;
orte_proc_state_t state;
orte_gpr_value_t *value;
int rc;
size_t i;
orte_rmaps_base_proc_t* proc;
ompi_list_item_t* item;
ompi_list_t *map = (ompi_list_t *)cbdata;
if (mca_pls_poe_component.verbose > 10) ompi_output(0, "%s:--- BEGIN ---\n", __FUNCTION__);
ompi_output(0, "---- %s BEGIN -----\n", __FUNCTION__);
value = OBJ_NEW(orte_gpr_value_t);
if (NULL == value) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for(item = ompi_list_get_first(map);
item != ompi_list_get_end(map);
item = ompi_list_get_next(item)) {
orte_rmaps_base_map_t* map2 = (orte_rmaps_base_map_t*)item;
size_t i;
for(i=0; i<map2->num_procs; i++) {
proc=map2->procs[i];
if(WIFEXITED(status)) {
rc = orte_soh.set_proc_soh(&proc->proc_name, ORTE_PROC_STATE_TERMINATED, status);
} else {
rc = orte_soh.set_proc_soh(&proc->proc_name, ORTE_PROC_STATE_ABORTED, status);
}
value->addr_mode = ORTE_GPR_OVERWRITE | ORTE_GPR_TOKENS_AND;
if(WIFEXITED(status)) {
state=ORTE_PROC_STATE_TERMINATED;
} else {
state=ORTE_PROC_STATE_ABORTED;
}
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&(value->segment), global_jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
value->tokens = NULL; /* set everyone on job segment to this status */
value->cnt = 2;
value->keyvals = (orte_gpr_keyval_t**)malloc(2 * sizeof(orte_gpr_keyval_t*));
if (NULL == value->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value->keyvals[0] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value->keyvals[0]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
(value->keyvals[0])->key = strdup(ORTE_PROC_STATE_KEY);
(value->keyvals[0])->type = ORTE_PROC_STATE;
(value->keyvals[0])->value.proc_state = state;
value->keyvals[1] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value->keyvals[1]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
(value->keyvals[1])->key = strdup(ORTE_PROC_EXIT_CODE_KEY);
(value->keyvals[1])->type = ORTE_EXIT_CODE;
(value->keyvals[1])->value.exit_code = status;
if (ORTE_SUCCESS != (rc = orte_gpr.put(1, &value))) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(value);
/* check to see if we need to increment orte-standard counters */
/* first, cleanup value so it can be used for that purpose */
value = OBJ_NEW(orte_gpr_value_t);
if (NULL == value) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
return ORTE_ERR_OUT_OF_RESOURCE;
}
value->addr_mode = ORTE_GPR_KEYS_OR|ORTE_GPR_TOKENS_AND;
if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&(value->segment), global_jobid))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if(NULL == (value->tokens = (char**)malloc(sizeof(char*)))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
value->tokens[0] = strdup(ORTE_JOB_GLOBALS);
value->num_tokens = 1;
/* If we're setting ABORTED, we're also setting TERMINATED, so we
need 2 keyvals. Everything else only needs 1 keyval. */
value->cnt = 1;
if (ORTE_PROC_STATE_ABORTED == state) {
++value->cnt;
}
value->keyvals = (orte_gpr_keyval_t**)malloc(sizeof(orte_gpr_keyval_t*) *
value->cnt);
if (NULL == value->keyvals) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
for (i = 0; i < value->cnt; ++i) {
value->keyvals[i] = OBJ_NEW(orte_gpr_keyval_t);
if (NULL == value->keyvals[i]) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(value);
return ORTE_ERR_OUT_OF_RESOURCE;
}
(value->keyvals[i])->type = ORTE_NULL;
}
/* see which state we are in - let that determine the counter, if any */
switch (state) {
case ORTE_PROC_STATE_TERMINATED:
(value->keyvals[0])->key = strdup(ORTE_PROC_NUM_TERMINATED);
break;
case ORTE_PROC_STATE_ABORTED:
(value->keyvals[0])->key = strdup(ORTE_PROC_NUM_ABORTED);
(value->keyvals[1])->key = strdup(ORTE_PROC_NUM_TERMINATED);
break;
}
if (NULL != (value->keyvals[0])->key) { /* need to increment a counter */
if (ORTE_SUCCESS != (rc = orte_gpr.increment_value(value))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(value);
return rc;
}
}
ompi_output(0, "---- %s END-----\n", __FUNCTION__);
OBJ_RELEASE(value);
if (mca_pls_poe_component.verbose>10) ompi_output(0, "%s: --- END rc(%d) ---\n", __FUNCTION__, rc);
#ifdef XXX
orte_rmaps_base_proc_t* proc = (orte_rmaps_base_proc_t*)cbdata;
int rc;
/* Clean up the session directory as if we were the process
itself. This covers the case where the process died abnormally
and didn't cleanup its own session directory. */
orte_session_dir_finalize(&proc->proc_name);
orte_iof.iof_flush();
/* set the state of this process */
if(WIFEXITED(status)) {
rc = orte_soh.set_proc_soh(&proc->proc_name, ORTE_PROC_STATE_TERMINATED, status);
} else {
rc = orte_soh.set_proc_soh(&proc->proc_name, ORTE_PROC_STATE_ABORTED, status);
}
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
}
OBJ_RELEASE(proc);
#endif
return rc;
}
@ -347,6 +424,8 @@ static int orte_pls_poe_launch_create_cmd_file(
char* uri;
char **environ_copy;
if (mca_pls_poe_component.verbose > 10) ompi_output(0, "%s:--- BEGIN ---\n", __FUNCTION__);
/* setup base environment */
environ_copy = NULL;
param = mca_base_param_environ_variable("rmgr","bootproxy","jobid");
@ -405,7 +484,9 @@ static int orte_pls_poe_launch_create_cmd_file(
while(context->argv[i]!=NULL) {
fprintf(cfp," %s",context->argv[i++]);
}
fprintf(cfp,"\n"); /* POE will upset if you don't have end line. */
fprintf(cfp,"\n"); /* POE will upset if the file doesn't contain end of line. */
if (mca_pls_poe_component.verbose>10) ompi_output(0, "%s: --- END ---\n", __FUNCTION__);
return ORTE_SUCCESS;
}
@ -431,6 +512,8 @@ int orte_pls_poe_launch_interactive(orte_jobid_t jobid)
if((mca_pls_poe_component.cmdfile=tempnam(NULL,NULL))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
if((cfp=fopen(mca_pls_poe_component.cmdfile,"w"))==NULL) return ORTE_ERR_OUT_OF_RESOURCE;
global_jobid = jobid;
OBJ_CONSTRUCT(&nodes, ompi_list_t);
rc = orte_ras_base_node_query_alloc(&nodes, jobid);
if(ORTE_SUCCESS != rc) {
@ -538,12 +621,7 @@ int orte_pls_poe_launch_interactive(orte_jobid_t jobid)
ompi_output(0, "orte_pls_poe: execv failed with errno=%d\n", errno);
exit(-1);
} else {
/*
ompi_output(0, "\n\nBEFORE WAIT!!\n\n");
orte_waitpid(pid,&status,0);
ompi_output(0, "\n\nAFTER WAIT!!\n\n");
*/
orte_wait_cb(pid, orte_pls_poe_wait_proc, &map);
orte_wait_cb(pid, orte_pls_poe_wait_proc, NULL);
}
cleanup: