1
1

Fix the DVM by ensuring that all nodes, even those that didn't participate (i.e., didn't have any local children) in a job, clean up all resources associated with that job upon its completion. With the advent of backend distributed mapping, nodes that weren't part of the job would still allocate resources on other nodes - and then start from that point when mapping the next job. This change ensures that all daemons start from the same point each time.

Signed-off-by: Ralph Castain <rhc@open-mpi.org>
Этот коммит содержится в:
Ralph Castain 2017-04-04 17:31:38 -07:00
родитель a605bd4265
Коммит 74863a0ea4
5 изменённых файлов: 93 добавлений и 3 удалений

Просмотреть файл

@ -89,6 +89,10 @@ typedef uint8_t orte_daemon_cmd_flag_t;
/* request full topology string */
#define ORTE_DAEMON_REPORT_TOPOLOGY_CMD (orte_daemon_cmd_flag_t) 33
/* tell DVM daemons to cleanup resources from job */
#define ORTE_DAEMON_DVM_CLEANUP_JOB_CMD (orte_daemon_cmd_flag_t) 34
/*
* Struct written up the pipe from the child to the parent.
*/

Просмотреть файл

@ -410,7 +410,7 @@ static void check_complete(int fd, short args, void *cbdata)
* we call the errmgr so that any attempt to restart the job will
* avoid doing so in the exact same place as the current job
*/
if (NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) {
if (NULL != jdata->map && jdata->state == ORTE_JOB_STATE_TERMINATED) {
map = jdata->map;
for (index = 0; index < map->nodes->size; index++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, index))) {

Просмотреть файл

@ -69,6 +69,7 @@
#include "orte/mca/odls/base/base.h"
#include "orte/mca/plm/plm.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/state/state.h"
@ -122,6 +123,7 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
opal_pstats_t pstat;
char *rtmod;
char *coprocessors;
orte_job_map_t *map;
/* unpack the command */
n = 1;
@ -557,6 +559,66 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
}
break;
/**** DVM CLEANUP JOB COMMAND ****/
case ORTE_DAEMON_DVM_CLEANUP_JOB_CMD:
/* unpack the jobid */
n = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &job, &n, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
/* look up job data object */
if (NULL == (jdata = orte_get_job_data_object(job))) {
/* we can safely ignore this request as the job
* was already cleaned up */
goto CLEANUP;
}
/* if we have any local children for this job, then we
* can ignore this request as we would have already
* dealt with it */
if (0 < jdata->num_local_procs) {
goto CLEANUP;
}
/* release all resources (even those on other nodes) that we
* assigned to this job */
if (NULL != jdata->map) {
map = (orte_job_map_t*)jdata->map;
for (n = 0; n < map->nodes->size; n++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, n))) {
continue;
}
for (i = 0; i < node->procs->size; i++) {
if (NULL == (proct = (orte_proc_t*)opal_pointer_array_get_item(node->procs, i))) {
continue;
}
if (proct->name.jobid != jdata->jobid) {
/* skip procs from another job */
continue;
}
node->slots_inuse--;
node->num_procs--;
/* set the entry in the node array to NULL */
opal_pointer_array_set_item(node->procs, i, NULL);
/* release the proc once for the map entry */
OBJ_RELEASE(proct);
}
/* set the node location to NULL */
opal_pointer_array_set_item(map->nodes, n, NULL);
/* maintain accounting */
OBJ_RELEASE(node);
/* flag that the node is no longer in a map */
ORTE_FLAG_UNSET(node, ORTE_NODE_FLAG_MAPPED);
}
OBJ_RELEASE(map);
jdata->map = NULL;
}
break;
/**** REPORT TOPOLOGY COMMAND ****/
case ORTE_DAEMON_REPORT_TOPOLOGY_CMD:
answer = OBJ_NEW(opal_buffer_t);
@ -1337,6 +1399,9 @@ static char *get_orted_comm_cmd_str(int command)
case ORTE_DAEMON_GET_MEMPROFILE:
return strdup("ORTE_DAEMON_GET_MEMPROFILE");
case ORTE_DAEMON_DVM_CLEANUP_JOB_CMD:
return strdup("ORTE_DAEMON_DVM_CLEANUP_JOB_CMD");
default:
return strdup("Unknown Command!");
}

Просмотреть файл

@ -345,7 +345,7 @@ static void dump_aborted_procs(void)
/* find the job that caused the problem */
n = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&job, &nptr);
while (OPAL_SUCCESS == n) {
if (job->jobid == ORTE_PROC_MY_NAME->jobid) {
if (NULL == job || job->jobid == ORTE_PROC_MY_NAME->jobid) {
goto next;
}
if (ORTE_JOB_STATE_UNDEF != job->state &&

Просмотреть файл

@ -14,7 +14,7 @@
* Copyright (c) 2007-2009 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -75,6 +75,7 @@
#include "opal/class/opal_pointer_array.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/grpcomm/grpcomm.h"
#include "orte/mca/odls/odls.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/base/rml_contact.h"
@ -519,6 +520,8 @@ static void notify_requestor(int sd, short args, void *cbdata)
orte_proc_t *pptr;
int ret, id, *idptr;
opal_buffer_t *reply;
orte_daemon_cmd_flag_t command;
orte_grpcomm_signature_t *sig;
/* notify the requestor */
reply = OBJ_NEW(opal_buffer_t);
@ -557,6 +560,24 @@ static void notify_requestor(int sd, short args, void *cbdata)
ORTE_RML_TAG_NOTIFY_COMPLETE,
send_callback, jdata);
/* now ensure that _all_ daemons know that this job has terminated so even
* those that did not participate in it will know to cleanup the resources
* they assigned to the job. This is necessary now that the mapping function
* has been moved to the backend daemons - otherwise, non-participating daemons
* retain the slot assignments on the participating daemons, and then incorrectly
* map subsequent jobs thinking those nodes are still "busy" */
reply = OBJ_NEW(opal_buffer_t);
command = ORTE_DAEMON_DVM_CLEANUP_JOB_CMD;
opal_dss.pack(reply, &command, 1, ORTE_DAEMON_CMD);
opal_dss.pack(reply, &jdata->jobid, 1, ORTE_JOBID);
sig = OBJ_NEW(orte_grpcomm_signature_t);
sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
sig->signature[0].vpid = ORTE_VPID_WILDCARD;
orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, reply);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
/* we cannot cleanup the job object as we might
* hit an error during transmission, so clean it
* up in the send callback */