1
1

Ensure that data from a job that was stored in ompi-server is purged once that job completes. Cleanup a few typos. Silence a Coverity warning

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-05-30 09:43:01 -07:00
родитель 22631832ce
Коммит 9a8811a246
8 изменённых файлов: 137 добавлений и 22 удалений

Просмотреть файл

@ -24,6 +24,8 @@
#include "opal/mca/event/event.h" #include "opal/mca/event/event.h"
#include "opal/mca/pmix/pmix.h" #include "opal/mca/pmix/pmix.h"
#include "orte/orted/pmix/pmix_server_internal.h"
#include "orte/runtime/orte_data_server.h"
#include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h" #include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h" #include "orte/mca/errmgr/errmgr.h"
@ -466,6 +468,50 @@ void orte_state_base_report_progress(int fd, short argc, void *cbdata)
OBJ_RELEASE(caddy); OBJ_RELEASE(caddy);
} }
void orte_state_base_notify_data_server(orte_process_name_t *target)
{
opal_buffer_t *buf;
int rc, room = -1;
uint8_t cmd = ORTE_PMIX_PURGE_PROC_CMD;
/* if nobody local to us published anything, then we can ignore this */
if (ORTE_JOBID_INVALID == orte_pmix_server_globals.server.jobid) {
return;
}
buf = OBJ_NEW(opal_buffer_t);
/* pack the room number */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &room, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
/* load the command */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &cmd, 1, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
/* provide the target */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, target, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
return;
}
/* send the request to the server */
rc = orte_rml.send_buffer_nb(orte_mgmt_conduit,
&orte_pmix_server_globals.server, buf,
ORTE_RML_TAG_DATA_SERVER,
orte_rml_send_callback, NULL);
if (ORTE_SUCCESS != rc) {
OBJ_RELEASE(buf);
}
}
static void _send_notification(int status, static void _send_notification(int status,
orte_proc_state_t state, orte_proc_state_t state,
orte_process_name_t *proc, orte_process_name_t *proc,
@ -725,6 +771,13 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
if (orte_state_base_run_fdcheck) { if (orte_state_base_run_fdcheck) {
orte_state_base_check_fds(jdata); orte_state_base_check_fds(jdata);
} }
/* if ompi-server is around, then notify it to purge
* any session-related info */
if (NULL != orte_data_server_uri) {
target.jobid = jdata->jobid;
target.vpid = ORTE_VPID_WILDCARD;
orte_state_base_notify_data_server(&target);
}
ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED); ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_TERMINATED);
/* if they requested notification upon completion, provide it */ /* if they requested notification upon completion, provide it */
if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) { if (orte_get_attribute(&jdata->attributes, ORTE_JOB_NOTIFY_COMPLETION, NULL, OPAL_BOOL)) {
@ -1035,6 +1088,7 @@ void orte_state_base_check_fds(orte_job_t *jdata)
char path[1024], info[256], **list=NULL, *status, *result, *r2; char path[1024], info[256], **list=NULL, *status, *result, *r2;
ssize_t rc; ssize_t rc;
struct flock fl; struct flock fl;
bool flk;
int cnt = 0; int cnt = 0;
/* get the number of available file descriptors /* get the number of available file descriptors
@ -1066,7 +1120,11 @@ void orte_state_base_check_fds(orte_job_t *jdata)
fl.l_whence = 0; fl.l_whence = 0;
fl.l_start = 0; fl.l_start = 0;
fl.l_len = 0; fl.l_len = 0;
fcntl(i, F_GETLK, &fl); if (-1 == fcntl(i, F_GETLK, &fl)) {
flk = false;
} else {
flk = true;
}
/* construct the list of capabilities */ /* construct the list of capabilities */
if (fdflags & FD_CLOEXEC) { if (fdflags & FD_CLOEXEC) {
opal_argv_append_nosize(&list, "cloexec"); opal_argv_append_nosize(&list, "cloexec");
@ -1077,14 +1135,18 @@ void orte_state_base_check_fds(orte_job_t *jdata)
if (flflags & O_NONBLOCK) { if (flflags & O_NONBLOCK) {
opal_argv_append_nosize(&list, "nonblock"); opal_argv_append_nosize(&list, "nonblock");
} }
if (flflags & O_RDONLY) { /* from the man page:
* Unlike the other values that can be specified in flags,
* the access mode values O_RDONLY, O_WRONLY, and O_RDWR,
* do not specify individual bits. Rather, they define
* the low order two bits of flags, and defined respectively
* as 0, 1, and 2. */
if (O_RDONLY == (flflags & 3)) {
opal_argv_append_nosize(&list, "rdonly"); opal_argv_append_nosize(&list, "rdonly");
} } else if (O_WRONLY == (flflags & 3)) {
if (flflags & O_RDWR) {
opal_argv_append_nosize(&list, "rdwr");
}
if (flflags & O_WRONLY) {
opal_argv_append_nosize(&list, "wronly"); opal_argv_append_nosize(&list, "wronly");
} else {
opal_argv_append_nosize(&list, "rdwr");
} }
if (flflags & O_DSYNC) { if (flflags & O_DSYNC) {
opal_argv_append_nosize(&list, "dsync"); opal_argv_append_nosize(&list, "dsync");
@ -1095,7 +1157,7 @@ void orte_state_base_check_fds(orte_job_t *jdata)
if (flflags & O_SYNC) { if (flflags & O_SYNC) {
opal_argv_append_nosize(&list, "sync"); opal_argv_append_nosize(&list, "sync");
} }
if (F_UNLCK != fl.l_type) { if (flk && F_UNLCK != fl.l_type) {
if (F_WRLCK == fl.l_type) { if (F_WRLCK == fl.l_type) {
opal_argv_append_nosize(&list, "wrlock"); opal_argv_append_nosize(&list, "wrlock");
} else { } else {

Просмотреть файл

@ -78,6 +78,7 @@ ORTE_DECLSPEC void orte_state_base_report_progress(int fd, short argc, void *cbd
ORTE_DECLSPEC void orte_state_base_track_procs(int fd, short argc, void *cbdata); ORTE_DECLSPEC void orte_state_base_track_procs(int fd, short argc, void *cbdata);
ORTE_DECLSPEC void orte_state_base_check_all_complete(int fd, short args, void *cbdata); ORTE_DECLSPEC void orte_state_base_check_all_complete(int fd, short args, void *cbdata);
ORTE_DECLSPEC void orte_state_base_check_fds(orte_job_t *jdata); ORTE_DECLSPEC void orte_state_base_check_fds(orte_job_t *jdata);
ORTE_DECLSPEC void orte_state_base_notify_data_server(orte_process_name_t *target);
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -27,6 +27,8 @@
#include "orte/mca/rml/rml.h" #include "orte/mca/rml/rml.h"
#include "orte/mca/routed/routed.h" #include "orte/mca/routed/routed.h"
#include "orte/util/session_dir.h" #include "orte/util/session_dir.h"
#include "orte/orted/pmix/pmix_server_internal.h"
#include "orte/runtime/orte_data_server.h"
#include "orte/runtime/orte_quit.h" #include "orte/runtime/orte_quit.h"
#include "orte/mca/state/state.h" #include "orte/mca/state/state.h"
@ -260,6 +262,7 @@ static void track_procs(int fd, short argc, void *cbdata)
orte_std_cntr_t index; orte_std_cntr_t index;
orte_job_map_t *map; orte_job_map_t *map;
orte_node_t *node; orte_node_t *node;
orte_process_name_t target;
OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((5, orte_state_base_framework.framework_output,
"%s state:orted:track_procs called for proc %s state %s", "%s state:orted:track_procs called for proc %s state %s",
@ -489,6 +492,14 @@ static void track_procs(int fd, short argc, void *cbdata)
orte_state_base_check_fds(jdata); orte_state_base_check_fds(jdata);
} }
/* if ompi-server is around, then notify it to purge
* any session-related info */
if (NULL != orte_data_server_uri) {
target.jobid = jdata->jobid;
target.vpid = ORTE_VPID_WILDCARD;
orte_state_base_notify_data_server(&target);
}
/* cleanup the job info */ /* cleanup the job info */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL); opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
OBJ_RELEASE(jdata); OBJ_RELEASE(jdata);

Просмотреть файл

@ -220,6 +220,7 @@ int pmix_server_init(void)
return rc; return rc;
} }
OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t); OBJ_CONSTRUCT(&orte_pmix_server_globals.notifications, opal_list_t);
orte_pmix_server_globals.server = *ORTE_NAME_INVALID;
/* setup recv for direct modex requests */ /* setup recv for direct modex requests */
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX, orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX,

Просмотреть файл

@ -45,8 +45,9 @@
#include "opal/util/proc.h" #include "opal/util/proc.h"
#include "orte/mca/grpcomm/base/base.h" #include "orte/mca/grpcomm/base/base.h"
#include "orte/runtime/orte_globals.h"
BEGIN_C_DECLS BEGIN_C_DECLS
#define ORTED_PMIX_MIN_DMX_TIMEOUT 10 #define ORTED_PMIX_MIN_DMX_TIMEOUT 10
#define ORTE_ADJUST_TIMEOUT(a) \ #define ORTE_ADJUST_TIMEOUT(a) \
@ -252,7 +253,6 @@ typedef struct {
opal_hotel_t reqs; opal_hotel_t reqs;
int num_rooms; int num_rooms;
int timeout; int timeout;
char *server_uri;
bool wait_for_server; bool wait_for_server;
orte_process_name_t server; orte_process_name_t server;
opal_list_t notifications; opal_list_t notifications;

Просмотреть файл

@ -69,7 +69,7 @@ static int init_server(void)
if (NULL == filename) { if (NULL == filename) {
/* filename is not correctly formatted */ /* filename is not correctly formatted */
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true, orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-bad", true,
orte_basename, orte_pmix_server_globals.server_uri); orte_basename, orte_data_server_uri);
return ORTE_ERR_BAD_PARAM; return ORTE_ERR_BAD_PARAM;
} }
++filename; /* space past the : */ ++filename; /* space past the : */
@ -77,7 +77,7 @@ static int init_server(void)
if (0 >= strlen(filename)) { if (0 >= strlen(filename)) {
/* they forgot to give us the name! */ /* they forgot to give us the name! */
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true, orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-missing", true,
orte_basename, orte_pmix_server_globals.server_uri); orte_basename, orte_data_server_uri);
return ORTE_ERR_BAD_PARAM; return ORTE_ERR_BAD_PARAM;
} }
@ -85,14 +85,14 @@ static int init_server(void)
fp = fopen(filename, "r"); fp = fopen(filename, "r");
if (NULL == fp) { /* can't find or read file! */ if (NULL == fp) { /* can't find or read file! */
orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true, orte_show_help("help-orterun.txt", "orterun:ompi-server-filename-access", true,
orte_basename, orte_pmix_server_globals.server_uri); orte_basename, orte_data_server_uri);
return ORTE_ERR_BAD_PARAM; return ORTE_ERR_BAD_PARAM;
} }
if (NULL == fgets(input, 1024, fp)) { if (NULL == fgets(input, 1024, fp)) {
/* something malformed about file */ /* something malformed about file */
fclose(fp); fclose(fp);
orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true, orte_show_help("help-orterun.txt", "orterun:ompi-server-file-bad", true,
orte_basename, orte_pmix_server_globals.server_uri, orte_basename, orte_data_server_uri,
orte_basename); orte_basename);
return ORTE_ERR_BAD_PARAM; return ORTE_ERR_BAD_PARAM;
} }
@ -100,7 +100,7 @@ static int init_server(void)
input[strlen(input)-1] = '\0'; /* remove newline */ input[strlen(input)-1] = '\0'; /* remove newline */
server = strdup(input); server = strdup(input);
} else { } else {
server = strdup(orte_pmix_server_globals.server_uri); server = strdup(orte_data_server_uri);
} }
/* setup our route to the server */ /* setup our route to the server */
OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&buf, opal_buffer_t);
@ -154,8 +154,8 @@ static void execute(int sd, short args, void *cbdata)
/* we need to initialize our connection to the server */ /* we need to initialize our connection to the server */
if (ORTE_SUCCESS != (rc = init_server())) { if (ORTE_SUCCESS != (rc = init_server())) {
orte_show_help("help-orted.txt", "noserver", true, orte_show_help("help-orted.txt", "noserver", true,
(NULL == orte_pmix_server_globals.server_uri) ? (NULL == orte_data_server_uri) ?
"NULL" : orte_pmix_server_globals.server_uri); "NULL" : orte_data_server_uri);
goto callback; goto callback;
} }
} }

