diff --git a/orte/mca/plm/base/plm_base_receive.c b/orte/mca/plm/base/plm_base_receive.c
index b5fa5b8f6e..799d30fe3a 100644
--- a/orte/mca/plm/base/plm_base_receive.c
+++ b/orte/mca/plm/base/plm_base_receive.c
@@ -54,6 +54,13 @@
 #include "orte/mca/plm/base/base.h"
 
 static bool recv_issued=false;
+static opal_mutex_t lock;
+static opal_list_t recvs;
+static opal_event_t ready;
+static int ready_fd[2];
+static bool processing;
+
+static void process_msg(int fd, short event, void *data);
 
 int orte_plm_base_comm_start(void)
 {
@@ -67,6 +74,13 @@ int orte_plm_base_comm_start(void)
                          "%s plm:base:receive start comm",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
 
+    processing = false;
+    OBJ_CONSTRUCT(&lock, opal_mutex_t);
+    OBJ_CONSTRUCT(&recvs, opal_list_t);
+    pipe(ready_fd);
+    opal_event_set(&ready, ready_fd[0], OPAL_EV_READ, process_msg, NULL);
+    opal_event_add(&ready, 0);
+
     if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
                                                       ORTE_RML_TAG_PLM,
                                                       ORTE_RML_NON_PERSISTENT,
@@ -86,6 +100,12 @@ int orte_plm_base_comm_stop(void)
         return ORTE_SUCCESS;
     }
 
+    OBJ_DESTRUCT(&recvs);
+    opal_event_del(&ready);
+    close(ready_fd[0]);
+    processing = false;
+    OBJ_DESTRUCT(&lock);
+
     OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
                          "%s plm:base:receive stop comm",
                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@@ -98,9 +118,9 @@ int orte_plm_base_comm_stop(void)
 
 /* process incoming messages in order of receipt */
-void orte_plm_base_receive_process_msg(int fd, short event, void *data)
+void process_msg(int fd, short event, void *data)
 {
-    orte_message_event_t *mev = (orte_message_event_t*)data;
+    orte_msg_packet_t *msgpkt;
     orte_plm_cmd_flag_t command;
     orte_std_cntr_t count;
     orte_jobid_t job;
@@ -110,239 +130,269 @@ void orte_plm_base_receive_process_msg(int fd, short event, void *data)
     orte_proc_t *proc;
     orte_proc_state_t state;
     orte_exit_code_t exit_code;
-    int rc, ret;
+    int rc=ORTE_SUCCESS, ret;
     struct timeval beat;
     orte_app_context_t *app, *child_app;
+    opal_list_item_t *item;
+    int dump[128];
 
-    /* setup a default response */
-    OBJ_CONSTRUCT(&answer, opal_buffer_t);
-    job = ORTE_JOBID_INVALID;
-
-    count = 1;
-    if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &command, &count, ORTE_PLM_CMD))) {
-        ORTE_ERROR_LOG(rc);
-        goto CLEANUP;
-    }
+    OPAL_THREAD_LOCK(&lock);
 
