1
1

Enable substitution of the communication calls in the orted when sending messages back to the HNP by creating a function for this purpose and saving the pointer to it in orte_odls_base. Higher level libraries can then override the default function to use their own method.

This commit was SVN r22950.
Этот коммит содержится в:
Ralph Castain 2010-04-09 18:50:10 +00:00
родитель c32f046d7c
Коммит 4f8279df3d
5 изменённых файлов: 107 добавлений и 132 удалений

Просмотреть файл

@ -41,6 +41,13 @@ ORTE_DECLSPEC int orte_odls_base_open(void);
#if !ORTE_DISABLE_FULL_SUPPORT
/* Callback invoked with (fd, event, data); this is the handler type that
 * is passed as the cbfunc argument of the communication function below.
 */
typedef void (*orte_odls_base_cbfunc_t)(int fd, short event, void *data);
/* Signature of the routine used to deliver a buffer to a recipient under
 * a given RML tag. A pointer of this type is kept in the odls base
 * globals so that the default implementation can be replaced.
 *
 * recipient - name of the process the buffer is destined for
 * buf       - buffer to be delivered
 * tag       - RML tag identifying the message
 * cbfunc    - handler for the message; callers may pass NULL
 *
 * Returns an int status code.
 */
typedef int (*orte_odls_base_comm_fn_t)(orte_process_name_t *recipient,
opal_buffer_t *buf,
orte_rml_tag_t tag,
orte_odls_base_cbfunc_t cbfunc);
/**
* Struct to hold globals for the odls framework
*/
@ -53,6 +60,8 @@ typedef struct orte_odls_base_t {
opal_list_t available_components;
/** selected component */
orte_odls_base_component_t selected_component;
/* comm fn for updating state */
orte_odls_base_comm_fn_t comm;
} orte_odls_base_t;
/**
@ -82,6 +91,10 @@ ORTE_DECLSPEC void orte_base_default_waitpid_fired(orte_process_name_t *proc, in
/* setup singleton job data */
ORTE_DECLSPEC void orte_odls_base_setup_singleton_jobdat(orte_jobid_t jobid);
/* Default communication function for delivering a buffer to a recipient.
 *
 * recipient - name of the process the buffer is destined for
 * buf       - buffer to be delivered
 * tag       - RML tag identifying the message
 * cbfunc    - handler to use for local delivery; may be NULL
 *
 * Returns an int status code (ORTE_SUCCESS on success).
 */
ORTE_DECLSPEC int orte_odls_base_comm(orte_process_name_t *recipient,
opal_buffer_t *buf, orte_rml_tag_t tag,
orte_odls_base_cbfunc_t cbfunc);
#endif /* ORTE_DISABLE_FULL_SUPPORT */
END_C_DECLS

Просмотреть файл

