From f115b4aed29c6155d9496bda7787ac10428a3164 Mon Sep 17 00:00:00 2001
From: Ralph Castain
Date: Wed, 2 Apr 2008 13:35:06 +0000
Subject: [PATCH] Checkpoint the revised gather algorithm

This commit was SVN r18072.
---
 orte/mca/grpcomm/exp/grpcomm_exp.c | 177 ++++++++++++++++++++++++++++-
 1 file changed, 176 insertions(+), 1 deletion(-)

diff --git a/orte/mca/grpcomm/exp/grpcomm_exp.c b/orte/mca/grpcomm/exp/grpcomm_exp.c
index 4622ab99b9..ce9ab9a5a4 100644
--- a/orte/mca/grpcomm/exp/grpcomm_exp.c
+++ b/orte/mca/grpcomm/exp/grpcomm_exp.c
@@ -65,6 +65,7 @@ static int xcast(orte_jobid_t job,
                  opal_buffer_t *buffer,
                  orte_rml_tag_t tag);
 static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode);
+static int new_barrier(void);
 
 /* Module def */
 orte_grpcomm_base_module_t orte_grpcomm_exp_module = {
@@ -73,7 +74,7 @@ orte_grpcomm_base_module_t orte_grpcomm_exp_module = {
     xcast,
     orte_grpcomm_base_allgather,
     orte_grpcomm_base_allgather_list,
-    orte_grpcomm_base_barrier,
+    new_barrier,
     next_recips,
     orte_grpcomm_base_set_proc_attr,
     orte_grpcomm_base_get_proc_attr,
@@ -798,3 +799,177 @@ static int next_recips(opal_list_t *names, orte_grpcomm_mode_t mode)
 }
 
 
+/*** TEST AREA FOR NEW COLLECTIVES ***/
+/* Fan-out of the tree walked by new_barrier(): the children of vpid v
+ * are vpids (v * DEGREE) + 1 through (v * DEGREE) + DEGREE.
+ */
+#define DEGREE 2
+/* messages counted by barrier_server_recv() since the last reset */
+static orte_std_cntr_t barrier_num_recvd;
+/* raised by barrier_server_recv() when it cannot re-post its recv */
+static bool barrier_failed;
+
+/*
+ * Callback for the recvs posted on ORTE_RML_TAG_BARRIER_SERVER.
+ *
+ * Counts the message and re-posts the same non-blocking recv so that
+ * the next one is counted too. None of the callback arguments are
+ * examined. If the re-post fails, barrier_failed is raised so that
+ * new_barrier() can stop waiting and report the failure.
+ */
+static void barrier_server_recv(int status, orte_process_name_t* sender,
+                                opal_buffer_t *buffer,
+                                orte_rml_tag_t tag, void *cbdata)
+{
+    int rc;
+
+    /* bump counter */
+    ++barrier_num_recvd;
+    /* reissue the recv */
+    rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER,
+                                 ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL);
+    if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
+        ORTE_ERROR_LOG(rc);
+        barrier_failed = true;
+    }
+}
+
+/*
+ * Barrier built on a DEGREE-ary tree of vpids.
+ *
+ * A process that is neither the HNP nor a daemon packs
+ * ORTE_DAEMON_COLL_CMD and the ORTE_RML_TAG_BARRIER_SERVER target tag,
+ * sends them to its local daemon and returns. Every other process
+ * waits until as many messages as it has children have arrived, then
+ * sends a buffer with nothing packed into it to its parent (vpid 0
+ * has no parent).
+ *
+ * Returns ORTE_SUCCESS, the error code of the pack/send/recv call
+ * that failed, or ORTE_ERR_COMM_FAILURE when the send to the parent
+ * fails or the wait ends before every child message was counted.
+ */
+static int new_barrier(void)
+{
+    orte_vpid_t first_child, num_daemons;
+    orte_std_cntr_t num_children;
+    opal_buffer_t buf;
+    orte_daemon_cmd_flag_t command=ORTE_DAEMON_COLL_CMD;
+    orte_rml_tag_t target_tag=ORTE_RML_TAG_BARRIER_SERVER;
+    orte_process_name_t parent;
+    int rc;
+
+    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                         "%s grpcomm:exp: barrier entered",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+    /* if I am not the HNP or daemon, then just send to my
+     * local daemon
+     */
+    if (!orte_process_info.hnp &&
+        !orte_process_info.daemon) {
+        OBJ_CONSTRUCT(&buf, opal_buffer_t);
+        /* tell the daemon to collect the data */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &command, 1, ORTE_DAEMON_CMD))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&buf);
+            return rc;
+        }
+        /* tell the daemon where it is eventually to be delivered */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(&buf, &target_tag, 1, ORTE_RML_TAG))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&buf);
+            return rc;
+        }
+        /* send to local daemon */
+        if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, &buf, ORTE_RML_TAG_DAEMON, 0))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_DESTRUCT(&buf);
+            return rc;
+        }
+        OBJ_DESTRUCT(&buf);
+        return ORTE_SUCCESS;
+    }
+
+    num_daemons = orte_process_info.num_procs;
+    first_child = (ORTE_PROC_MY_NAME->vpid * DEGREE) + 1;
+
+    if (first_child < num_daemons) {
+        /* How many children do I have? */
+        if (first_child + DEGREE > num_daemons) {
+            num_children = num_daemons - first_child;
+        } else {
+            num_children = DEGREE;
+        }
+        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
+                             "Rank %d has %d children, first child %d\n",
+                             ORTE_PROC_MY_NAME->vpid,
+                             num_children, first_child));
+
+        /* Use non-blocking receives so that they can progress
+         * simultaneously
+         */
+        barrier_num_recvd = 0;
+        barrier_failed = false;
+        rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER,
+                                     ORTE_RML_NON_PERSISTENT, barrier_server_recv, NULL);
+        if (rc != ORTE_SUCCESS && rc != ORTE_ERR_NOT_IMPLEMENTED) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+
+        /* Wait on barrier_failed instead of a literal false so that a
+         * failed re-post in barrier_server_recv() can end the wait
+         * rather than leave it spinning for messages that can no
+         * longer be received.
+         * NOTE(review): assumes the first argument of
+         * ORTE_PROGRESSED_WAIT is its abort condition - confirm
+         * against the macro definition.
+         */
+        ORTE_PROGRESSED_WAIT(barrier_failed, barrier_num_recvd, num_children);
+
+        /* cancel the lingering recv */
+        rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_BARRIER_SERVER);
+
+        /* if the wait ended before every child was counted, do not
+         * report success
+         */
+        if (barrier_num_recvd < num_children) {
+            ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
+            return ORTE_ERR_COMM_FAILURE;
+        }
+        if (ORTE_SUCCESS != rc) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+
+        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
+                             "%s got all data from children",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+    }
+
+    /* If you have a parent, send */
+    if (0 != ORTE_PROC_MY_NAME->vpid) {
+        /* setup the buffer */
+        OBJ_CONSTRUCT(&buf, opal_buffer_t);
+
+        parent.jobid = ORTE_PROC_MY_NAME->jobid;
+        parent.vpid = (ORTE_PROC_MY_NAME->vpid - 1) / DEGREE;
+
+        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
+                             "%s sending to parent %s",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
+                             ORTE_NAME_PRINT(&parent)));
+
+        if (0 > orte_rml.send_buffer(&parent, &buf, ORTE_RML_TAG_BARRIER_SERVER, 0)) {
+            ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
+            OBJ_DESTRUCT(&buf);
+            return ORTE_ERR_COMM_FAILURE;
+        }
+        OBJ_DESTRUCT(&buf);
+    }
+
+    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                         "%s grpcomm:exp: barrier completed",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+    return ORTE_SUCCESS;
+}