1
1
openmpi/orte/mca/rmaps/base/rmaps_base_receive.c
Ralph Castain f4a458532b This doesn't totally resolve the comm_spawn problem, but it helps a little. I'll continue working on it and hope to resolve it completely shortly. The issue primarily centers on where to start mapping the child job's processes, and how to deal with oversubscription that might result. At the moment, I am trying to resolve the first issue first (hey, that even sounds right!).
This change does a couple of things:

1. Since the USE_PARENT_ALLOC attribute is a directive about regarding allocation of resources to a job, it more properly should be an attribute of the RAS. Change the name to reflect that and move the attribute define to the ras_types.h file.

2. Add the attributes list to the RMAPS map_job interface. This provides us with the desired flexibility to dynamically specify directives for mapping. The system will - in the absence of any attribute-based directive - default to the values provided in the MCA parameters (either from environment or command-line interface).

This commit was SVN r12164.
2006-10-18 14:01:44 +00:00

162 строки
4.3 KiB
C

/* -*- C -*-
*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
*/
/*
* includes
*/
#include "orte_config.h"
#include "orte/orte_constants.h"
#include "orte/orte_types.h"
#include "opal/util/output.h"
#include "opal/mca/mca.h"
#include "opal/mca/base/mca_base_param.h"
#include "orte/dss/dss.h"
#include "orte/util/proc_info.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rmaps/base/rmaps_private.h"
static bool recv_issued=false;
int orte_rmaps_base_comm_start(void)
{
int rc;
if (recv_issued) {
return ORTE_SUCCESS;
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_RML_NAME_ANY,
ORTE_RML_TAG_RMAPS,
ORTE_RML_PERSISTENT,
orte_rmaps_base_recv,
NULL))) {
ORTE_ERROR_LOG(rc);
}
recv_issued = true;
return rc;
}
int orte_rmaps_base_comm_stop(void)
{
int rc;
if (!recv_issued) {
return ORTE_SUCCESS;
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_cancel(ORTE_RML_NAME_ANY, ORTE_RML_TAG_RMAPS))) {
ORTE_ERROR_LOG(rc);
}
recv_issued = false;
return rc;
}
/*
* handle message from proxies
* NOTE: The incoming buffer "buffer" is OBJ_RELEASED by the calling program.
* DO NOT RELEASE THIS BUFFER IN THIS CODE
*/
void orte_rmaps_base_recv(int status, orte_process_name_t* sender,
orte_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_buffer_t answer;
orte_rmaps_cmd_flag_t command;
orte_std_cntr_t count;
orte_jobid_t job;
opal_list_t attrs;
opal_list_item_t *item;
int rc;
/* get the command */
count = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &command, &count, ORTE_RMAPS_CMD))) {
ORTE_ERROR_LOG(rc);
return;
}
/* setup to return an answer */
OBJ_CONSTRUCT(&answer, orte_buffer_t);
/* pack the command in the answer - this is done to allow the caller to check
* that we are talking about the same command
*/
if (ORTE_SUCCESS != (rc = orte_dss.pack(&answer, &command, 1, ORTE_RMAPS_CMD))) {
ORTE_ERROR_LOG(rc);
return;
}
switch (command) {
case ORTE_RMAPS_MAP_CMD:
/* get the jobid */
count = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &job, &count, ORTE_JOBID))) {
ORTE_ERROR_LOG(rc);
goto SEND_ANSWER;
}
/* get any attributes */
OBJ_CONSTRUCT(&attrs, opal_list_t);
count = 1;
if (ORTE_SUCCESS != (rc = orte_dss.unpack(buffer, &attrs, &count, ORTE_ATTR_LIST))) {
ORTE_ERROR_LOG(rc);
goto SEND_ANSWER;
}
/* process the request */
if (ORTE_SUCCESS != (rc = orte_rmaps.map_job(job, &attrs))) {
ORTE_ERROR_LOG(rc);
goto SEND_ANSWER;
}
while (NULL != (item = opal_list_remove_first(&attrs))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&attrs);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
}
SEND_ANSWER:
if (0 > orte_rml.send_buffer(sender, &answer, tag, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
}
/* cleanup */
OBJ_DESTRUCT(&answer);
}