@ -575,7 +575,7 @@ wireup:
int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_jobid_t *job)
{
int rc, ret;
int rc;
orte_vpid_t j, host_daemon;
orte_odls_child_t *child;
orte_std_cntr_t cnt;
@ -1011,22 +1011,16 @@ REPORT_ERROR:
OBJ_CONSTRUCT(&alert, opal_buffer_t);
*job = ORTE_JOBID_INVALID;
opal_dss.pack(&alert, job, 1, ORTE_JOBID);
/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (ORTE_PROC_IS_HNP) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_APP_LAUNCH_CALLBACK,
orte_plm_base_app_report_launch);
} else {
/* go ahead and send the update to the HNP */
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) {
ORTE_ERROR_LOG(ret);
}
/* send it */
if (ORTE_SUCCESS != (rc = orte_odls_base.comm(ORTE_PROC_MY_HNP,
&alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK,
orte_plm_base_app_report_launch))) {
ORTE_ERROR_LOG(rc);
}
/* cleanup */
OBJ_DESTRUCT(&alert);
/* cleanup */
if (NULL != app_idx) {
free(app_idx);
app_idx = NULL;
@ -1974,25 +1968,11 @@ CLEANUP:
ORTE_ERROR_LOG(ret);
}
/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:launch flagging launch report to myself",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_APP_LAUNCH_CALLBACK,
orte_plm_base_app_report_launch);
} else {
OPAL_OUTPUT_VERBOSE((5, orte_odls_globals.output,
"%s odls:launch sending launch report to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_HNP)));
/* go ahead and send the update to the HNP */
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK, 0))) {
ORTE_ERROR_LOG(ret);
}
/* send it */
if (ORTE_SUCCESS != (rc = orte_odls_base.comm(ORTE_PROC_MY_HNP,
&alert, ORTE_RML_TAG_APP_LAUNCH_CALLBACK,
orte_plm_base_app_report_launch))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&alert);
@ -2278,7 +2258,7 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
opal_list_item_t *item;
orte_odls_child_t *child;
orte_std_cntr_t cnt;
int rc;
int rc=ORTE_SUCCESS;
bool found=false, registering=false;
int8_t flag;
orte_odls_job_t *jobdat, *jdat;
@ -2400,7 +2380,8 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buffer);
goto CLEANUP;
}
}
rc = ORTE_SUCCESS;
OBJ_DESTRUCT(&buffer);
/* if we are deregistering, then we are done */
@ -2432,20 +2413,11 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
OBJ_DESTRUCT(&buffer);
goto CLEANUP;
}
/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (ORTE_PROC_IS_HNP) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &buffer,
ORTE_RML_TAG_INIT_ROUTES,
orte_routed_base_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_INIT_ROUTES, 0))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buffer);
goto CLEANUP;
}
/* send it */
if (ORTE_SUCCESS != (rc = orte_odls_base.comm(ORTE_PROC_MY_HNP,
&buffer, ORTE_RML_TAG_INIT_ROUTES,
orte_routed_base_process_msg))) {
ORTE_ERROR_LOG(rc);
}
OBJ_DESTRUCT(&buffer);
}
@ -2453,7 +2425,7 @@ int orte_odls_base_default_require_sync(orte_process_name_t *proc,
CLEANUP:
opal_condition_signal(&orte_odls_globals.cond);
OPAL_THREAD_UNLOCK(&orte_odls_globals.mutex);
return ORTE_SUCCESS;
return rc;
}
static bool any_live_children(orte_jobid_t job)
@ -2563,19 +2535,11 @@ static void check_proc_complete(orte_odls_child_t *child)
/* release the child object */
OBJ_RELEASE(child);
/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (ORTE_PROC_IS_HNP) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* send it */
if (ORTE_SUCCESS != (rc = orte_odls_base.comm(ORTE_PROC_MY_HNP,
&alert, ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg))) {
ORTE_ERROR_LOG(rc);
}
} else {
/* since it didn't abort, let's see if all of that job's procs are done */
@ -2619,19 +2583,11 @@ static void check_proc_complete(orte_odls_child_t *child)
opal_list_remove_item(&orte_local_jobdata, &jdat->super);
OBJ_RELEASE(jdat);
/* if we are the HNP, then we would rather not send this to ourselves -
* instead, we queue it up for local processing
*/
if (ORTE_PROC_IS_HNP) {
ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, &alert,
ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg);
} else {
/* go ahead and send it */
if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &alert, ORTE_RML_TAG_PLM, 0))) {
ORTE_ERROR_LOG(rc);
goto unlock;
}
/* send it */
if (ORTE_SUCCESS != (rc = orte_odls_base.comm(ORTE_PROC_MY_HNP,
&alert, ORTE_RML_TAG_PLM,
orte_plm_base_receive_process_msg))) {
ORTE_ERROR_LOG(rc);
}
}
}
@ -3221,3 +3177,26 @@ int orte_odls_base_get_proc_stats(opal_buffer_t *answer,
return ORTE_SUCCESS;
}
/*
 * Deliver a buffer to the given recipient.
 *
 * When the recipient is this very process and a handler function was
 * supplied, the buffer is handed over through a message event instead
 * of being sent. In every other case the buffer is sent through the RML.
 *
 * recipient - name of the process the buffer is destined for
 * buf       - buffer to be delivered
 * tag       - RML tag identifying the message
 * cbfunc    - handler used for local delivery; may be NULL, in which
 *             case the buffer is always sent through the RML
 *
 * Returns ORTE_SUCCESS on success, otherwise the negative value reported
 * by the RML send (which is also logged).
 */