Просмотреть файл

@ -653,6 +653,46 @@ void orte_data_server(int status, orte_process_name_t* sender,
goto SEND_ANSWER; goto SEND_ANSWER;
break; break;
case ORTE_PMIX_PURGE_PROC_CMD:
/* unpack the proc whose data is to be purged - session
* data is purged by providing a requestor whose rank
* is wildcard */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &requestor, &count, OPAL_NAME))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_data_server_output,
"%s data server: purge data from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&requestor)));
/* cycle across the stored data, looking for a match */
for (k=0; k < orte_data_server_store.size; k++) {
data = (orte_data_object_t*)opal_pointer_array_get_item(&orte_data_server_store, k);
if (NULL == data) {
continue;
}
/* check if data posted by the same process */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &data->owner, &requestor)) {
continue;
}
/* check persistence - if it is intended to persist beyond the
* proc itself, then we only delete it if rank=wildcard*/
if ((data->persistence == OPAL_PMIX_PERSIST_APP ||
data->persistence == OPAL_PMIX_PERSIST_SESSION) &&
ORTE_VPID_WILDCARD != requestor.vpid) {
continue;
}
/* remove the object */
opal_pointer_array_set_item(&orte_data_server_store, k, NULL);
OBJ_RELEASE(data);
}
/* no response is required */
OBJ_RELEASE(answer);
return;
default: default:
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
rc = ORTE_ERR_BAD_PARAM; rc = ORTE_ERR_BAD_PARAM;

Просмотреть файл

@ -11,7 +11,7 @@
* All rights reserved. * All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved. * Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2007 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2007 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2015 Intel, Inc. All rights reserved. * Copyright (c) 2015-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -38,7 +38,7 @@ BEGIN_C_DECLS
#define ORTE_PMIX_PUBLISH_CMD 0x01 #define ORTE_PMIX_PUBLISH_CMD 0x01
#define ORTE_PMIX_LOOKUP_CMD 0x02 #define ORTE_PMIX_LOOKUP_CMD 0x02
#define ORTE_PMIX_UNPUBLISH_CMD 0x03 #define ORTE_PMIX_UNPUBLISH_CMD 0x03
#define ORTE_PMIX_PURGE_PROC_CMD 0x04
/* provide hooks to startup and finalize the data server */ /* provide hooks to startup and finalize the data server */
ORTE_DECLSPEC int orte_data_server_init(void); ORTE_DECLSPEC int orte_data_server_init(void);