1
1

Create a new message handling method for serializing responses. Place recvd messages on a list, using a file descriptor and the event library to trigger processing. This is identical in design to what is used in the IOF.

Use it first in the plm_base_receive to serialize multiple comm_spawn and update_proc requests.

This commit was SVN r21717.
Этот коммит содержится в:
Ralph Castain 2009-07-19 18:07:04 +00:00
родитель 1d74ab6e3c
Коммит 1a5f7245c8
4 изменённых файлов: 320 добавлений и 213 удалений

Просмотреть файл

@ -54,6 +54,13 @@
#include "orte/mca/plm/base/base.h"
static bool recv_issued=false;
static opal_mutex_t lock;
static opal_list_t recvs;
static opal_event_t ready;
static int ready_fd[2];
static bool processing;
static void process_msg(int fd, short event, void *data);
int orte_plm_base_comm_start(void)
{
@ -67,6 +74,13 @@ int orte_plm_base_comm_start(void)
"%s plm:base:receive start comm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
processing = false;
OBJ_CONSTRUCT(&lock, opal_mutex_t);
OBJ_CONSTRUCT(&recvs, opal_list_t);
pipe(ready_fd);
opal_event_set(&ready, ready_fd[0], OPAL_EV_READ, process_msg, NULL);
opal_event_add(&ready, 0);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_PLM,
ORTE_RML_NON_PERSISTENT,
@ -86,6 +100,12 @@ int orte_plm_base_comm_stop(void)
return ORTE_SUCCESS;
}
OBJ_DESTRUCT(&recvs);
opal_event_del(&ready);
close(ready_fd[0]);
processing = false;
OBJ_DESTRUCT(&lock);
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive stop comm",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -98,9 +118,9 @@ int orte_plm_base_comm_stop(void)
/* process incoming messages in order of receipt */
void orte_plm_base_receive_process_msg(int fd, short event, void *data)
void process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
orte_msg_packet_t *msgpkt;
orte_plm_cmd_flag_t command;
orte_std_cntr_t count;
orte_jobid_t job;
@ -110,16 +130,29 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
orte_proc_t *proc;
orte_proc_state_t state;
orte_exit_code_t exit_code;
int rc, ret;
int rc=ORTE_SUCCESS, ret;
struct timeval beat;
orte_app_context_t *app, *child_app;
opal_list_item_t *item;
int dump[128];
OPAL_THREAD_LOCK(&lock);
/* tag that we are processing the list */
processing = true;
/* clear the file descriptor to stop the event from refiring */
read(fd, &dump, sizeof(dump));
while (NULL != (item = opal_list_remove_first(&recvs))) {
msgpkt = (orte_msg_packet_t*)item;
/* setup a default response */
OBJ_CONSTRUCT(&answer, opal_buffer_t);
job = ORTE_JOBID_INVALID;
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &command, &count, ORTE_PLM_CMD))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &command, &count, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
@ -132,7 +165,7 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
/* unpack the job object */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &jdata, &count, ORTE_JOB))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &jdata, &count, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
goto ANSWER_LAUNCH;
}
@ -155,7 +188,7 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
job = jdata->jobid;
} else { /* this is a GLOBAL launch cmd */
/* get the parent's job object */
if (NULL == (parent = orte_get_job_data_object(mev->sender.jobid))) {
if (NULL == (parent = orte_get_job_data_object(msgpkt->sender.jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
goto ANSWER_LAUNCH;
}
@ -181,7 +214,7 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
}
/* find the sender's node in the job map */
if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(parent->procs, mev->sender.vpid))) {
if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(parent->procs, msgpkt->sender.vpid))) {
/* set the bookmark so the child starts from that place - this means
* that the first child process could be co-located with the proc
* that called comm_spawn, assuming slots remain on that node. Otherwise,
@ -218,15 +251,18 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
}
/* send the response back to the sender */
if (0 > (ret = orte_rml.send_buffer(&mev->sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) {
if (0 > (ret = orte_rml.send_buffer(&msgpkt->sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) {
ORTE_ERROR_LOG(ret);
}
break;
case ORTE_PLM_UPDATE_PROC_STATE:
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive update proc state command",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
count = 1;
jdata = NULL;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &job, &count, ORTE_JOBID))) {
while (ORTE_SUCCESS == (rc = opal_dss.unpack(msgpkt->buffer, &job, &count, ORTE_JOBID))) {
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive got update_proc_state for job %s",
@ -241,20 +277,20 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
goto CLEANUP;
}
count = 1;
while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &vpid, &count, ORTE_VPID))) {
while (ORTE_SUCCESS == (rc = opal_dss.unpack(msgpkt->buffer, &vpid, &count, ORTE_VPID))) {
if (ORTE_VPID_INVALID == vpid) {
/* flag indicates that this job is complete - move on */
break;
}
/* unpack the state */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &state, &count, ORTE_PROC_STATE))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &state, &count, ORTE_PROC_STATE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* unpack the exit code */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &exit_code, &count, ORTE_EXIT_CODE))) {
if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &exit_code, &count, ORTE_EXIT_CODE))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
@ -310,7 +346,7 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive got heartbeat from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&mev->sender)));
ORTE_NAME_PRINT(&msgpkt->sender)));
/* lookup the daemon object */
if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
/* this job can not possibly have been removed, so this is an error */
@ -318,12 +354,12 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
goto CLEANUP;
}
gettimeofday(&beat, NULL);
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, mev->sender.vpid))) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, msgpkt->sender.vpid))) {
/* this proc is no longer in table - skip it */
OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
"%s plm:base:receive daemon %s is not in proc table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_VPID_PRINT(mev->sender.vpid)));
ORTE_VPID_PRINT(msgpkt->sender.vpid)));
break;
}
proc->beat = beat.tv_sec;
@ -331,18 +367,32 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
default:
ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
return;
rc = ORTE_ERR_VALUE_OUT_OF_BOUNDS;
break;
}
CLEANUP:
/* release the message */
OBJ_RELEASE(mev);
OBJ_RELEASE(msgpkt);
OBJ_DESTRUCT(&answer);
if (ORTE_SUCCESS != rc) {
goto DEPART;
}
}
/* reset the event */
processing = false;
opal_event_add(&ready, 0);
DEPART:
/* release the thread */
OPAL_THREAD_UNLOCK(&lock);
/* see if an error occurred - if so, wakeup the HNP so we can exit */
if (ORTE_PROC_IS_HNP && ORTE_SUCCESS != rc) {
orte_trigger_event(&orte_exit);
}
}
/*
@ -371,7 +421,7 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
* buffer, however, is NOT released here, although its payload IS transferred
* to the message buffer for later processing
*/
ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_plm_base_receive_process_msg);
ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], true, sender, &buffer);
/* reissue the recv */
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
@ -385,3 +435,11 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
return;
}
/* where HNP messages come */
void orte_plm_base_receive_process_msg(int fd, short event, void *data)
{
orte_message_event_t *mev = (orte_message_event_t*)data;
ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], false, &mev->sender, &mev->buffer);
OBJ_RELEASE(mev);
}