-    switch (command) {
-        case ORTE_PLM_LAUNCH_JOB_CMD:
-            OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
-                                 "%s plm:base:receive job launch command",
-                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
-
-            /* unpack the job object */
-            count = 1;
-            if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &jdata, &count, ORTE_JOB))) {
-                ORTE_ERROR_LOG(rc);
-                goto ANSWER_LAUNCH;
-            }
-
-            /* if is a LOCAL slave cmd */
-            if (jdata->controls & ORTE_JOB_CONTROL_LOCAL_SLAVE) {
-                /* In this case, I cannot lookup job info. All I do is pass
-                 * this along to the local launcher, IF it is available
-                 */
-                if (NULL == orte_plm.spawn) {
-                    /* can't do this operation */
-                    ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
-                    rc = ORTE_ERR_NOT_SUPPORTED;
-                    goto ANSWER_LAUNCH;
-                }
-                if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
-                    ORTE_ERROR_LOG(rc);
-                    goto ANSWER_LAUNCH;
-                }
-                job = jdata->jobid;
-            } else {  /* this is a GLOBAL launch cmd */
-                /* get the parent's job object */
-                if (NULL == (parent = orte_get_job_data_object(mev->sender.jobid))) {
-                    ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
-                    goto ANSWER_LAUNCH;
-                }
-
-                /* if the prefix was set in the parent's job, we need to transfer
-                 * that prefix to the child's app_context so any further launch of
-                 * orteds can find the correct binary. There always has to be at
-                 * least one app_context in both parent and child, so we don't
-                 * need to check that here. However, be sure not to overwrite
-                 * the prefix if the user already provided it!
-                 */
-                app = (orte_app_context_t*)opal_pointer_array_get_item(parent->apps, 0);
-                child_app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, 0);
-                if (NULL != app->prefix_dir &&
-                    NULL == child_app->prefix_dir) {
-                    child_app->prefix_dir = strdup(app->prefix_dir);
-                }
-
-                /* process any add-hostfile and add-host options that were provided */
-                if (ORTE_SUCCESS != (rc = orte_ras_base_add_hosts(jdata))) {
-                    ORTE_ERROR_LOG(rc);
-                    goto ANSWER_LAUNCH;
-                }
-
-                /* find the sender's node in the job map */
-                if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(parent->procs, mev->sender.vpid))) {
-                    /* set the bookmark so the child starts from that place - this means
-                     * that the first child process could be co-located with the proc
-                     * that called comm_spawn, assuming slots remain on that node. Otherwise,
-                     * the procs will start on the next available node
-                     */
-                    jdata->bookmark = proc->node;
-                }
-
-                /* launch it */
-                if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
-                    ORTE_ERROR_LOG(rc);
-                    goto ANSWER_LAUNCH;
-                }
-                job = jdata->jobid;
-
-                /* return the favor so that any repetitive comm_spawns track each other */
-                parent->bookmark = jdata->bookmark;
-            }
-
-            /* if the child is an ORTE job, wait for the procs to report they are alive */
-            if (!(jdata->controls & ORTE_JOB_CONTROL_NON_ORTE_JOB)) {
-                ORTE_PROGRESSED_WAIT(false, jdata->num_reported, jdata->num_procs);
-            }
+    /* tag that we are processing the list */
+    processing = true;
 
