1
1

Implement support for proctable queries

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2018-02-28 07:52:11 -08:00
родитель 0434b615b5
Коммит 17c40f4cea
9 изменённых файлов: 320 добавлений и 26 удалений

Просмотреть файл

@ -573,11 +573,13 @@ typedef uint8_t pmix_proc_state_t;
#define PMIX_PROC_STATE_ABORTED_BY_SIG (PMIX_PROC_STATE_ERROR + 4) /* process aborted by signal */
#define PMIX_PROC_STATE_TERM_WO_SYNC (PMIX_PROC_STATE_ERROR + 5) /* process exit'd w/o calling PMIx_Finalize */
#define PMIX_PROC_STATE_COMM_FAILED (PMIX_PROC_STATE_ERROR + 6) /* process communication has failed */
#define PMIX_PROC_STATE_CALLED_ABORT (PMIX_PROC_STATE_ERROR + 7) /* process called "PMIx_Abort" */
#define PMIX_PROC_STATE_MIGRATING (PMIX_PROC_STATE_ERROR + 8) /* process failed and is waiting for resources before restarting */
#define PMIX_PROC_STATE_CANNOT_RESTART (PMIX_PROC_STATE_ERROR + 9) /* process failed and cannot be restarted */
#define PMIX_PROC_STATE_TERM_NON_ZERO (PMIX_PROC_STATE_ERROR + 10) /* process exited with a non-zero status, indicating abnormal */
#define PMIX_PROC_STATE_FAILED_TO_LAUNCH (PMIX_PROC_STATE_ERROR + 11) /* unable to launch process */
#define PMIX_PROC_STATE_SENSOR_BOUND_EXCEEDED (PMIX_PROC_STATE_ERROR + 7) /* process exceeded a sensor limit */
#define PMIX_PROC_STATE_CALLED_ABORT (PMIX_PROC_STATE_ERROR + 8) /* process called "PMIx_Abort" */
#define PMIX_PROC_STATE_HEARTBEAT_FAILED (PMIX_PROC_STATE_ERROR + 9) /* process failed to send heartbeat within time limit */
#define PMIX_PROC_STATE_MIGRATING (PMIX_PROC_STATE_ERROR + 10) /* process failed and is waiting for resources before restarting */
#define PMIX_PROC_STATE_CANNOT_RESTART (PMIX_PROC_STATE_ERROR + 11) /* process failed and cannot be restarted */
#define PMIX_PROC_STATE_TERM_NON_ZERO (PMIX_PROC_STATE_ERROR + 12) /* process exited with a non-zero status, indicating abnormal termination */
#define PMIX_PROC_STATE_FAILED_TO_LAUNCH (PMIX_PROC_STATE_ERROR + 13) /* unable to launch process */
/**** PMIX ERROR CONSTANTS ****/
@ -1356,16 +1358,20 @@ struct pmix_info_t {
} \
} while (0)
#define PMIX_INFO_LOAD(m, k, v, t) \
do { \
(void)strncpy((m)->key, (k), PMIX_MAX_KEYLEN); \
pmix_value_load(&((m)->value), (v), (t)); \
} while (0)
#define PMIX_INFO_XFER(d, s) \
#define PMIX_INFO_LOAD(m, k, v, t) \
do { \
(void)strncpy((d)->key, (s)->key, PMIX_MAX_KEYLEN); \
(d)->flags = (s)->flags; \
pmix_value_xfer(&(d)->value, &(s)->value); \
if (NULL != (k)) { \
(void)strncpy((m)->key, (k), PMIX_MAX_KEYLEN); \
} \
pmix_value_load(&((m)->value), (v), (t)); \
} while (0)
#define PMIX_INFO_XFER(d, s) \
do { \
if (NULL != (s)->key) { \
(void)strncpy((d)->key, (s)->key, PMIX_MAX_KEYLEN); \
} \
(d)->flags = (s)->flags; \
pmix_value_xfer(&(d)->value, &(s)->value); \
} while(0)
#define PMIX_INFO_REQUIRED(m) \
@ -1386,7 +1392,9 @@ struct pmix_info_t {
(r) = PMIX_ERR_NOMEM; \
break; \
} \
_kv->key = strdup(_info[_n].key); \
if (NULL != _info[_n].key) { \
_kv->key = strdup(_info[_n].key); \
} \
PMIX_VALUE_XFER((r), _kv->value, &_info[_n].value);\
if (PMIX_SUCCESS != (r)) { \
PMIX_RELEASE(_kv); \

Просмотреть файл

@ -63,6 +63,7 @@ static void query_cbfunc(struct pmix_peer_t *peer,
PMIX_BFROPS_UNPACK(rc, peer, buf, &results->status, &cnt, PMIX_STATUS);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
results->status = rc;
goto complete;
}
if (PMIX_SUCCESS != results->status) {
@ -74,6 +75,7 @@ static void query_cbfunc(struct pmix_peer_t *peer,
PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
results->status = rc;
goto complete;
}
if (0 < results->ninfo) {
@ -82,6 +84,7 @@ static void query_cbfunc(struct pmix_peer_t *peer,
PMIX_BFROPS_UNPACK(rc, peer, buf, results->info, &cnt, PMIX_INFO);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
results->status = rc;
goto complete;
}
}

Просмотреть файл

@ -71,8 +71,12 @@ PMIX_EXPORT const char* PMIx_Proc_state_string(pmix_proc_state_t state)
return "PROC TERMINATED WITHOUT CALLING PMIx_Finalize";
case PMIX_PROC_STATE_COMM_FAILED:
return "PROC LOST COMMUNICATION";
case PMIX_PROC_STATE_SENSOR_BOUND_EXCEEDED:
return "PROC SENSOR BOUND EXCEEDED";
case PMIX_PROC_STATE_CALLED_ABORT:
return "PROC CALLED PMIx_Abort";
case PMIX_PROC_STATE_HEARTBEAT_FAILED:
return "PROC FAILED TO REPORT HEARTBEAT";
case PMIX_PROC_STATE_MIGRATING:
return "PROC WAITING TO MIGRATE";
case PMIX_PROC_STATE_CANNOT_RESTART:

Просмотреть файл

@ -373,6 +373,7 @@ pmix_status_t pmix_bfrops_base_copy_pinfo(pmix_proc_info_t **dest,
if (NULL == p) {
return PMIX_ERR_NOMEM;
}
memcpy(&p->proc, &src->proc, sizeof(pmix_proc_t));
if (NULL != src->hostname) {
p->hostname = strdup(src->hostname);
}
@ -623,7 +624,7 @@ pmix_status_t pmix_bfrops_base_copy_darray(pmix_data_array_t **dest,
p1 = (pmix_info_t*)p->array;
s1 = (pmix_info_t*)src->array;
for (n=0; n < src->size; n++) {
PMIX_INFO_LOAD(&p1[n], s1[n].key, &s1[n].value.data.flag, s1[n].value.type);
PMIX_INFO_XFER(&p1[n], &s1[n]);
}
break;
case PMIX_PDATA:
@ -635,7 +636,7 @@ pmix_status_t pmix_bfrops_base_copy_darray(pmix_data_array_t **dest,
pd = (pmix_pdata_t*)p->array;
sd = (pmix_pdata_t*)src->array;
for (n=0; n < src->size; n++) {
PMIX_PDATA_LOAD(&pd[n], &sd[n].proc, sd[n].key, &sd[n].value.data.flag, sd[n].value.type);
PMIX_PDATA_XFER(&pd[n], &sd[n]);
}
break;
case PMIX_BUFFER:

Просмотреть файл

@ -909,7 +909,9 @@ void pmix3x_value_load(pmix_value_t *v,
v->data.darray->array = info;
n=0;
OPAL_LIST_FOREACH(val, list, opal_value_t) {
(void)strncpy(info[n].key, val->key, PMIX_MAX_KEYLEN);
if (NULL != val->key) {
(void)strncpy(info[n].key, val->key, PMIX_MAX_KEYLEN);
}
pmix3x_value_load(&info[n].value, val);
++n;
}
@ -917,6 +919,32 @@ void pmix3x_value_load(pmix_value_t *v,
v->data.darray->array = NULL;
}
break;
case OPAL_PROC_INFO:
v->type = PMIX_PROC_INFO;
PMIX_PROC_INFO_CREATE(v->data.pinfo, 1);
/* see if this job is in our list of known nspaces */
found = false;
OPAL_LIST_FOREACH(job, &mca_pmix_pmix3x_component.jobids, opal_pmix3x_jobid_trkr_t) {
if (job->jobid == kv->data.pinfo.name.jobid) {
(void)strncpy(v->data.pinfo->proc.nspace, job->nspace, PMIX_MAX_NSLEN);
found = true;
break;
}
}
if (!found) {
(void)opal_snprintf_jobid(v->data.pinfo->proc.nspace, PMIX_MAX_NSLEN, kv->data.pinfo.name.jobid);
}
v->data.pinfo->proc.rank = pmix3x_convert_opalrank(kv->data.pinfo.name.vpid);
if (NULL != kv->data.pinfo.hostname) {
v->data.pinfo->hostname = strdup(kv->data.pinfo.hostname);
}
if (NULL != kv->data.pinfo.executable_name) {
v->data.pinfo->executable_name = strdup(kv->data.pinfo.executable_name);
}
v->data.pinfo->pid = kv->data.pinfo.pid;
v->data.pinfo->exit_code = kv->data.pinfo.exit_code;
v->data.pinfo->state = pmix3x_convert_opalstate(kv->data.pinfo.state);
break;
case OPAL_ENVAR:
v->type = PMIX_ENVAR;
PMIX_ENVAR_CONSTRUCT(&v->data.envar);
@ -1099,7 +1127,9 @@ int pmix3x_value_unload(opal_value_t *kv,
/* handle the various types */
if (PMIX_INFO == v->data.darray->type) {
pmix_info_t *iptr = (pmix_info_t*)v->data.darray->array;
ival->key = strdup(iptr[n].key);
if (NULL != iptr[n].key) {
ival->key = strdup(iptr[n].key);
}
rc = pmix3x_value_unload(ival, &iptr[n].value);
if (OPAL_SUCCESS != rc) {
OPAL_LIST_RELEASE(lt);
@ -1110,6 +1140,37 @@ int pmix3x_value_unload(opal_value_t *kv,
}
}
break;
case PMIX_PROC_INFO:
kv->type = OPAL_PROC_INFO;
if (NULL == v->data.pinfo) {
rc = OPAL_ERR_BAD_PARAM;
break;
}
/* see if this job is in our list of known nspaces */
found = false;
OPAL_LIST_FOREACH(job, &mca_pmix_pmix3x_component.jobids, opal_pmix3x_jobid_trkr_t) {
if (0 == strncmp(job->nspace, v->data.pinfo->proc.nspace, PMIX_MAX_NSLEN)) {
kv->data.pinfo.name.jobid = job->jobid;
found = true;
break;
}
}
if (!found) {
if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&kv->data.pinfo.name.jobid, v->data.pinfo->proc.nspace))) {
return pmix3x_convert_opalrc(rc);
}
}
kv->data.pinfo.name.vpid = pmix3x_convert_rank(v->data.pinfo->proc.rank);
if (NULL != v->data.pinfo->hostname) {
kv->data.pinfo.hostname = strdup(v->data.pinfo->hostname);
}
if (NULL != v->data.pinfo->executable_name) {
kv->data.pinfo.executable_name = strdup(v->data.pinfo->executable_name);
}
kv->data.pinfo.pid = v->data.pinfo->pid;
kv->data.pinfo.exit_code = v->data.pinfo->exit_code;
kv->data.pinfo.state = pmix3x_convert_state(v->data.pinfo->state);
break;
case PMIX_ENVAR:
kv->type = OPAL_ENVAR;
OBJ_CONSTRUCT(&kv->data.envar, opal_envar_t);
@ -1347,6 +1408,7 @@ static void infocbfunc(pmix_status_t status,
opal_list_append(results, &iptr->super);
iptr->key = strdup(info[n].key);
if (OPAL_SUCCESS != (rc = pmix3x_value_unload(iptr, &info[n].value))) {
OPAL_ERROR_LOG(rc);
OPAL_LIST_RELEASE(results);
results = NULL;
break;
@ -1510,6 +1572,103 @@ opal_pmix_alloc_directive_t pmix3x_convert_allocdir(pmix_alloc_directive_t dir)
}
}
/* Translate a PMIx process state into the integer state code used on
 * the OPAL side of this component.
 *
 * PMIX_PROC_STATE_PREPPED and PMIX_PROC_STATE_LAUNCH_UNDERWAY share a
 * single code (1). Any state without an explicit entry yields 0, the
 * same code returned for PMIX_PROC_STATE_UNDEF. */
int pmix3x_convert_state(pmix_proc_state_t state)
{
    int opal_code = 0;  /* "undefined" unless a case below says otherwise */

    switch (state) {
    /* normal lifecycle states */
    case PMIX_PROC_STATE_UNDEF:
        opal_code = 0;
        break;
    case PMIX_PROC_STATE_PREPPED:
    case PMIX_PROC_STATE_LAUNCH_UNDERWAY:
        opal_code = 1;
        break;
    case PMIX_PROC_STATE_RESTART:
        opal_code = 2;
        break;
    case PMIX_PROC_STATE_TERMINATE:
        opal_code = 3;
        break;
    case PMIX_PROC_STATE_RUNNING:
        opal_code = 4;
        break;
    case PMIX_PROC_STATE_CONNECTED:
        opal_code = 5;
        break;
    case PMIX_PROC_STATE_UNTERMINATED:
        opal_code = 15;
        break;
    case PMIX_PROC_STATE_TERMINATED:
        opal_code = 20;
        break;

    /* error states */
    case PMIX_PROC_STATE_KILLED_BY_CMD:
        opal_code = 51;
        break;
    case PMIX_PROC_STATE_ABORTED:
        opal_code = 52;
        break;
    case PMIX_PROC_STATE_FAILED_TO_START:
        opal_code = 53;
        break;
    case PMIX_PROC_STATE_ABORTED_BY_SIG:
        opal_code = 54;
        break;
    case PMIX_PROC_STATE_TERM_WO_SYNC:
        opal_code = 55;
        break;
    case PMIX_PROC_STATE_COMM_FAILED:
        opal_code = 56;
        break;
    case PMIX_PROC_STATE_SENSOR_BOUND_EXCEEDED:
        opal_code = 57;
        break;
    case PMIX_PROC_STATE_CALLED_ABORT:
        opal_code = 58;
        break;
    case PMIX_PROC_STATE_HEARTBEAT_FAILED:
        opal_code = 59;
        break;
    case PMIX_PROC_STATE_MIGRATING:
        opal_code = 60;
        break;
    case PMIX_PROC_STATE_CANNOT_RESTART:
        opal_code = 61;
        break;
    case PMIX_PROC_STATE_TERM_NON_ZERO:
        opal_code = 62;
        break;
    case PMIX_PROC_STATE_FAILED_TO_LAUNCH:
        opal_code = 63;
        break;

    default:
        /* unrecognized state: leave as undefined */
        break;
    }

    return opal_code;
}
pmix_proc_state_t pmix3x_convert_opalstate(int state)
{
switch(state) {
case 0:
return PMIX_PROC_STATE_UNDEF;
case 1:
return PMIX_PROC_STATE_LAUNCH_UNDERWAY;
case 2:
return PMIX_PROC_STATE_RESTART;
case 3:
return PMIX_PROC_STATE_TERMINATE;
case 4:
return PMIX_PROC_STATE_RUNNING;
case 5:
return PMIX_PROC_STATE_CONNECTED;
case 51:
return PMIX_PROC_STATE_KILLED_BY_CMD;
case 52:
return PMIX_PROC_STATE_ABORTED;
case 53:
return PMIX_PROC_STATE_FAILED_TO_START;
case 54:
return PMIX_PROC_STATE_ABORTED_BY_SIG;
case 55:
return PMIX_PROC_STATE_TERM_WO_SYNC;
case 56:
return PMIX_PROC_STATE_COMM_FAILED;
case 57:
return PMIX_PROC_STATE_SENSOR_BOUND_EXCEEDED;
case 58:
return PMIX_PROC_STATE_CALLED_ABORT;
case 59:
return PMIX_PROC_STATE_HEARTBEAT_FAILED;
case 60:
return PMIX_PROC_STATE_MIGRATING;
case 61:
return PMIX_PROC_STATE_CANNOT_RESTART;
case 62:
return PMIX_PROC_STATE_TERM_NON_ZERO;
case 63:
return PMIX_PROC_STATE_FAILED_TO_LAUNCH;
default:
return PMIX_PROC_STATE_UNDEF;
}
}
/**** INSTANTIATE INTERNAL CLASSES ****/
OBJ_CLASS_INSTANCE(opal_pmix3x_jobid_trkr_t,
opal_list_item_t,

Просмотреть файл

@ -342,6 +342,11 @@ OPAL_MODULE_DECLSPEC opal_pmix_alloc_directive_t pmix3x_convert_allocdir(pmix_al
OPAL_MODULE_DECLSPEC char* pmix3x_convert_jobid(opal_jobid_t jobid);
OPAL_MODULE_DECLSPEC int pmix3x_convert_state(pmix_proc_state_t state);
OPAL_MODULE_DECLSPEC pmix_proc_state_t pmix3x_convert_opalstate(int state);
END_C_DECLS
#endif /* MCA_PMIX_EXTERNAL_H */

Просмотреть файл

@ -954,6 +954,7 @@ static void info_cbfunc(int status,
OPAL_LIST_FOREACH(kv, info, opal_value_t) {
(void)strncpy(pcaddy->info[n].key, kv->key, PMIX_MAX_KEYLEN);
pmix3x_value_load(&pcaddy->info[n].value, kv);
++n;
}
}
/* we are done with the incoming data */
@ -1012,10 +1013,20 @@ static pmix_status_t server_query(pmix_proc_t *proct,
for (m=0; m < queries[n].nqual; m++) {
oinfo = OBJ_NEW(opal_value_t);
opal_list_append(&q->qualifiers, &oinfo->super);
oinfo->key = strdup(queries[n].qualifiers[m].key);
if (OPAL_SUCCESS != (rc = pmix3x_value_unload(oinfo, &queries[n].qualifiers[m].value))) {
OBJ_RELEASE(opalcaddy);
return pmix3x_convert_opalrc(rc);
if (0 == strcmp(queries[n].qualifiers[m].key, PMIX_NSPACE)) {
/* must convert this to jobid */
oinfo->key = strdup(OPAL_PMIX_PROCID);
if (OPAL_SUCCESS != (rc = opal_convert_string_to_jobid(&oinfo->data.name.jobid, queries[n].qualifiers[m].value.data.string))) {
OBJ_RELEASE(opalcaddy);
return pmix3x_convert_opalrc(rc);
}
} else {
oinfo->key = strdup(queries[n].qualifiers[m].key);
if (OPAL_SUCCESS != (rc = pmix3x_value_unload(oinfo, &queries[n].qualifiers[m].value))) {
OBJ_RELEASE(opalcaddy);
return pmix3x_convert_opalrc(rc);
}
}
}
}

Просмотреть файл

@ -534,7 +534,7 @@ OBJ_CLASS_DECLARATION(opal_pmix_modex_data_t);
typedef struct {
opal_list_item_t super;
char **keys;
opal_list_t qualifiers;
opal_list_t qualifiers; // list of opal_value_t
} opal_pmix_query_t;
OBJ_CLASS_DECLARATION(opal_pmix_query_t);

Просмотреть файл

@ -13,7 +13,7 @@
* All rights reserved.
* Copyright (c) 2009 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
@ -465,9 +465,11 @@ static void _query(int sd, short args, void *cbdata)
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
opal_pmix_query_t *q;
opal_value_t *kv;
orte_jobid_t jobid;
orte_job_t *jdata;
orte_proc_t *proct;
int rc, i, num_replies;
orte_app_context_t *app;
int rc, i, k, num_replies;
opal_list_t *results, targets, *array;
size_t n;
uint32_t key;
@ -683,10 +685,111 @@ static void _query(int sd, short args, void *cbdata)
kv->type = OPAL_STRING;
kv->data.string = strdup(orte_process_info.my_hnp_uri);
opal_list_append(results, &kv->super);
} else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_PROC_TABLE)) {
/* the job they are asking about is in the qualifiers */
jobid = ORTE_JOBID_INVALID;
OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
/* save the id */
jobid = kv->data.name.jobid;
break;
}
}
if (ORTE_JOBID_INVALID == jobid) {
rc = ORTE_ERR_BAD_PARAM;
goto done;
}
/* construct a list of values with opal_proc_info_t
* entries for each proc in the indicated job */
jdata = orte_get_job_data_object(jobid);
if (NULL == jdata) {
rc = ORTE_ERR_NOT_FOUND;
goto done;
}
/* setup the reply */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_PROC_TABLE);
kv->type = OPAL_PTR;
array = OBJ_NEW(opal_list_t);
kv->data.ptr = array;
opal_list_append(results, &kv->super);
/* cycle thru the job and create an entry for each proc */
for (k=0; k < jdata->procs->size; k++) {
if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, k))) {
continue;
}
kv = OBJ_NEW(opal_value_t);
kv->type = OPAL_PROC_INFO;
kv->data.pinfo.name.jobid = jobid;
kv->data.pinfo.name.vpid = proct->name.vpid;
if (NULL != proct->node && NULL != proct->node->name) {
kv->data.pinfo.hostname = strdup(proct->node->name);
}
app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proct->app_idx);
if (NULL != app && NULL != app->app) {
kv->data.pinfo.executable_name = strdup(app->app);
}
kv->data.pinfo.pid = proct->pid;
kv->data.pinfo.exit_code = proct->exit_code;
kv->data.pinfo.state = proct->state;
opal_list_append(array, &kv->super);
}
} else if (0 == strcmp(q->keys[n], OPAL_PMIX_QUERY_LOCAL_PROC_TABLE)) {
/* the job they are asking about is in the qualifiers */
jobid = ORTE_JOBID_INVALID;
OPAL_LIST_FOREACH(kv, &q->qualifiers, opal_value_t) {
if (0 == strcmp(kv->key, OPAL_PMIX_PROCID)) {
/* save the id */
jobid = kv->data.name.jobid;
break;
}
}
if (ORTE_JOBID_INVALID == jobid) {
rc = ORTE_ERR_BAD_PARAM;
goto done;
}
/* construct a list of values with opal_proc_info_t
* entries for each LOCAL proc in the indicated job */
jdata = orte_get_job_data_object(jobid);
if (NULL == jdata) {
rc = ORTE_ERR_NOT_FOUND;
goto done;
}
/* setup the reply */
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(OPAL_PMIX_QUERY_LOCAL_PROC_TABLE);
kv->type = OPAL_PTR;
array = OBJ_NEW(opal_list_t);
kv->data.ptr = array;
opal_list_append(results, &kv->super);
/* cycle thru the job and create an entry for each proc */
for (k=0; k < jdata->procs->size; k++) {
if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, k))) {
continue;
}
if (ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_LOCAL)) {
kv = OBJ_NEW(opal_value_t);
kv->type = OPAL_PROC_INFO;
kv->data.pinfo.name.jobid = jobid;
kv->data.pinfo.name.vpid = proct->name.vpid;
if (NULL != proct->node && NULL != proct->node->name) {
kv->data.pinfo.hostname = strdup(proct->node->name);
}
app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, proct->app_idx);
if (NULL != app && NULL != app->app) {
kv->data.pinfo.executable_name = strdup(app->app);
}
kv->data.pinfo.pid = proct->pid;
kv->data.pinfo.exit_code = proct->exit_code;
kv->data.pinfo.state = proct->state;
opal_list_append(array, &kv->super);
}
}
}
}
}
done:
if (0 == opal_list_get_size(results)) {
rc = ORTE_ERR_NOT_FOUND;
} else if (opal_list_get_size(results) < opal_list_get_size(cd->info)) {