1
1

Backport changes from PMIx reference server

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-09-14 11:48:56 -07:00
родитель 0851122cce
Коммит 7c7d8a69a0
12 изменённых файлов: 146 добавлений и 61 удалений

Просмотреть файл

@ -244,6 +244,7 @@ typedef uint32_t pmix_rank_t;
#define PMIX_HWLOC_XML_V1 "pmix.hwlocxml1" // (char*) XML representation of local topology using HWLOC v1.x format
#define PMIX_HWLOC_XML_V2 "pmix.hwlocxml2" // (char*) XML representation of local topology using HWLOC v2.x format
/* request-related info */
#define PMIX_COLLECT_DATA "pmix.collect" // (bool) collect data and return it at the end of the operation
#define PMIX_TIMEOUT "pmix.timeout" // (int) time in sec before specified operation should time out (0 => infinite)
@ -263,6 +264,9 @@ typedef uint32_t pmix_rank_t;
// not request data from the server if not found
#define PMIX_EMBED_BARRIER "pmix.embed.barrier" // (bool) execute a blocking fence operation before executing the
// specified operation
#define PMIX_JOB_TERM_STATUS "pmix.job.term.status" // (pmix_status_t) status returned upon job termination
#define PMIX_PROC_STATE_STATUS "pmix.proc.state" // (pmix_proc_state_t) process state
/* attributes used by host server to pass data to the server convenience library - the
* data will then be parsed and provided to the local clients */
@ -274,10 +278,12 @@ typedef uint32_t pmix_rank_t;
#define PMIX_APP_MAP_TYPE "pmix.apmap.type" // (char*) type of mapping used to layout the application (e.g., cyclic)
#define PMIX_APP_MAP_REGEX "pmix.apmap.regex" // (char*) regex describing the result of the mapping
/* attributes used internally to communicate data from the server to the client */
#define PMIX_PROC_BLOB "pmix.pblob" // (pmix_byte_object_t) packed blob of process data
#define PMIX_MAP_BLOB "pmix.mblob" // (pmix_byte_object_t) packed blob of process location
/* event handler registration and notification info keys */
#define PMIX_EVENT_HDLR_NAME "pmix.evname" // (char*) string name identifying this handler
#define PMIX_EVENT_JOB_LEVEL "pmix.evjob" // (bool) register for job-specific events only
@ -297,6 +303,9 @@ typedef uint32_t pmix_rank_t;
#define PMIX_EVENT_RETURN_OBJECT "pmix.evobject" // (void*) object to be returned whenever the registered cbfunc is invoked
// NOTE: the object will _only_ be returned to the process that
// registered it
#define PMIX_EVENT_DO_NOT_CACHE "pmix.evnocache" // (bool) instruct the PMIx server not to cache the event
#define PMIX_EVENT_SILENT_TERMINATION "pmix.evsilentterm" // (bool) do not generate an event when this job normally terminates
/* fault tolerance-related events */
#define PMIX_EVENT_TERMINATE_SESSION "pmix.evterm.sess" // (bool) RM intends to terminate session

Просмотреть файл

