1
1
openmpi/orte/mca/rmgr/base/rmgr_base_unpack.c
Josh Hursey d1e1a68645 This commit contains the necessary changes to get "mpirun a.out" working
correctly with MPI_Comm_spawn.

The problem with MPI_Comm_spawn was that the 'parent' process was
rmgr.create'ing and then rmgr.launch'ing the children via the rmgr proxy
component. The HNP saw these commands and processed them normally, but
since we never went through the HNP's rmgr (urm component) spawn() 
logic the triggers and key/value pairs were never created. So the
children were launched correctly, but since the HNP did not
have any triggers set up, it never triggered the xcast for the
children to finish orte_init().

This fix puts the trigger and key/value pair initialization in 
rmgr_urm_spawn() for the 'mpirun a.out' case, *and* in the 
rmgr_base_unpack routine that deals with the creation of the
job for the child as requested by the proxy component. This
will allow the triggers to be registered for the proxy's request
which only happens during MPI_Comm_spawn*

Small change for a lot of debugging. Notice that this reverts r11037
to its previous version, and adds a newline to handle the spawn
cases.

This commit was SVN r11046.

The following SVN revision numbers were found above:
  r11037 --> open-mpi/ompi@5813fb7d2a
2006-07-28 17:17:31 +00:00

284 строки
6.9 KiB
C

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "orte_config.h"
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <string.h>
#include "orte/orte_constants.h"
#include "opal/util/output.h"
#include "opal/util/trace.h"
#include "orte/dss/dss.h"
#include "orte/mca/rmgr/base/base.h"
#include "orte/mca/errmgr/errmgr.h"
/*
 * Handle a QUERY request: call orte_rmgr.query() and pack its status into
 * the response buffer as an ORTE_INT32. The request buffer is not read.
 * Returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_query(orte_buffer_t* req, orte_buffer_t* rsp)
{
    int32_t status;

    status = orte_rmgr.query();
    OPAL_TRACE(4);

    return orte_dss.pack(rsp, &status, 1, ORTE_INT32);
}
/*
 * Handle a CREATE request.
 *
 * Request layout (as unpacked here): the number of application contexts
 * (ORTE_SIZE) followed by that many ORTE_APP_CONTEXT objects.
 * Response layout (as packed here): the job id (ORTE_JOBID) followed by the
 * command status (ORTE_INT32).
 *
 * Returns the unpack/allocation error if the request cannot be decoded
 * (nothing is packed into rsp in that case), the pack error if the response
 * cannot be built, and otherwise the status of the create / stage-gate
 * initialization.
 */