int orte_odls_base_comm(orte_process_name_t *recipient,
                        opal_buffer_t *buf, orte_rml_tag_t tag,
                        orte_odls_base_cbfunc_t cbfunc)
{
    int rc;

    /* local delivery: addressed to myself and a direct handler exists */
    if (ORTE_PROC_MY_NAME->jobid == recipient->jobid &&
        ORTE_PROC_MY_NAME->vpid == recipient->vpid &&
        NULL != cbfunc) {
        ORTE_MESSAGE_EVENT(ORTE_PROC_MY_NAME, buf, tag, cbfunc);
        return ORTE_SUCCESS;
    }

    /* remote delivery (or no handler given): hand the buffer to the RML */
    rc = orte_rml.send_buffer(recipient, buf, tag, 0);
    if (rc < 0) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* a non-negative return from the send is normalized to success */
    return ORTE_SUCCESS;
}

Просмотреть файл

@ -209,6 +209,7 @@ int orte_odls_base_open(void)
orte_odls_globals.debugger = NULL;
orte_odls_globals.debugger_launched = false;
OBJ_CONSTRUCT(&orte_odls_globals.sysinfo, opal_list_t);
orte_odls_base.comm = orte_odls_base_comm;
/* get any external processor bindings */
OPAL_PAFFINITY_CPU_ZERO(orte_odls_globals.my_cores);

Просмотреть файл

@ -39,8 +39,11 @@ ORTE_DECLSPEC void orte_daemon_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer, orte_rml_tag_t tag,
void* cbdata);
/* direct cmd processing entry point - used by HNP */
/* direct cmd processing entry points */
ORTE_DECLSPEC void orte_daemon_cmd_processor(int fd, short event, void *data);
/* Process the daemon command carried in the given buffer.
 *
 * sender - name of the process the command came from
 * buffer - buffer holding the command
 * tag    - RML tag associated with the message
 *
 * Returns an int status code.
 */