@ -807,16 +807,33 @@ static void _notify_client_event(int sd, short args, void *cbdata)
PMIx_Data_range_string(cd->range),
cd->nondefault ? "NONDEFAULT" : "OPEN");
/* we cannot know if everyone who wants this notice has had a chance
* to register for it - the notice may be coming too early. So cache
* the message until all local procs have received it, or it ages to
* the point where it gets pushed out by more recent events */
PMIX_RETAIN(cd);
rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd);
/* check for caching instructions */
holdcd = true;
if (0 < cd->ninfo) {
/* check for caching instructions */
for (n=0; n < cd->ninfo; n++) {
if (0 == strncmp(cd->info[n].key, PMIX_EVENT_DO_NOT_CACHE, PMIX_MAX_KEYLEN)) {
if (PMIX_UNDEF == cd->info[n].value.type ||
cd->info[n].value.data.flag) {
holdcd = false;
break;
}
}
}
}
/* if an older event was bumped, release it */
if (NULL != rbout) {
PMIX_RELEASE(rbout);
if (holdcd) {
/* we cannot know if everyone who wants this notice has had a chance
* to register for it - the notice may be coming too early. So cache
* the message until all local procs have received it, or it ages to
* the point where it gets pushed out by more recent events */
PMIX_RETAIN(cd);
rbout = pmix_ring_buffer_push(&pmix_globals.notifications, cd);
/* if an older event was bumped, release it */
if (NULL != rbout) {
PMIX_RELEASE(rbout);
}
}
holdcd = false;
@ -984,8 +1001,15 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status,
cd->source.rank = source->rank;
}
cd->range = range;
cd->info = info;
cd->ninfo = ninfo;
/* have to copy the info to preserve it for future when cached */
if (0 < ninfo) {
cd->ninfo = ninfo;
PMIX_INFO_CREATE(cd->info, cd->ninfo);
/* need to copy the info */
for (n=0; n < cd->ninfo; n++) {
PMIX_INFO_XFER(&cd->info[n], &info[n]);
}
}
/* check for directives */
if (NULL != info) {

Просмотреть файл

@ -1155,6 +1155,8 @@ static int notify_event(int status,
}
op = OBJ_NEW(pmix2x_opcaddy_t);
op->opcbfunc = cbfunc;
op->cbdata = cbdata;
/* convert the status */
pstatus = pmix2x_convert_opalrc(status);
@ -1183,7 +1185,14 @@ static int notify_event(int status,
n=0;
OPAL_LIST_FOREACH(kv, info, opal_value_t) {
(void)strncpy(op->info[n].key, kv->key, PMIX_MAX_KEYLEN);
pmix2x_value_load(&op->info[n].value, kv);
/* little dicey here as we need to convert a status, if
* provided, and it will be an int coming down to us */
if (0 == strcmp(kv->key, OPAL_PMIX_JOB_TERM_STATUS)) {
op->info[n].value.type = PMIX_STATUS;
op->info[n].value.data.status = pmix2x_convert_opalrc(kv->data.integer);
} else {
pmix2x_value_load(&op->info[n].value, kv);
}
++n;
}
}

Просмотреть файл

@ -192,6 +192,9 @@ BEGIN_C_DECLS
// not request data from the server if not found
#define OPAL_PMIX_EMBED_BARRIER "pmix.embed.barrier" // (bool) execute a blocking fence operation before executing the
// specified operation
#define OPAL_PMIX_JOB_TERM_STATUS "pmix.job.term.status" // (int) status returned upon job termination
#define OPAL_PMIX_PROC_STATE_STATUS "pmix.proc.state" // (int) process state
/* attribute used by host server to pass data to the server convenience library - the
@ -218,6 +221,9 @@ BEGIN_C_DECLS
#define OPAL_PMIX_EVENT_RETURN_OBJECT "pmix.evobject" // (void*) object to be returned whenever the registered cbfunc is invoked
// NOTE: the object will _only_ be returned to the process that
// registered it
#define OPAL_PMIX_EVENT_DO_NOT_CACHE "pmix.evnocache" // (bool) instruct the PMIx server not to cache the event
#define OPAL_PMIX_EVENT_SILENT_TERMINATION "pmix.evsilentterm" // (bool) do not generate an event when this job normally terminates
/* fault tolerance-related events */
#define OPAL_PMIX_EVENT_TERMINATE_SESSION "pmix.evterm.sess" // (bool) RM intends to terminate session

Просмотреть файл

@ -1000,7 +1000,7 @@ int get_ofi_prov_id(opal_list_t *attributes)
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"%s - get_ofi_prov_id() -> comparing sockets != %s to choose first available fabric provider",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
providers[i], cur_fi->fabric_attr->prov_name);
cur_fi->fabric_attr->prov_name);
if (0 != strcmp("sockets", cur_fi->fabric_attr->prov_name)) {
ofi_prov_id = prov_num;
opal_output_verbose(20,orte_rml_base_framework.framework_output,

Просмотреть файл

@ -50,6 +50,7 @@ static int finalize(void);
static void init_complete(int fd, short args, void *cbdata);
static void vm_ready(int fd, short args, void *cbata);
static void check_complete(int fd, short args, void *cbdata);
static void cleanup_job(int fd, short args, void *cbdata);
/******************
* DVM module - used when mpirun is persistent
@ -89,6 +90,7 @@ static orte_job_state_t launch_states[] = {
ORTE_JOB_STATE_REGISTERED,
/* termination states */
ORTE_JOB_STATE_TERMINATED,
ORTE_JOB_STATE_NOTIFY_COMPLETED,
ORTE_JOB_STATE_ALL_JOBS_COMPLETE
};
static orte_state_cbfunc_t launch_callbacks[] = {
@ -107,6 +109,7 @@ static orte_state_cbfunc_t launch_callbacks[] = {
orte_plm_base_post_launch,
orte_plm_base_registered,
check_complete,
cleanup_job,
orte_quit
};
@ -501,11 +504,24 @@ static void check_complete(int fd, short args, void *cbdata)
OPAL_OUTPUT_VERBOSE((2, orte_state_base_framework.framework_output,
"%s state:dvm:check_job_completed state is terminated - activating notify",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL,
OPAL_PMIX_RANGE_GLOBAL, NULL,
NULL, NULL);
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_NOTIFY_COMPLETED);
/* mark the job as notified */
jdata->state = ORTE_JOB_STATE_NOTIFIED;
}
OBJ_RELEASE(caddy);
}
static void cleanup_job(int sd, short args, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_job_t *jdata;
ORTE_ACQUIRE_OBJECT(caddy);
jdata = caddy->jdata;
/* remove this object from the job array */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
OBJ_RELEASE(caddy);
}