static int orte_rmgr_base_cmd_create(orte_buffer_t* req, orte_buffer_t* rsp)
{
    int rc;
    int32_t ret;
    orte_app_context_t** context;
    orte_jobid_t jobid;
    size_t i, cnt, num_context;

    OPAL_TRACE(4);

    /* Give jobid a defined value so a failed create never packs
     * indeterminate bytes; the status packed after it tells the requester
     * whether the job id is meaningful. */
    memset(&jobid, 0, sizeof(jobid));

    cnt = 1;
    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &num_context, &cnt, ORTE_SIZE))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* num_context comes from the request; calloc fails cleanly if
     * num_context * sizeof(*context) would overflow. */
    context = calloc(num_context, sizeof(*context));
    if (NULL == context) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    cnt = num_context;
    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, context, &cnt, ORTE_APP_CONTEXT))) {
        ORTE_ERROR_LOG(rc);
        free(context);
        return rc;
    }

    /* Only set up the stage gates when the job was actually created, and
     * never let that second call hide a create failure. */
    ret = orte_rmgr.create(context, num_context, &jobid);
    if (ORTE_SUCCESS != ret) {
        ORTE_ERROR_LOG(ret);
    } else {
        ret = orte_rmgr_base_proc_stage_gate_init(jobid);
        if (ORTE_SUCCESS != ret) {
            ORTE_ERROR_LOG(ret);
        }
    }

    if (ORTE_SUCCESS != (rc = orte_dss.pack(rsp, &jobid, 1, ORTE_JOBID))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }
    if (ORTE_SUCCESS != (rc = orte_dss.pack(rsp, &ret, 1, ORTE_INT32))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

cleanup:
    for (i = 0; i < num_context; i++) {
        OBJ_RELEASE(context[i]);
    }
    free(context);

    /* A response that could not be built takes precedence over the command
     * status, which would otherwise be the only thing reported. */
    return (ORTE_SUCCESS != rc) ? rc : ret;
}
/*
 * Handle an ALLOCATE request: unpack the job id from req, call
 * orte_rmgr.allocate() on it, and pack the resulting status (or the logged
 * unpack error) into rsp as an ORTE_INT32.
 * Returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_allocate(orte_buffer_t* req, orte_buffer_t* rsp)
{
    orte_jobid_t job;
    size_t n_vals = 1;
    int32_t status;

    OPAL_TRACE(4);

    status = orte_dss.unpack(req, &job, &n_vals, ORTE_JOBID);
    if (ORTE_SUCCESS == status) {
        status = orte_rmgr.allocate(job);
    } else {
        ORTE_ERROR_LOG(status);
    }

    return orte_dss.pack(rsp, &status, 1, ORTE_INT32);
}
/*
 * Handle a DEALLOCATE request: unpack the job id from req, call
 * orte_rmgr.deallocate() on it, and pack the resulting status (or the
 * logged unpack error) into rsp as an ORTE_INT32.
 * Returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_deallocate(orte_buffer_t* req, orte_buffer_t* rsp)
{
    orte_jobid_t job;
    size_t n_vals = 1;
    int32_t status;

    OPAL_TRACE(4);

    status = orte_dss.unpack(req, &job, &n_vals, ORTE_JOBID);
    if (ORTE_SUCCESS == status) {
        status = orte_rmgr.deallocate(job);
    } else {
        ORTE_ERROR_LOG(status);
    }

    return orte_dss.pack(rsp, &status, 1, ORTE_INT32);
}
/*
 * Handle a MAP request: unpack the job id from req, call orte_rmgr.map()
 * on it, and pack the resulting status (or the logged unpack error) into
 * rsp as an ORTE_INT32.
 * Returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_map(orte_buffer_t* req, orte_buffer_t* rsp)
{
    /* int32_t rather than int: the value is packed below as ORTE_INT32,
     * matching the allocate/deallocate handlers. */
    int32_t rc;
    orte_jobid_t jobid;
    size_t cnt = 1;

    OPAL_TRACE(4);

    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
        ORTE_ERROR_LOG(rc);
    } else {
        rc = orte_rmgr.map(jobid);
    }
    return orte_dss.pack(rsp, &rc, 1, ORTE_INT32);
}
/*
 * Handle a LAUNCH request: unpack the job id from req, call
 * orte_rmgr.launch() on it, and pack the resulting status (or the logged
 * unpack error) into rsp as an ORTE_INT32.
 * Returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_launch(orte_buffer_t* req, orte_buffer_t* rsp)
{
    /* int32_t rather than int: the value is packed below as ORTE_INT32,
     * matching the allocate/deallocate handlers. */
    int32_t rc;
    orte_jobid_t jobid;
    size_t cnt = 1;

    OPAL_TRACE(4);

    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
        ORTE_ERROR_LOG(rc);
    } else {
        rc = orte_rmgr.launch(jobid);
    }
    return orte_dss.pack(rsp, &rc, 1, ORTE_INT32);
}
/*
 * Handle a TERM_JOB request: unpack the job id from req, call
 * orte_rmgr.terminate_job() on it, and pack the resulting status (or the
 * logged unpack error) into rsp as an ORTE_INT32.
 * Returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_term_job(orte_buffer_t* req, orte_buffer_t* rsp)
{
    /* int32_t rather than int: the value is packed below as ORTE_INT32,
     * matching the allocate/deallocate handlers. */
    int32_t rc;
    orte_jobid_t jobid;
    size_t cnt = 1;

    OPAL_TRACE(4);

    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
        ORTE_ERROR_LOG(rc);
    } else {
        rc = orte_rmgr.terminate_job(jobid);
    }
    return orte_dss.pack(rsp, &rc, 1, ORTE_INT32);
}
/*
 * Handle a TERM_PROC request: unpack the process name from req, call
 * orte_rmgr.terminate_proc() on it, and pack the resulting status (or the
 * logged unpack error) into rsp as an ORTE_INT32.
 * Returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_term_proc(orte_buffer_t* req, orte_buffer_t* rsp)
{
    /* int32_t rather than int: the value is packed below as ORTE_INT32,
     * matching the allocate/deallocate handlers. */
    int32_t rc;
    orte_process_name_t name;
    size_t cnt = 1;

    OPAL_TRACE(4);

    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &name, &cnt, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
    } else {
        rc = orte_rmgr.terminate_proc(&name);
    }
    return orte_dss.pack(rsp, &rc, 1, ORTE_INT32);
}
/*
 * Handle a SIGNAL_JOB request: unpack the job id and then the signal number
 * (ORTE_INT32) from req, call orte_rmgr.signal_job(), and pack its status
 * into rsp as an ORTE_INT32.
 *
 * If either unpack fails the error is logged and returned, and nothing is
 * packed into rsp. Otherwise returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_signal_job(orte_buffer_t* req, orte_buffer_t* rsp)
{
    /* int32_t rather than int: the value is packed below as ORTE_INT32 */
    int32_t rc;
    orte_jobid_t jobid;
    size_t cnt = 1;
    int32_t signum;   /* not named "signal", to avoid shadowing signal() */

    OPAL_TRACE(4);

    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &jobid, &cnt, ORTE_JOBID))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* cnt is an in/out argument; reset it before asking for the next value */
    cnt = 1;
    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &signum, &cnt, ORTE_INT32))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    rc = orte_rmgr.signal_job(jobid, signum);
    return orte_dss.pack(rsp, &rc, 1, ORTE_INT32);
}
/*
 * Handle a SIGNAL_PROC request: unpack the process name and then the signal
 * number (ORTE_INT32) from req, call orte_rmgr.signal_proc(), and pack its
 * status into rsp as an ORTE_INT32.
 *
 * If either unpack fails the error is logged and returned, and nothing is
 * packed into rsp. Otherwise returns the result of packing the response.
 */