-    ANSWER_LAUNCH:
-        OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
-                             "%s plm:base:receive job %s launched",
-                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                             ORTE_JOBID_PRINT(job)));
-
-        /* pack the jobid to be returned */
-        if (ORTE_SUCCESS != (ret = opal_dss.pack(&answer, &job, 1, ORTE_JOBID))) {
-            ORTE_ERROR_LOG(ret);
-        }
-
-        /* send the response back to the sender */
-        if (0 > (ret = orte_rml.send_buffer(&mev->sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) {
-            ORTE_ERROR_LOG(ret);
-        }
-        break;
-
-    case ORTE_PLM_UPDATE_PROC_STATE:
-        count = 1;
-        jdata = NULL;
-        while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &job, &count, ORTE_JOBID))) {
-
+    /* clear the file descriptor to stop the event from refiring */
+    read(fd, &dump, sizeof(dump));
+
+    while (NULL != (item = opal_list_remove_first(&recvs))) {
+        msgpkt = (orte_msg_packet_t*)item;
+
+        /* setup a default response */
+        OBJ_CONSTRUCT(&answer, opal_buffer_t);
+        job = ORTE_JOBID_INVALID;
+
+        count = 1;
+        if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &command, &count, ORTE_PLM_CMD))) {
+            ORTE_ERROR_LOG(rc);
+            goto CLEANUP;
+        }
+
+        switch (command) {
+        case ORTE_PLM_LAUNCH_JOB_CMD:
             OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
-                                 "%s plm:base:receive got update_proc_state for job %s",
+                                 "%s plm:base:receive job launch command",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+            /* unpack the job object */
+            count = 1;
+            if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &jdata, &count, ORTE_JOB))) {
+                ORTE_ERROR_LOG(rc);
+                goto ANSWER_LAUNCH;
+            }
+
+            /* if is a LOCAL slave cmd */
+            if (jdata->controls & ORTE_JOB_CONTROL_LOCAL_SLAVE) {
+                /* In this case, I cannot lookup job info. All I do is pass
+                 * this along to the local launcher, IF it is available
+                 */
+                if (NULL == orte_plm.spawn) {
+                    /* can't do this operation */
+                    ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
+                    rc = ORTE_ERR_NOT_SUPPORTED;
+                    goto ANSWER_LAUNCH;
+                }
+                if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
+                    ORTE_ERROR_LOG(rc);
+                    goto ANSWER_LAUNCH;
+                }
+                job = jdata->jobid;
+            } else {  /* this is a GLOBAL launch cmd */
+                /* get the parent's job object */
+                if (NULL == (parent = orte_get_job_data_object(msgpkt->sender.jobid))) {
+                    ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+                    goto ANSWER_LAUNCH;
+                }
+
+                /* if the prefix was set in the parent's job, we need to transfer
+                 * that prefix to the child's app_context so any further launch of
+                 * orteds can find the correct binary. There always has to be at
+                 * least one app_context in both parent and child, so we don't
+                 * need to check that here. However, be sure not to overwrite
+                 * the prefix if the user already provided it!
+                 */
+                app = (orte_app_context_t*)opal_pointer_array_get_item(parent->apps, 0);
+                child_app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, 0);
+                if (NULL != app->prefix_dir &&
+                    NULL == child_app->prefix_dir) {
+                    child_app->prefix_dir = strdup(app->prefix_dir);
+                }
+
+                /* process any add-hostfile and add-host options that were provided */
+                if (ORTE_SUCCESS != (rc = orte_ras_base_add_hosts(jdata))) {
+                    ORTE_ERROR_LOG(rc);
+                    goto ANSWER_LAUNCH;
+                }
+
+                /* find the sender's node in the job map */
+                if (NULL != (proc = (orte_proc_t*)opal_pointer_array_get_item(parent->procs, msgpkt->sender.vpid))) {
+                    /* set the bookmark so the child starts from that place - this means
+                     * that the first child process could be co-located with the proc
+                     * that called comm_spawn, assuming slots remain on that node. Otherwise,
+                     * the procs will start on the next available node
+                     */
+                    jdata->bookmark = proc->node;
+                }
+
+                /* launch it */
+                if (ORTE_SUCCESS != (rc = orte_plm.spawn(jdata))) {
+                    ORTE_ERROR_LOG(rc);
+                    goto ANSWER_LAUNCH;
+                }
+                job = jdata->jobid;
+
+                /* return the favor so that any repetitive comm_spawns track each other */
+                parent->bookmark = jdata->bookmark;
+            }
+
+            /* if the child is an ORTE job, wait for the procs to report they are alive */
+            if (!(jdata->controls & ORTE_JOB_CONTROL_NON_ORTE_JOB)) {
+                ORTE_PROGRESSED_WAIT(false, jdata->num_reported, jdata->num_procs);
+            }
+
+        ANSWER_LAUNCH:
+            OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
+                                 "%s plm:base:receive job %s launched",
                                  ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                  ORTE_JOBID_PRINT(job)));
 