Просмотреть файл

@ -577,7 +577,6 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
if (NULL == (jdata = orte_get_job_data_object(job))) {
/* we can safely ignore this request as the job
* was already cleaned up */
opal_output(0, "NULL JOB");
goto CLEANUP;
}
@ -585,7 +584,6 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
* can ignore this request as we would have already
* dealt with it */
if (0 < jdata->num_local_procs) {
opal_output(0, "NO PROCS");
goto CLEANUP;
}
@ -622,7 +620,6 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
OBJ_RELEASE(map);
jdata->map = NULL;
}
opal_output(0, "CLEANUP COMPLETE");
break;

Просмотреть файл

@ -725,6 +725,8 @@ static void _toolconn(int sd, short args, void *cbdata)
orte_node_t *node;
orte_process_name_t tool;
int rc;
opal_value_t *val;
bool flag;
ORTE_ACQUIRE_OBJECT(cd);
@ -788,6 +790,22 @@ static void _toolconn(int sd, short args, void *cbdata)
proc->app_idx = 0;
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_LOCAL);
/* check for directives */
if (NULL != cd->info) {
OPAL_LIST_FOREACH(val, cd->info, opal_value_t) {
if (0 == strcmp(val->key, OPAL_PMIX_EVENT_SILENT_TERMINATION)) {
if (OPAL_UNDEF == val->type || val->data.flag) {
flag = true;
orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION,
ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
}
}
}
}
flag = true;
orte_set_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION,
ORTE_ATTR_GLOBAL, &flag, OPAL_BOOL);
/* pass back the assigned jobid */
tool.jobid = jdata->jobid;
tool.vpid = 0;

Просмотреть файл