static int orte_rmgr_base_cmd_signal_proc(orte_buffer_t* req, orte_buffer_t* rsp)
{
    /* int32_t rather than int: the value is packed below as ORTE_INT32 */
    int32_t rc;
    orte_process_name_t name;
    size_t cnt = 1;
    int32_t signum;   /* not named "signal", to avoid shadowing signal() */

    OPAL_TRACE(4);

    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &name, &cnt, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
        /* Stop here: continuing would signal whatever process the
         * uninitialized name happens to describe. */
        return rc;
    }

    /* cnt is an in/out argument; reset it before asking for the next value */
    cnt = 1;
    if (ORTE_SUCCESS != (rc = orte_dss.unpack(req, &signum, &cnt, ORTE_INT32))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    rc = orte_rmgr.signal_proc(&name, signum);
    return orte_dss.pack(rsp, &rc, 1, ORTE_INT32);
}
/*
 * Unpack the command code at the front of req and hand req/rsp to the
 * matching command handler.
 *
 * Returns the unpack error if the command code cannot be read,
 * ORTE_ERR_BAD_PARAM (after logging it) for an unrecognized command, and
 * otherwise whatever the selected handler returns.
 */
int orte_rmgr_base_cmd_dispatch(orte_buffer_t* req, orte_buffer_t* rsp)
{
    orte_rmgr_cmd_t command;
    size_t n_vals = 1;
    int status;

    OPAL_TRACE(4);

    status = orte_dss.unpack(req, &command, &n_vals, ORTE_RMGR_CMD);
    if (ORTE_SUCCESS != status) {
        ORTE_ERROR_LOG(status);
        return status;
    }

    switch (command) {
    case ORTE_RMGR_CMD_QUERY:
        status = orte_rmgr_base_cmd_query(req, rsp);
        break;
    case ORTE_RMGR_CMD_CREATE:
        status = orte_rmgr_base_cmd_create(req, rsp);
        break;
    case ORTE_RMGR_CMD_ALLOCATE:
        status = orte_rmgr_base_cmd_allocate(req, rsp);
        break;
    case ORTE_RMGR_CMD_DEALLOCATE:
        status = orte_rmgr_base_cmd_deallocate(req, rsp);
        break;
    case ORTE_RMGR_CMD_MAP:
        status = orte_rmgr_base_cmd_map(req, rsp);
        break;
    case ORTE_RMGR_CMD_LAUNCH:
        status = orte_rmgr_base_cmd_launch(req, rsp);
        break;
    case ORTE_RMGR_CMD_TERM_JOB:
        status = orte_rmgr_base_cmd_term_job(req, rsp);
        break;
    case ORTE_RMGR_CMD_TERM_PROC:
        status = orte_rmgr_base_cmd_term_proc(req, rsp);
        break;
    case ORTE_RMGR_CMD_SIGNAL_JOB:
        status = orte_rmgr_base_cmd_signal_job(req, rsp);
        break;
    case ORTE_RMGR_CMD_SIGNAL_PROC:
        status = orte_rmgr_base_cmd_signal_proc(req, rsp);
        break;
    default:
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        status = ORTE_ERR_BAD_PARAM;
        break;
    }

    return status;
}