-            /* lookup the job object */
-            if (NULL == (jdata = orte_get_job_data_object(job))) {
-                /* this job may already have been removed from the array, so just cleanly
-                 * ignore this request
-                 */
-                goto CLEANUP;
+            /* pack the jobid to be returned */
+            if (ORTE_SUCCESS != (ret = opal_dss.pack(&answer, &job, 1, ORTE_JOBID))) {
+                ORTE_ERROR_LOG(ret);
             }
+
+            /* send the response back to the sender */
+            if (0 > (ret = orte_rml.send_buffer(&msgpkt->sender, &answer, ORTE_RML_TAG_PLM_PROXY, 0))) {
+                ORTE_ERROR_LOG(ret);
+            }
+            break;
+
+        case ORTE_PLM_UPDATE_PROC_STATE:
+            OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
+                                 "%s plm:base:receive update proc state command",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
             count = 1;
-            while (ORTE_SUCCESS == (rc = opal_dss.unpack(mev->buffer, &vpid, &count, ORTE_VPID))) {
-                if (ORTE_VPID_INVALID == vpid) {
-                    /* flag indicates that this job is complete - move on */
-                    break;
-                }
-                /* unpack the state */
-                count = 1;
-                if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &state, &count, ORTE_PROC_STATE))) {
-                    ORTE_ERROR_LOG(rc);
-                    goto CLEANUP;
-                }
-                /* unpack the exit code */
-                count = 1;
-                if (ORTE_SUCCESS != (rc = opal_dss.unpack(mev->buffer, &exit_code, &count, ORTE_EXIT_CODE))) {
-                    ORTE_ERROR_LOG(rc);
-                    goto CLEANUP;
-                }
+            jdata = NULL;
+            while (ORTE_SUCCESS == (rc = opal_dss.unpack(msgpkt->buffer, &job, &count, ORTE_JOBID))) {
                 OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
-                                     "%s plm:base:receive got update_proc_state for vpid %lu state %x exit_code %d",
+                                     "%s plm:base:receive got update_proc_state for job %s",
                                      ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
-                                     (unsigned long)vpid, (unsigned int)state, (int)exit_code));
+                                     ORTE_JOBID_PRINT(job)));
 
-                /* retrieve the proc object */
-                if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) {
-                    /* this proc is no longer in table - skip it */
+                /* lookup the job object */
+                if (NULL == (jdata = orte_get_job_data_object(job))) {
+                    /* this job may already have been removed from the array, so just cleanly
+                     * ignore this request
+                     */
+                    goto CLEANUP;
+                }
+                count = 1;
+                while (ORTE_SUCCESS == (rc = opal_dss.unpack(msgpkt->buffer, &vpid, &count, ORTE_VPID))) {
+                    if (ORTE_VPID_INVALID == vpid) {
+                        /* flag indicates that this job is complete - move on */
+                        break;
+                    }
+                    /* unpack the state */
+                    count = 1;
+                    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &state, &count, ORTE_PROC_STATE))) {
+                        ORTE_ERROR_LOG(rc);
+                        goto CLEANUP;
+                    }
+                    /* unpack the exit code */
+                    count = 1;
+                    if (ORTE_SUCCESS != (rc = opal_dss.unpack(msgpkt->buffer, &exit_code, &count, ORTE_EXIT_CODE))) {
+                        ORTE_ERROR_LOG(rc);
+                        goto CLEANUP;
+                    }
+
                     OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