@ -57,6 +57,7 @@
#include "opal/mca/event/event.h"
#include "opal/mca/installdirs/installdirs.h"
#include "opal/mca/base/base.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/basename.h"
@ -466,17 +467,10 @@ int main(int argc, char *argv[])
exit(orte_exit_status);
}
static void send_callback(int status, orte_process_name_t *peer,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
static void notify_complete(int status, void *cbdata)
{
orte_job_t *jdata = (orte_job_t*)cbdata;
OBJ_RELEASE(buffer);
/* cleanup the job object */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
OBJ_RELEASE(jdata);
opal_list_t *info = (opal_list_t*)cbdata;
OPAL_LIST_RELEASE(info);
}
static void notify_requestor(int sd, short args, void *cbdata)
@ -484,15 +478,13 @@ static void notify_requestor(int sd, short args, void *cbdata)
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_job_t *jdata = caddy->jdata;
orte_proc_t *pptr;
int ret, id, *idptr;
int ret;
opal_buffer_t *reply;
orte_daemon_cmd_flag_t command;
orte_grpcomm_signature_t *sig;
opal_output(0, "NOTIFY JOB COMPLETE");
/* notify the requestor */
reply = OBJ_NEW(opal_buffer_t);
bool notify = true;
opal_list_t *info;
opal_value_t *val;
/* see if there was any problem */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ABORTED_PROC, (void**)&pptr, OPAL_PTR) && NULL != pptr) {
@ -503,31 +495,37 @@ opal_output(0, "NOTIFY JOB COMPLETE");
} else {
ret = 0;
}
/* return the completion status */
opal_dss.pack(reply, &ret, 1, OPAL_INT);
/* pack the jobid to be returned */
opal_dss.pack(reply, &jdata->jobid, 1, ORTE_JOBID);
/* return the tracker ID */
idptr = &id;
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ROOM_NUM, (void**)&idptr, OPAL_INT)) {
/* pack the sender's index to the tracking object */
opal_dss.pack(reply, idptr, 1, OPAL_INT);
if (0 == ret && orte_get_attribute(&jdata->attributes, ORTE_JOB_SILENT_TERMINATION, NULL, OPAL_BOOL)) {
notify = false;
}
/* if there was a problem, we need to send the requestor more info about what happened */
if (0 < ret) {
opal_dss.pack(reply, &jdata->state, 1, ORTE_JOB_STATE_T);
opal_dss.pack(reply, &pptr, 1, ORTE_PROC);
opal_dss.pack(reply, &pptr->node, 1, ORTE_NODE);
if (notify) {
info = OBJ_NEW(opal_list_t);
val = OBJ_NEW(opal_value_t);
val->key = strdup(OPAL_PMIX_EVENT_DO_NOT_CACHE);
val->type = OPAL_BOOL;
val->data.flag = true;
opal_list_append(info, &val->super);
/* provide the status */
val = OBJ_NEW(opal_value_t);
val->key = strdup(OPAL_PMIX_JOB_TERM_STATUS);
val->type = OPAL_INT;
val->data.integer = ret;
opal_list_append(info, &val->super);
/* if there was a problem, we need to send the requestor more info about what happened */
if (0 < ret) {
val = OBJ_NEW(opal_value_t);
val->key = strdup(OPAL_PMIX_PROCID);
val->type = OPAL_NAME;
val->data.name = pptr->name;
opal_list_append(info, &val->super);
}
opal_pmix.notify_event(OPAL_ERR_JOB_TERMINATED, NULL,
OPAL_PMIX_RANGE_GLOBAL, info,
notify_complete, info);
}
orte_rml.send_buffer_nb(orte_mgmt_conduit,
&jdata->originator, reply,
ORTE_RML_TAG_NOTIFY_COMPLETE,
send_callback, jdata);
/* now ensure that _all_ daemons know that this job has terminated so even
* those that did not participate in it will know to cleanup the resources
* they assigned to the job. This is necessary now that the mapping function
@ -545,9 +543,4 @@ opal_output(0, "NOTIFY JOB COMPLETE");
orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, reply);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
/* we cannot cleanup the job object as we might
* hit an error during transmission, so clean it
* up in the send callback */
OBJ_RELEASE(caddy);
}

Просмотреть файл

@ -506,6 +506,15 @@ static void evhandler(size_t evhdlr_registration_id,
pmix_event_notification_cbfunc_fn_t cbfunc,
void *cbdata)
{
size_t n;
if (NULL != info) {
for (n=0; n < ninfo; n++) {
if (0 == strncmp(info[n].key, PMIX_JOB_TERM_STATUS, PMIX_MAX_KEYLEN)) {
opal_output(0, "JOB COMPLETED WITH STATUS %s", PMIx_Error_string(info[n].value.data.status));
}
}
}
if (NULL != cbfunc) {
cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
}

Просмотреть файл

@ -288,6 +288,8 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
return "ORTE_JOB_INFO_CACHE";
case ORTE_JOB_FULLY_DESCRIBED:
return "ORTE_JOB_FULLY_DESCRIBED";
case ORTE_JOB_SILENT_TERMINATION:
return "ORTE_JOB_SILENT_TERMINATION";
case ORTE_PROC_NOBARRIER:
return "PROC-NOBARRIER";

Просмотреть файл

@ -144,6 +144,8 @@ typedef uint16_t orte_job_flags_t;
#define ORTE_JOB_TRANSPORT_KEY (ORTE_JOB_START_KEY + 51) // string - transport keys assigned to this job
#define ORTE_JOB_INFO_CACHE (ORTE_JOB_START_KEY + 52) // opal_list_t - list of opal_value_t to be included in job_info
#define ORTE_JOB_FULLY_DESCRIBED (ORTE_JOB_START_KEY + 53) // bool - job is fully described in launch msg
#define ORTE_JOB_SILENT_TERMINATION (ORTE_JOB_START_KEY + 54) // bool - do not generate an event notification when job
// normally terminates
#define ORTE_JOB_MAX_KEY 300