Add a new command-line option, --report-events, that instructs mpirun to report specific events during the job's lifetime to the requestor via RML.
This commit was SVN r21954.
Этот коммит содержится в:
родитель
3acdb53494
Коммит
8ae4b55d16
@ -151,6 +151,10 @@ char *orte_rankfile;
|
||||
/* default rank assignment and binding policy */
|
||||
orte_mapping_policy_t orte_default_mapping_policy = 0;
|
||||
|
||||
/* tool communication controls */
|
||||
bool orte_report_events = false;
|
||||
char *orte_report_events_uri = NULL;
|
||||
|
||||
#endif /* !ORTE_DISABLE_FULL_RTE */
|
||||
|
||||
int orte_debug_output = -1;
|
||||
|
@ -118,6 +118,18 @@ typedef struct orte_job_t orte_job_t;
|
||||
} \
|
||||
} while(0);
|
||||
|
||||
/* Reset the recorded exit status: emits a verbosity-level-1 message on
 * orte_debug_output identifying the calling process, file and line, and
 * then sets orte_exit_status to 0.  Needed, for example, when we are
 * restarting a failed process.
 *
 * NOTE(review): the expansion already ends in ';' after while(0), so
 * "ORTE_RESET_EXIT_STATUS();" yields an extra empty statement and the
 * macro cannot be used as the sole body of an if that has an else.
 * Left unchanged because callers are not visible here - confirm before
 * removing the trailing semicolon.
 */