-                                         "%s plm:base:receive proc %s is not in proc table",
+                                         "%s plm:base:receive got update_proc_state for vpid %lu state %x exit_code %d",
update_proc_state for vpid %lu state %x exit_code %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_VPID_PRINT(vpid))); - continue; + (unsigned long)vpid, (unsigned int)state, (int)exit_code)); + + /* retrieve the proc object */ + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, vpid))) { + /* this proc is no longer in table - skip it */ + OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, + "%s plm:base:receive proc %s is not in proc table", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_VPID_PRINT(vpid))); + continue; + } + + /* update the termination counter IFF the state is changing to something + * indicating terminated + */ + if (ORTE_PROC_STATE_UNTERMINATED < state && + ORTE_PROC_STATE_UNTERMINATED > proc->state) { + ++jdata->num_terminated; + } + /* update the data */ + proc->state = state; + proc->exit_code = exit_code; + + /* update orte's exit status if it is non-zero */ + ORTE_UPDATE_EXIT_STATUS(exit_code); + } - - /* update the termination counter IFF the state is changing to something - * indicating terminated - */ - if (ORTE_PROC_STATE_UNTERMINATED < state && - ORTE_PROC_STATE_UNTERMINATED > proc->state) { - ++jdata->num_terminated; - } - /* update the data */ - proc->state = state; - proc->exit_code = exit_code; - - /* update orte's exit status if it is non-zero */ - ORTE_UPDATE_EXIT_STATUS(exit_code); - + count = 1; } - count = 1; - } - if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - ORTE_ERROR_LOG(rc); - } else { - rc = ORTE_SUCCESS; - } - /* NOTE: jdata CAN BE NULL. This is caused by an orted - * being ordered to kill all its procs, but there are no - * procs left alive on that node. This can happen, for example, - * when a proc aborts somewhere, but the procs on this node - * have completed. - * So check job has to know how to handle a NULL pointer - */ - orte_plm_base_check_job_completed(jdata); - break; - - case ORTE_PLM_HEARTBEAT_CMD: - OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, - "%s plm:base:receive got heartbeat from %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&mev->sender))); - /* lookup the daemon object */ - if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) { - /* this job can not possibly have been removed, so this is an error */ - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - goto CLEANUP; - } - gettimeofday(&beat, NULL); - if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, mev->sender.vpid))) { - /* this proc is no longer in table - skip it */ - OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output, - "%s plm:base:receive daemon %s is not in proc table", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_VPID_PRINT(mev->sender.vpid))); + if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + ORTE_ERROR_LOG(rc); + } else { + rc = ORTE_SUCCESS; + } + /* NOTE: jdata CAN BE NULL. This is caused by an orted + * being ordered to kill all its procs, but there are no + * procs left alive on that node. This can happen, for example, + * when a proc aborts somewhere, but the procs on this node + * have completed. 
+             * So check job has to know how to handle a NULL pointer
+             */
+            orte_plm_base_check_job_completed(jdata);
             break;
-        }
-        proc->beat = beat.tv_sec;
-        break;
-
-    default:
-        ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
-        return;
+
+        case ORTE_PLM_HEARTBEAT_CMD:
+            OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
+                                 "%s plm:base:receive got heartbeat from %s",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                                 ORTE_NAME_PRINT(&msgpkt->sender)));
+            /* lookup the daemon object */
+            if (NULL == (jdata = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
+                /* this job can not possibly have been removed, so this is an error */
+                ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
+                goto CLEANUP;
+            }
+            gettimeofday(&beat, NULL);
+            if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, msgpkt->sender.vpid))) {
+                /* this proc is no longer in table - skip it */
+                OPAL_OUTPUT_VERBOSE((5, orte_plm_globals.output,
+                                     "%s plm:base:receive daemon %s is not in proc table",
+                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                                     ORTE_VPID_PRINT(msgpkt->sender.vpid)));
+                break;
+            }
+            proc->beat = beat.tv_sec;
+            break;
+
+        default:
+            ORTE_ERROR_LOG(ORTE_ERR_VALUE_OUT_OF_BOUNDS);
+            rc = ORTE_ERR_VALUE_OUT_OF_BOUNDS;
+            break;
+        }
+
+    CLEANUP:
+        /* release the message */
+        OBJ_RELEASE(msgpkt);
+        OBJ_DESTRUCT(&answer);
+        if (ORTE_SUCCESS != rc) {
+            goto DEPART;
+        }
     }
+
+    /* reset the event */
+    processing = false;
+    opal_event_add(&ready, 0);
+
+DEPART:
+    /* release the thread */
+    OPAL_THREAD_UNLOCK(&lock);
 
-CLEANUP:
-    /* release the message */
-    OBJ_RELEASE(mev);
-    OBJ_DESTRUCT(&answer);
-
     /* see if an error occurred - if so, wakeup the HNP so we can exit */
     if (ORTE_PROC_IS_HNP && ORTE_SUCCESS != rc) {
         orte_trigger_event(&orte_exit);
     }
 }
 
 /*
@@ -371,8 +421,8 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
      * buffer, however, is NOT released here, although its payload IS transferred
      * to the message buffer for later processing
      */