ORTE_DECLSPEC int orte_daemon_process_commands(orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
/* a time flag that needs to be visible elsewhere */
ORTE_DECLSPEC extern struct timeval orte_daemon_msg_recvd;

Просмотреть файл

@ -78,9 +78,6 @@
/*
* Globals
*/
static int process_commands(orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag);
static char *get_orted_comm_cmd_str(int command);
/* instantiate this - it is shared via orted.h */
@ -124,7 +121,9 @@ static void send_relay(opal_buffer_t *buf)
ORTE_VPID_PRINT(nm->vpid)));
target.vpid = nm->vpid;
if (0 > (ret = orte_rml.send_buffer(&target, buf, ORTE_RML_TAG_DAEMON, 0))) {
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(&target,
buf, ORTE_RML_TAG_DAEMON,
orte_daemon_cmd_processor))) {
ORTE_ERROR_LOG(ret);
goto CLEANUP;
}
@ -304,7 +303,7 @@ void orte_daemon_cmd_processor(int fd, short event, void *data)
buffer->unpack_ptr = buffer->base_ptr + save_rel;
/* process the command */
if (ORTE_SUCCESS != (ret = process_commands(sender, buffer, tag))) {
if (ORTE_SUCCESS != (ret = orte_daemon_process_commands(sender, buffer, tag))) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s orte:daemon:cmd:processor failed on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_ERROR_NAME(ret)));
@ -319,7 +318,7 @@ void orte_daemon_cmd_processor(int fd, short event, void *data)
}
/* process the command */
if (ORTE_SUCCESS != (ret = process_commands(sender, buffer, tag))) {
if (ORTE_SUCCESS != (ret = orte_daemon_process_commands(sender, buffer, tag))) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s orte:daemon:cmd:processor failed on error %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_ERROR_NAME(ret)));
@ -341,9 +340,9 @@ CLEANUP:
return;
}
static int process_commands(orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
int orte_daemon_process_commands(orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag)
{
orte_daemon_cmd_flag_t command;
opal_buffer_t *relay_msg;
@ -689,9 +688,9 @@ static int process_commands(orte_process_name_t* sender,
goto CLEANUP;
}
/* return response */
if (0 > orte_rml.send_buffer(sender, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer,
ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
break;
@ -719,9 +718,8 @@ static int process_commands(orte_process_name_t* sender,
goto CLEANUP;
}
if (0 > orte_rml.send_buffer(sender, answer, tag, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer, tag, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
break;
@ -744,11 +742,10 @@ static int process_commands(orte_process_name_t* sender,
OBJ_RELEASE(answer);
goto CLEANUP;
}
/* callback function will release buffer */
if (0 > orte_rml.send_buffer(sender, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer, ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
} else {
/* if we are the HNP, process the request */
int32_t i, num_jobs;
@ -816,12 +813,11 @@ static int process_commands(orte_process_name_t* sender,
}
}
}
if (0 > orte_rml.send_buffer(sender, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer, ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
}
}
break;
/**** REPORT_NODE_INFO_CMD COMMAND ****/
@ -842,9 +838,8 @@ static int process_commands(orte_process_name_t* sender,
OBJ_RELEASE(answer);
goto CLEANUP;
}
if (0 > orte_rml.send_buffer(sender, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer, ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
} else {
@ -911,9 +906,8 @@ static int process_commands(orte_process_name_t* sender,
}
}
/* send the info */
if (0 > orte_rml.send_buffer(sender, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer, ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
}
@ -937,10 +931,10 @@ static int process_commands(orte_process_name_t* sender,
OBJ_RELEASE(answer);
goto CLEANUP;
}
if (0 > orte_rml.send_buffer(sender, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer, ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
} else {
/* if we are the HNP, process the request */
orte_job_t *jdata;
@ -1018,9 +1012,8 @@ static int process_commands(orte_process_name_t* sender,
}
}
/* send the info */
if (0 > orte_rml.send_buffer(sender, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(sender, answer, ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
}
@ -1100,10 +1093,6 @@ static int process_commands(orte_process_name_t* sender,
goto SEND_TOP_ANSWER;
}
/* the callback function will release relay_msg buffer */
#if 0
if (0 > orte_rml.send_buffer_nb(&proc2, relay_msg, ORTE_RML_TAG_DAEMON, 0,
send_callback, NULL)) {
#endif
if (0 > orte_rml.send_buffer(&proc2, relay_msg, ORTE_RML_TAG_DAEMON, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(relay_msg);
@ -1154,10 +1143,6 @@ static int process_commands(orte_process_name_t* sender,
goto SEND_TOP_ANSWER;
}
/* the callback function will release relay_msg buffer */
#if 0
if (0 > orte_rml.send_buffer_nb(&proc2, relay_msg, ORTE_RML_TAG_DAEMON, 0,
send_callback, NULL)) {
#endif
if (0 > orte_rml.send_buffer(&proc2, relay_msg, ORTE_RML_TAG_DAEMON, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(relay_msg);
@ -1191,16 +1176,14 @@ static int process_commands(orte_process_name_t* sender,
}
}
SEND_TOP_ANSWER:
/* send the answer back to requester - callback
* function will release buffer
*/
/* send the answer back to requester */
if (ORTE_PROC_IS_HNP) {
/* if I am the HNP, I need to also provide the number of
* replies the caller should recv and the sample time
*/
time_t mytime;
char *cptr;
relay_msg = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(relay_msg, &num_replies, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(ret);
@ -1222,21 +1205,17 @@ static int process_commands(orte_process_name_t* sender,
ret = ORTE_ERR_COMM_FAILURE;
break;
}
#if 0
if (0 > orte_rml.send_buffer_nb(return_addr, answer, ORTE_RML_TAG_TOOL, 0,
send_callback, NULL)) {
#endif
if (0 > orte_rml.send_buffer(return_addr, answer, ORTE_RML_TAG_TOOL, 0)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
ret = ORTE_ERR_COMM_FAILURE;
if (ORTE_SUCCESS != (ret = orte_odls_base.comm(return_addr, answer, ORTE_RML_TAG_TOOL, NULL))) {
ORTE_ERROR_LOG(ret);
}
OBJ_RELEASE(answer);
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
ret = ORTE_ERR_BAD_PARAM;
}
CLEANUP:
return ret;
}