Просмотреть файл

@ -58,6 +58,22 @@ orte_rml_component_t *orte_rml_component = NULL;
static bool component_open_called = false;
/* instantiate the msg_pkt object */
static void msg_pkt_constructor(orte_msg_packet_t *pkt)
{
pkt->buffer = NULL;
}
static void msg_pkt_destructor(orte_msg_packet_t *pkt)
{
if (NULL != pkt->buffer) {
OBJ_RELEASE(pkt->buffer);
}
}
OBJ_CLASS_INSTANCE(orte_msg_packet_t,
opal_list_item_t,
msg_pkt_constructor,
msg_pkt_destructor);
int
orte_rml_base_open(void)
{

Просмотреть файл

@ -38,11 +38,11 @@
#endif
#include "opal/mca/mca.h"
#include "orte/mca/rml/rml_types.h"
#include "opal/mca/crs/crs.h"
#include "opal/mca/crs/base/base.h"
#include "orte/mca/rml/rml_types.h"
BEGIN_C_DECLS

Просмотреть файл

@ -27,6 +27,7 @@
#include "orte_config.h"
#include "orte/constants.h"
#include "orte/types.h"
#include <limits.h>
#ifdef HAVE_SYS_UIO_H
@ -37,12 +38,44 @@
#include <net/uio.h>
#endif
#include "opal/dss/dss_types.h"
#include "opal/class/opal_list.h"
BEGIN_C_DECLS
/* ******************************************************************** */
typedef struct {
opal_list_item_t super;
orte_process_name_t sender;
opal_buffer_t *buffer;
} orte_msg_packet_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t);
#define ORTE_PROCESS_MESSAGE(rlist, lck, flg, fd, crt, sndr, buf) \
do { \
orte_msg_packet_t *pkt; \
int data=1; \
pkt = OBJ_NEW(orte_msg_packet_t); \
pkt->sender.jobid = (sndr)->jobid; \
pkt->sender.vpid = (sndr)->vpid; \
if ((crt)) { \
pkt->buffer = OBJ_NEW(opal_buffer_t); \
opal_dss.copy_payload(pkt->buffer, *(buf)); \
} else { \
pkt->buffer = *(buf); \
*(buf) = NULL; \
} \
OPAL_THREAD_LOCK((lck)); \
opal_list_append((rlist), &pkt->super); \
if (!(flg)) { \
write((fd), &data, sizeof(data)); \
} \
OPAL_THREAD_UNLOCK((lck)); \
} while(0);
/**
* Constant tag values for well-known services