-    ORTE_MESSAGE_EVENT(sender, buffer, tag, orte_plm_base_receive_process_msg);
-
+    ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], true, sender, &buffer);
+
     /* reissue the recv */
     if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
                                                       ORTE_RML_TAG_PLM,
@@ -385,3 +435,11 @@ void orte_plm_base_recv(int status, orte_process_name_t* sender,
 
     return;
 }
+
+/* where HNP messages come */
+void orte_plm_base_receive_process_msg(int fd, short event, void *data)
+{
+    orte_message_event_t *mev = (orte_message_event_t*)data;
+
+    ORTE_PROCESS_MESSAGE(&recvs, &lock, processing, ready_fd[1], false, &mev->sender, &mev->buffer);
+    OBJ_RELEASE(mev);
+}
diff --git a/orte/mca/rml/base/rml_base_components.c b/orte/mca/rml/base/rml_base_components.c
index 1dbfafc606..2eb9551045 100644
--- a/orte/mca/rml/base/rml_base_components.c
+++ b/orte/mca/rml/base/rml_base_components.c
@@ -58,6 +58,22 @@
 orte_rml_component_t *orte_rml_component = NULL;
 
 static bool component_open_called = false;
 
+/* instantiate the msg_pkt object */
+static void msg_pkt_constructor(orte_msg_packet_t *pkt)
+{
+    pkt->buffer = NULL;
+}
+static void msg_pkt_destructor(orte_msg_packet_t *pkt)
+{
+    if (NULL != pkt->buffer) {
+        OBJ_RELEASE(pkt->buffer);
+    }
+}
+OBJ_CLASS_INSTANCE(orte_msg_packet_t,
+                   opal_list_item_t,
+                   msg_pkt_constructor,
+                   msg_pkt_destructor);
+
 int orte_rml_base_open(void)
 {
diff --git a/orte/mca/rml/rml.h b/orte/mca/rml/rml.h
index a861653414..af5a87eb0d 100644
--- a/orte/mca/rml/rml.h
+++ b/orte/mca/rml/rml.h
@@ -38,11 +38,11 @@
 #endif
 
 #include "opal/mca/mca.h"
-#include "orte/mca/rml/rml_types.h"
-
#include "opal/mca/crs/crs.h" #include "opal/mca/crs/base/base.h" +#include "orte/mca/rml/rml_types.h" + BEGIN_C_DECLS diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h index d4dc5d5d0b..6122731c15 100644 --- a/orte/mca/rml/rml_types.h +++ b/orte/mca/rml/rml_types.h @@ -27,6 +27,7 @@ #include "orte_config.h" #include "orte/constants.h" +#include "orte/types.h" #include #ifdef HAVE_SYS_UIO_H @@ -37,12 +38,44 @@ #include #endif +#include "opal/dss/dss_types.h" +#include "opal/class/opal_list.h" BEGIN_C_DECLS /* ******************************************************************** */ +typedef struct { + opal_list_item_t super; + orte_process_name_t sender; + opal_buffer_t *buffer; +} orte_msg_packet_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_msg_packet_t); + + +#define ORTE_PROCESS_MESSAGE(rlist, lck, flg, fd, crt, sndr, buf) \ + do { \ + orte_msg_packet_t *pkt; \ + int data=1; \ + pkt = OBJ_NEW(orte_msg_packet_t); \ + pkt->sender.jobid = (sndr)->jobid; \ + pkt->sender.vpid = (sndr)->vpid; \ + if ((crt)) { \ + pkt->buffer = OBJ_NEW(opal_buffer_t); \ + opal_dss.copy_payload(pkt->buffer, *(buf)); \ + } else { \ + pkt->buffer = *(buf); \ + *(buf) = NULL; \ + } \ + OPAL_THREAD_LOCK((lck)); \ + opal_list_append((rlist), &pkt->super); \ + if (!(flg)) { \ + write((fd), &data, sizeof(data)); \ + } \ + OPAL_THREAD_UNLOCK((lck)); \ + } while(0); + /** * Constant tag values for well-known services