|
||||
#define ORTE_RESET_EXIT_STATUS() \
|
||||
do { \
|
||||
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, \
|
||||
"%s:%s(%d) reseting exit status", \
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
|
||||
__FILE__, __LINE__)); \
|
||||
orte_exit_status = 0; \
|
||||
} while(0);
|
||||
|
||||
|
||||
/* define a macro for computing time differences - used for timing tests
|
||||
* across the code base
|
||||
@ -593,6 +605,10 @@ ORTE_DECLSPEC extern char *orte_rankfile;
|
||||
/* default rank assignment and binding policy */
|
||||
ORTE_DECLSPEC extern orte_mapping_policy_t orte_default_mapping_policy;
|
||||
|
||||
/* tool communication controls */
|
||||
ORTE_DECLSPEC extern bool orte_report_events;
|
||||
ORTE_DECLSPEC extern char *orte_report_events_uri;
|
||||
|
||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||
|
||||
END_C_DECLS
|
||||
|
@ -373,6 +373,14 @@ int orte_register_params(void)
|
||||
ORTE_XSET_BINDING_POLICY(ORTE_BIND_TO_CORE);
|
||||
}
|
||||
|
||||
/* tool communication controls */
|
||||
mca_base_param_reg_string_name("orte", "report_events",
|
||||
"URI to which events are to be reported (default: NULL)]",
|
||||
false, false, NULL, &orte_report_events_uri);
|
||||
if (NULL != orte_report_events_uri) {
|
||||
orte_report_events = true;
|
||||
}
|
||||
|
||||
#endif /* ORTE_DISABLE_FULL_SUPPORT */
|
||||
|
||||
return ORTE_SUCCESS;
|
||||
|
@ -410,6 +410,10 @@ static opal_cmd_line_init_t cmd_line_init[] = {
|
||||
NULL, OPAL_CMD_LINE_TYPE_BOOL,
|
||||
"Use regular expressions for launch" },
|
||||
|
||||
{ "orte", "report", "events", '\0', "report-events", "report-events", 1,
|
||||
NULL, OPAL_CMD_LINE_TYPE_STRING,
|
||||
"Report events to a tool listening at the specified URI" },
|
||||
|
||||
/* End of list */
|
||||
{ NULL, NULL, NULL, '\0', NULL, NULL, 0,
|
||||
NULL, OPAL_CMD_LINE_TYPE_NULL, NULL }
|
||||
@ -1332,9 +1336,7 @@ static int parse_globals(int argc, char* argv[], opal_cmd_line_t *cmd_line)
|
||||
ORTE_SET_MAPPING_POLICY(ORTE_MAPPING_BYSLOT);
|
||||
}
|
||||
|
||||
/* extract any binding policy directives - they will
|
||||
* be ignored unless paffinity_alone is set
|
||||
*/
|
||||
/* extract any binding policy directives */
|
||||
if (orterun_globals.bind_to_socket) {
|
||||
ORTE_SET_BINDING_POLICY(ORTE_BIND_TO_SOCKET);
|
||||
} else if (orterun_globals.bind_to_board) {
|
||||
|
@ -30,12 +30,15 @@
|
||||
#include "orte/mca/odls/odls_types.h"
|
||||
#include "orte/mca/rml/rml.h"
|
||||
#include "orte/mca/rml/rml_types.h"
|
||||
#include "orte/mca/rml/base/rml_contact.h"
|
||||
#include "orte/mca/routed/routed.h"
|
||||
#include "orte/util/name_fns.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "orte/util/comm/comm.h"
|
||||
|
||||
/* internal communication handshake */
|
||||
/* quick timeout loop */
|
||||
static bool timer_fired;
|
||||
static opal_buffer_t answer;
|
||||
@ -53,9 +56,23 @@ static void quicktime_cb(int fd, short event, void *cbdata)
|
||||
timer_fired = true;
|
||||
}
|
||||
|
||||
static void send_cbfunc(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
/* cancel the timer */
|
||||
if (NULL != quicktime) {
|
||||
opal_evtimer_del(quicktime);
|
||||
free(quicktime);
|
||||
quicktime = NULL;
|
||||
}
|
||||
/* declare the work done */
|
||||
timer_fired = true;
|
||||
}
|
||||
|
||||
static void recv_info(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
{
|
||||
int rc;
|
||||
|
||||
@ -73,21 +90,133 @@ static void recv_info(int status, orte_process_name_t* sender,
|
||||
timer_fired = true;
|
||||
}
|
||||
|
||||
static void send_cbfunc(int status, orte_process_name_t* sender,
|
||||
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
||||
void* cbdata)
|
||||
|
||||
/* name of attached tool */
|
||||
static orte_process_name_t tool;
|
||||
static bool tool_connected = false;
|
||||
|
||||
/* connect a tool to us so we can send reports */
|
||||
int orte_util_comm_connect_tool(char *uri)
|
||||
{
|
||||
/* cancel the timer */
|
||||
if (NULL != quicktime) {
|
||||
opal_evtimer_del(quicktime);
|
||||
free(quicktime);
|
||||
quicktime = NULL;
|
||||
int rc;
|
||||
|
||||
opal_output(0, "connecting tool %s", uri);
|
||||
|
||||
/* set the contact info into the comm hash tables*/
|
||||
if (ORTE_SUCCESS != (rc = orte_rml.set_contact_info(uri))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return(rc);
|
||||
}
|
||||
OBJ_RELEASE(buffer);
|
||||
/* declare the work done */
|
||||
timer_fired = true;
|
||||
|
||||
/* extract the tool's name and store it */
|
||||
if (ORTE_SUCCESS != (rc = orte_rml_base_parse_uris(uri, &tool, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* set the route to be direct */
|
||||
if (ORTE_SUCCESS != (rc = orte_routed.update_route(&tool, &tool))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
tool_connected = true;
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
/* whether we are in step mode */
|
||||
static bool step=false;
|
||||
|
||||
/* report an event to a connected tool */
|
||||
/**
 * Report an event to the connected tool.
 *
 * Packs the event type - plus, for ORTE_COMM_EVENT_ALLOCATE, the name of
 * every non-NULL entry in orte_node_pool - into a buffer and sends it to
 * the tool on ORTE_RML_TAG_TOOL.  When step mode is enabled, a
 * non-blocking receive is then posted on the same tag and the call keeps
 * progressing until timer_fired is set (by recv_info or by the timeout
 * callback).
 *
 * @param ev  event being reported (one of the ORTE_COMM_EVENT_* values)
 *
 * @return ORTE_SUCCESS if no tool is connected or the report was
 *         delivered; ORTE_ERROR for an unrecognized event; otherwise the
 *         error code from packing, sending or posting the receive, or the
 *         value found in error_exit after the step-mode wait.
 */
int orte_util_comm_report_event(orte_comm_event_t ev)
{
    int rc, i;
    opal_buffer_t buf;
    orte_node_t *node;

    /* if nothing is connected, ignore this */
    if (!tool_connected) {
        return ORTE_SUCCESS;
    }

    opal_output(0, "reporting event");

    /* init a buffer for the data */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);

    /* flag the type of event */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &ev, 1, ORTE_COMM_EVENT))) {
        ORTE_ERROR_LOG(rc);
        OBJ_DESTRUCT(&buf);
        return rc;
    }

    switch (ev) {
        case ORTE_COMM_EVENT_ALLOCATE:
            /* loop through nodes, storing just node names */
            for (i=0; i < orte_node_pool->size; i++) {
                if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
                    /* empty slot in the pool */
                    continue;
                }
                if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &node->name, 1, OPAL_STRING))) {
                    ORTE_ERROR_LOG(rc);
                    OBJ_DESTRUCT(&buf);
                    return rc;
                }
            }
            break;

        case ORTE_COMM_EVENT_MAP:
        case ORTE_COMM_EVENT_LAUNCH:
            /* nothing beyond the event type is reported for these yet */
            break;

        default:
            ORTE_ERROR_LOG(ORTE_ERROR);
            OBJ_DESTRUCT(&buf);
            return ORTE_ERROR;
    }

    /* do the send */
    rc = orte_rml.send_buffer(&tool, &buf, ORTE_RML_TAG_TOOL, 0);

    /* the buffer remains ours once send_buffer has returned, so release
     * it on the success path as well as on failure - previously it was
     * only destructed when the send failed
     */
    OBJ_DESTRUCT(&buf);

    if (0 > rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    if (step) {
        /* the caller wants to wait until an ack is received -
         * define a max time to wait for an answer
         */
        OBJ_CONSTRUCT(&answer, opal_buffer_t);
        timer_fired = false;
        error_exit = ORTE_SUCCESS;
        ORTE_DETECT_TIMEOUT(&quicktime, 100, 1000, 100000, quicktime_cb);

        /* get the answer */
        if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
                                                          ORTE_RML_TAG_TOOL,
                                                          ORTE_RML_NON_PERSISTENT,
                                                          recv_info,
                                                          NULL))) {
            /* cancel the timer */
            if (NULL != quicktime) {
                opal_evtimer_del(quicktime);
                free(quicktime);
                quicktime = NULL;
            }
            ORTE_ERROR_LOG(rc);
            OBJ_DESTRUCT(&answer);
            return rc;
        }

        /* spin the progress engine until the ack arrives or we time out */
        ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);

        /* cleanup */
        OBJ_DESTRUCT(&answer);

        if (ORTE_SUCCESS != error_exit) {
            return error_exit;
        }
    }

    return ORTE_SUCCESS;
}
|
||||
|
||||
|
||||
int orte_util_comm_query_job_info(const orte_process_name_t *hnp, orte_jobid_t job,
|
||||
int *num_jobs, orte_job_t ***job_info_array)
|
||||
{
|
||||
@ -129,6 +258,9 @@ int orte_util_comm_query_job_info(const orte_process_name_t *hnp, orte_jobid_t j
|
||||
/* wait for send to complete */
|
||||
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
|
||||
|
||||
/* release the buffer */
|
||||
OBJ_RELEASE(cmd);
|
||||
|
||||
/* did it succeed? */
|
||||
if (ORTE_SUCCESS != error_exit) {
|
||||
return error_exit;
|
||||
@ -235,6 +367,9 @@ int orte_util_comm_query_node_info(const orte_process_name_t *hnp, char *node,
|
||||
/* wait for send to complete */
|
||||
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
|
||||
|
||||
/* release the buffer */
|
||||
OBJ_RELEASE(cmd);
|
||||
|
||||
/* did it succeed? */
|
||||
if (ORTE_SUCCESS != error_exit) {
|
||||
return error_exit;
|
||||
@ -344,6 +479,9 @@ int orte_util_comm_query_proc_info(const orte_process_name_t *hnp, orte_jobid_t
|
||||
/* wait for send to complete */
|
||||
ORTE_PROGRESSED_WAIT(timer_fired, 0, 1);
|
||||
|
||||
/* release the buffer */
|
||||
OBJ_RELEASE(cmd);
|
||||
|
||||
/* did it succeed? */
|
||||
if (ORTE_SUCCESS != error_exit) {
|
||||
return error_exit;
|
||||
|
@ -34,6 +34,17 @@
|
||||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
/* Event code reported to an attached tool; carried as a single byte. */
typedef uint8_t orte_comm_event_t;
|
||||
/* Data type identifier to use when packing an orte_comm_event_t */
#define ORTE_COMM_EVENT OPAL_UINT8
|
||||
|
||||
/* Event codes accepted by orte_util_comm_report_event().
 * NOTE(review): the values are distinct bits, but the reporting function
 * switches on the exact value, so presumably they are not meant to be
 * OR-ed together - confirm.
 */
#define ORTE_COMM_EVENT_ALLOCATE 0x01
|
||||
#define ORTE_COMM_EVENT_MAP 0x02
|
||||
#define ORTE_COMM_EVENT_LAUNCH 0x04
|
||||
|
||||
/* Connect a tool so that events can be reported to it.
 * @param uri  contact URI of the tool
 * @return ORTE_SUCCESS, or the error code of the step that failed
 */
ORTE_DECLSPEC int orte_util_comm_connect_tool(char *uri);
|
||||
|
||||
/* Report an event to the connected tool; returns ORTE_SUCCESS without
 * doing anything when no tool has been connected.
 * @param ev  one of the ORTE_COMM_EVENT_* codes
 * @return ORTE_SUCCESS or an ORTE error code
 */
ORTE_DECLSPEC int orte_util_comm_report_event(orte_comm_event_t ev);
|
||||
|
||||
ORTE_DECLSPEC int orte_util_comm_query_job_info(const orte_process_name_t *hnp, orte_jobid_t job,
|
||||
int *num_jobs, orte_job_t ***job_info_array);
|
||||
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user