
fixes for grpcomm rcd/brucks algorithms

This commit is contained in:
Elena 2014-10-09 06:12:26 +02:00
parent 9947758d98
commit e319c95267
11 changed files with 606 additions and 436 deletions

View file

@ -30,6 +30,7 @@
#include "orte_config.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_hash_table.h"
#include "opal/dss/dss_types.h"
#include "opal/mca/mca.h"
#include "opal/mca/hwloc/hwloc.h"
@ -67,6 +68,7 @@ OBJ_CLASS_DECLARATION(orte_grpcomm_base_active_t);
typedef struct {
opal_list_t actives;
opal_list_t ongoing;
opal_hash_table_t sig_table;
} orte_grpcomm_base_t;
ORTE_DECLSPEC extern orte_grpcomm_base_t orte_grpcomm_base;

View file

@ -70,6 +70,7 @@ static int orte_grpcomm_base_close(void)
}
OPAL_LIST_DESTRUCT(&orte_grpcomm_base.actives);
OPAL_LIST_DESTRUCT(&orte_grpcomm_base.ongoing);
OBJ_DESTRUCT(&orte_grpcomm_base.sig_table);
return mca_base_framework_components_close(&orte_grpcomm_base_framework, NULL);
}
@ -82,6 +83,8 @@ static int orte_grpcomm_base_open(mca_base_open_flag_t flags)
{
OBJ_CONSTRUCT(&orte_grpcomm_base.actives, opal_list_t);
OBJ_CONSTRUCT(&orte_grpcomm_base.ongoing, opal_list_t);
OBJ_CONSTRUCT(&orte_grpcomm_base.sig_table, opal_hash_table_t);
opal_hash_table_init(&orte_grpcomm_base.sig_table, 128);
return mca_base_framework_components_open(&orte_grpcomm_base_framework, flags);
}
@ -97,6 +100,7 @@ static void scon(orte_grpcomm_signature_t *p)
{
p->signature = NULL;
p->sz = 0;
p->seq_num = 0;
}
static void sdes(orte_grpcomm_signature_t *p)
{
@ -115,8 +119,10 @@ static void ccon(orte_grpcomm_coll_t *p)
p->dmns = NULL;
p->ndmns = 0;
p->nreported = 0;
p->distance_mask_recv = 0;
p->cbfunc = NULL;
p->cbdata = NULL;
p->buffers = NULL;
}
static void cdes(orte_grpcomm_coll_t *p)
{
@ -127,6 +133,7 @@ static void cdes(orte_grpcomm_coll_t *p)
if (NULL != p->dmns) {
free(p->dmns);
}
free(p->buffers);
}
OBJ_CLASS_INSTANCE(orte_grpcomm_coll_t,
opal_list_item_t,

View file

@ -128,9 +128,11 @@ int orte_grpcomm_API_xcast(orte_grpcomm_signature_t *sig,
static void allgather_stub(int fd, short args, void *cbdata)
{
orte_grpcomm_caddy_t *cd = (orte_grpcomm_caddy_t*)cbdata;
int ret = OPAL_SUCCESS;
int rc;
orte_grpcomm_base_active_t *active;
orte_grpcomm_coll_t *coll;
void *seq_number;
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:allgather stub",
@ -139,6 +141,28 @@ static void allgather_stub(int fd, short args, void *cbdata)
/* retrieve an existing tracker, create it if not
* already found. The allgather module is responsible
* for releasing it upon completion of the collective */
ret = opal_hash_table_get_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), &seq_number);
if (OPAL_ERR_NOT_FOUND == ret) {
cd->sig->seq_num = 0;
} else if (OPAL_SUCCESS == ret) {
cd->sig->seq_num = *((uint32_t *)(seq_number)) + 1;
} else {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"%s rpcomm:base:allgather can't not get signature from hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cd);
return;
}
ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num);
if (OPAL_SUCCESS != ret) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"%s rpcomm:base:allgather can't not add new signature to hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cd);
return;
}
coll = orte_grpcomm_base_get_tracker(cd->sig, true);
coll->cbfunc = cd->cbfunc;
coll->cbdata = cd->cbdata;
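
The lookup-then-store logic above gives each distinct signature its own monotonically increasing sequence number, so repeated collectives over the same set of daemons can be told apart. A minimal standalone sketch of that lookup-then-bump pattern, using a plain linked list in place of opal_hash_table_t (the sig_entry type and next_seq helper are illustrative only, and error checks are omitted):

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Illustrative stand-in for one table entry: the raw signature bytes plus
 * the last sequence number issued for that signature. */
struct sig_entry {
    void             *key;      /* raw bytes of the process-name array */
    size_t            key_len;
    uint32_t          seq_num;
    struct sig_entry *next;
};

/* Return 0 the first time a signature is seen, previous + 1 on every later
 * call - the same behavior as the hash-table get/set pair above. */
static uint32_t next_seq(struct sig_entry **table, const void *key, size_t key_len)
{
    for (struct sig_entry *e = *table; NULL != e; e = e->next) {
        if (e->key_len == key_len && 0 == memcmp(e->key, key, key_len)) {
            return ++e->seq_num;
        }
    }
    struct sig_entry *e = calloc(1, sizeof(*e));
    e->key = malloc(key_len);
    memcpy(e->key, key, key_len);
    e->key_len = key_len;
    e->seq_num = 0;
    e->next = *table;
    *table = e;
    return 0;
}

int main(void)
{
    struct sig_entry *table = NULL;
    uint64_t procs[2] = { 0, 1 };   /* pretend signature: two process names */
    for (int i = 0; i < 3; i++) {
        printf("collective %d -> seq %u\n", i, next_seq(&table, procs, sizeof(procs)));
    }
    return 0;   /* prints seq 0, 1, 2 */
}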
@ -169,9 +193,8 @@ int orte_grpcomm_API_allgather(orte_grpcomm_signature_t *sig,
* access framework-global data safely */
cd = OBJ_NEW(orte_grpcomm_caddy_t);
/* ensure the data doesn't go away */
OBJ_RETAIN(sig);
OBJ_RETAIN(buf);
cd->sig = sig;
opal_dss.copy((void **)&cd->sig, (void *)sig, ORTE_SIGNATURE);
cd->buf = buf;
cd->cbfunc = cbfunc;
cd->cbdata = cbdata;
@ -197,7 +220,7 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
/* if only one is NULL, then we can't possibly match */
break;
}
if (OPAL_EQUAL == opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE)) {
if (OPAL_EQUAL == (rc = opal_dss.compare(sig, coll->sig, ORTE_SIGNATURE))) {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -213,16 +236,17 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
return NULL;
}
coll = OBJ_NEW(orte_grpcomm_coll_t);
opal_dss.copy((void **)&coll->sig, (void *)sig, ORTE_SIGNATURE);
if (1 < opal_output_get_verbosity(orte_grpcomm_base_framework.framework_output)) {
char *tmp=NULL;
(void)opal_dss.print(&tmp, NULL, sig, ORTE_SIGNATURE);
(void)opal_dss.print(&tmp, NULL, coll->sig, ORTE_SIGNATURE);
opal_output(0, "%s grpcomm:base: creating new coll for procs %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), tmp);
free(tmp);
}
coll = OBJ_NEW(orte_grpcomm_coll_t);
OBJ_RETAIN(sig);
coll->sig = sig;
opal_list_append(&orte_grpcomm_base.ongoing, &coll->super);
/* now get the daemons involved */

View file

@ -6,6 +6,8 @@
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -39,7 +41,8 @@ static int xcast(orte_vpid_t *vpids,
opal_buffer_t *msg);
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *buf);
static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t distance);
static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t distance);
static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_process_name_t *peer, uint32_t distance);
static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
@ -91,65 +94,28 @@ static int allgather(orte_grpcomm_coll_t *coll,
"%s grpcomm:coll:bruck algo employed for %d processes",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));
/* this module handles the non-power-of-two case, so bail out if the number of participating daemons is a power of two */
if ((coll->ndmns != 0) && ((coll->ndmns & (coll->ndmns - 1)) == 0)) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:bruck number of participating daemons (%d) is a power of 2",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int) coll->ndmns ));
return ORTE_ERROR;
}
/* start by seeding the collection with our own data */
opal_dss.copy_payload(&coll->bucket, sendbuf);
/* record that we contributed */
coll->nreported += 1;
/* mark local data received */
coll->distance_mask_recv |= 1;
/* start by seeding the collection with our own data */
opal_dss.copy_payload(&coll->bucket, sendbuf);
/* Communication step:
At every step i, rank r:
- doubles the distance
- sends message containing all data collected so far to rank r - distance
- receives message containing all data collected so far from rank (r + distance)
*/
/* find my position in the group of participants. This
* value is the "rank" we will use in the algo
*/
brks_allgather_send_dist(coll, 1);
/* process data */
brks_allgather_process_data(coll, 1);
return ORTE_SUCCESS;
}
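
The communication-step comment above is the usual Bruck schedule: at distance d, rank r sends its accumulated bucket to (r - d) mod n and receives from (r + d) mod n, doubling d each round until d >= n. A tiny standalone sketch of that peer arithmetic (ranks are indices into coll->dmns; bruck_rounds is a made-up name for the example):

#include <stdio.h>

/* Print the Bruck send/recv partners for one rank among n participants. */
static void bruck_rounds(unsigned rank, unsigned n)
{
    for (unsigned distance = 1; distance < n; distance <<= 1) {
        unsigned send_to   = (n + rank - distance) % n;  /* rank - distance, wrapped */
        unsigned recv_from = (rank + distance) % n;
        printf("distance %u: rank %u sends to %u, receives from %u\n",
               distance, rank, send_to, recv_from);
    }
}

int main(void)
{
    /* e.g. 6 daemons (not a power of two, so this module is used), rank 0 */
    bruck_rounds(0, 6);   /* distances 1, 2, 4 */
    return 0;
}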
static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t distance) {
orte_process_name_t peer_send, peer_recv;
static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_process_name_t *peer, uint32_t distance) {
opal_buffer_t *send_buf;
int rc;
peer_send.jobid = ORTE_PROC_MY_NAME->jobid;
peer_recv.jobid = ORTE_PROC_MY_NAME->jobid;
if (1 == coll->ndmns) {
peer_send.vpid = ORTE_PROC_MY_NAME->vpid;
peer_recv.vpid = ORTE_PROC_MY_NAME->vpid;
} else {
orte_vpid_t nv, rank;
rank = ORTE_VPID_INVALID;
for (nv = 0; nv < coll->ndmns; nv++) {
if (coll->dmns[nv] == ORTE_PROC_MY_NAME->vpid) {
rank = nv;
break;
}
}
/* check for bozo case */
if (ORTE_VPID_INVALID == rank) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
/* first send my current contents */
nv = (coll->ndmns + rank - distance) % coll->ndmns;
peer_send.vpid = coll->dmns[nv];
/* now setup to recv from my other partner */
nv = (rank + distance) % coll->ndmns;
peer_recv.vpid = coll->dmns[nv];
}
send_buf = OBJ_NEW(opal_buffer_t);
/* pack the signature */
@ -164,12 +130,6 @@ static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t dista
OBJ_RELEASE(send_buf);
return rc;
}
/* pack the number of reported processes */
if (OPAL_SUCCESS != (rc = opal_dss.pack(send_buf, &coll->nreported, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(send_buf);
return rc;
}
/* pack the data */
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(send_buf, &coll->bucket))) {
ORTE_ERROR_LOG(rc);
@ -178,11 +138,12 @@ static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t dista
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:bruck sending to %s",
"%s grpcomm:coll:brks SENDING TO %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer_send)));
ORTE_NAME_PRINT(peer)));
if (0 > (rc = orte_rml.send_buffer_nb(&peer_send, send_buf,
if (0 > (rc = orte_rml.send_buffer_nb(peer, send_buf,
ORTE_RML_TAG_ALLGATHER_BRKS,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
@ -190,27 +151,95 @@ static int brks_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t dista
return rc;
};
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:bruck receiving from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer_recv)));
return ORTE_SUCCESS;
}
static void brks_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t distance) {
/* Communication step:
At every step i, rank r:
- doubles the distance
- sends message containing all data collected so far to rank r - distance
- receives message containing all data collected so far from rank (r + distance)
*/
orte_process_name_t peer;
orte_vpid_t nv, rank;
int rc;
peer.jobid = ORTE_PROC_MY_NAME->jobid;
/* get my own rank */
rank = ORTE_VPID_INVALID;
for (orte_vpid_t nv = 0; nv < coll->ndmns; nv++) {
if (coll->dmns[nv] == ORTE_PROC_MY_NAME->vpid) {
rank = nv;
break;
}
}
/* check for bozo case */
if (ORTE_VPID_INVALID == rank) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"Peer not found"));
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
brks_finalize_coll(coll, ORTE_ERR_NOT_FOUND);
return;
}
while (distance < coll->ndmns) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks process distance %u (mask recv: 0x%x)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
/* first send my current contents */
nv = (coll->ndmns + rank - distance) % coll->ndmns;
peer.vpid = coll->dmns[nv];
brks_allgather_send_dist(coll, &peer, distance);
/* check whether data for the next distance is available */
if ((NULL != coll->buffers) && (coll->buffers[distance - 1] != NULL)) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks %u distance data found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, coll->buffers[distance - 1]))) {
ORTE_ERROR_LOG(rc);
brks_finalize_coll(coll, rc);
return;
}
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
OBJ_RELEASE(coll->buffers[distance - 1]);
coll->buffers[distance - 1] = NULL;
distance = distance << 1;
continue;
}
break;
}
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks reported %lu process from %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)coll->nreported,
(unsigned long)coll->ndmns));
/* if we are done, then complete things */
if (coll->nreported >= coll->ndmns){
brks_finalize_coll(coll, ORTE_SUCCESS);
}
return;
}
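
A peer can deliver its distance-d contribution before this daemon has merged distance d-1, so brks_allgather_recv_dist parks early data in coll->buffers and the loop above drains it in order, recording finished rounds in the distance_mask_recv bitmask. A standalone sketch of that bookkeeping, with plain flags standing in for the buffered opal_buffer_t payloads (coll_state, pending and drain are illustrative names only):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_ROUNDS 32

/* Per-collective bookkeeping, mirroring distance_mask_recv / buffers. */
struct coll_state {
    uint32_t distance_mask_recv;   /* bit d set once distance d is merged */
    bool     pending[MAX_ROUNDS];  /* stand-in for buffers[distance - 1]  */
};

/* Merge every consecutive distance whose data has already arrived,
 * starting from 'distance'; stop at the first gap. */
static uint32_t drain(struct coll_state *c, uint32_t distance, uint32_t n)
{
    while (distance < n && c->pending[distance - 1]) {
        c->pending[distance - 1]  = false;
        c->distance_mask_recv    |= (uint32_t)1 << distance;
        printf("merged distance %u (mask now 0x%x)\n",
               distance, c->distance_mask_recv);
        distance <<= 1;
    }
    return distance;   /* next distance we are still waiting for */
}

int main(void)
{
    struct coll_state c = { .distance_mask_recv = 1 };  /* bit 0: own data */
    uint32_t n = 6;

    c.pending[2 - 1] = true;   /* distance-2 data arrived early ...        */
    drain(&c, 1, n);           /* ... but distance 1 is missing: no merge  */

    c.pending[1 - 1] = true;   /* distance-1 data arrives                  */
    drain(&c, 1, n);           /* distances 1 and 2 now merged in order    */
    return 0;
}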
static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int32_t cnt, num_remote;
int32_t cnt;
int rc;
orte_grpcomm_signature_t *sig;
orte_grpcomm_coll_t *coll;
orte_vpid_t distance, new_distance;
uint32_t distance;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub received data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
"%s grpcomm:coll:brks RECEIVING FROM %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* unpack the signature */
cnt = 1;
@ -225,7 +254,6 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
OBJ_RELEASE(sig);
return;
}
/* unpack the distance */
distance = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &distance, &cnt, OPAL_INT32))) {
@ -234,17 +262,14 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
brks_finalize_coll(coll, rc);
return;
}
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance)));
/* unpack number of reported processes */
num_remote = 0;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &num_remote, &cnt, OPAL_INT32))) {
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
brks_finalize_coll(coll, rc);
return;
}
coll->nreported += num_remote;
/* Check whether we can process next distance */
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks data from %d distance received, "
"Process the next distance (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
/* capture any provided content */
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
OBJ_RELEASE(sig);
@ -252,13 +277,36 @@ static void brks_allgather_recv_dist(int status, orte_process_name_t* sender,
brks_finalize_coll(coll, rc);
return;
}
//update distance and send
new_distance = distance <<= 1;
if (new_distance < coll->ndmns) {
brks_allgather_send_dist(coll, new_distance);
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
brks_allgather_process_data(coll, (uint32_t)(distance << 1));
} else {
brks_finalize_coll(coll, ORTE_SUCCESS);
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks data from %d distance received, "
"still waiting for data (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
if (NULL == coll->buffers) {
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), coll->ndmns - 1))) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
brks_finalize_coll(coll, rc);
return;
}
}
if (NULL == (coll->buffers[distance - 1] = OBJ_NEW(opal_buffer_t))) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
brks_finalize_coll(coll, rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(coll->buffers[distance - 1], buffer))) {
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
brks_finalize_coll(coll, rc);
return;
}
}
OBJ_RELEASE(sig);
@ -270,6 +318,10 @@ static int brks_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
opal_buffer_t *reply;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:brks declared collective complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
reply = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &coll->nreported, 1, OPAL_UINT64))) {

View file

@ -57,6 +57,7 @@ typedef struct {
opal_object_t super;
orte_process_name_t *signature;
size_t sz;
uint32_t seq_num;
} orte_grpcomm_signature_t;
OBJ_CLASS_DECLARATION(orte_grpcomm_signature_t);
@ -73,6 +74,10 @@ typedef struct {
size_t ndmns;
/* number reported in */
size_t nreported;
/* distance masks for receive */
uint32_t distance_mask_recv;
/* received buckets */
opal_buffer_t ** buffers;
/* callback function */
orte_grpcomm_cbfunc_t cbfunc;
/* user-provided callback data */

View file

@ -6,6 +6,8 @@
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -18,6 +20,7 @@
#include "orte/types.h"
#include "orte/runtime/orte_wait.h"
#include <math.h>
#include <string.h>
#include "opal/dss/dss.h"
@ -39,11 +42,13 @@ static int xcast(orte_vpid_t *vpids,
opal_buffer_t *msg);
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *buf);
static int rcd_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t distance);
static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t distance);
static int rcd_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_process_name_t *peer, uint32_t distance);
static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static int rcd_finalize_coll(orte_grpcomm_coll_t *coll, int ret);
/* Module def */
orte_grpcomm_base_module_t orte_grpcomm_rcd_module = {
init,
@ -85,58 +90,35 @@ static int xcast(orte_vpid_t *vpids,
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *sendbuf)
{
/* check the number of involved daemons - if it is not a power of two,
* then we cannot do it */
if (0 == ((coll->ndmns != 0) && !(coll->ndmns & (coll->ndmns - 1)))) {
return ORTE_ERR_TAKE_NEXT_OPTION;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub algo employed for %d processes",
"%s grpcomm:coll:recdub algo employed for %d daemons",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns));
/* recursive doubling requires a power-of-two number of participants, so bail out otherwise */
if (!((coll->ndmns != 0) && ((coll->ndmns & (coll->ndmns - 1)) == 0))) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub number of participating daemons (%d) is not a power of 2",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)coll->ndmns ));
return ORTE_ERROR;
}
/* record that we contributed */
coll->nreported += 1;
/* mark local data received */
coll->distance_mask_recv |= 1;
/* start by seeding the collection with our own data */
opal_dss.copy_payload(&coll->bucket, sendbuf);
/* Communication step:
At every step i, rank r:
- exchanges message containing all data collected so far with rank peer = (r ^ 2^i).
*/
rcd_allgather_send_dist(coll, 1);
/* process data */
rcd_allgather_process_data(coll, 1);
return ORTE_SUCCESS;
}
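
As the comment above notes, recursive doubling pairs rank r with peer r ^ d at every distance d = 1, 2, 4, ..., so each daemon exchanges exactly log2(n) messages, which is also why a power-of-two participant count is required. A tiny standalone sketch of the pairing (ranks are indices into coll->dmns):

#include <stdio.h>

int main(void)
{
    unsigned n = 8;   /* must be a power of two */
    for (unsigned rank = 0; rank < n; rank++) {
        printf("rank %u exchanges with:", rank);
        for (unsigned distance = 1; distance < n; distance <<= 1) {
            printf(" %u", rank ^ distance);   /* peer for this round */
        }
        printf("\n");
    }
    return 0;
}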
static int rcd_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t distance) {
orte_process_name_t peer;
static int rcd_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_process_name_t *peer, uint32_t distance) {
opal_buffer_t *send_buf;
int rc;
peer.jobid = ORTE_PROC_MY_NAME->jobid;
if (1 == coll->ndmns) {
peer.vpid = ORTE_PROC_MY_NAME->vpid;
} else {
orte_vpid_t nv, rank;
rank = ORTE_VPID_INVALID;
for (nv = 0; nv < coll->ndmns; nv++) {
if (coll->dmns[nv] == ORTE_PROC_MY_NAME->vpid) {
rank = nv;
break;
}
}
/* check for bozo case */
if (ORTE_VPID_INVALID == rank) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return ORTE_ERR_NOT_FOUND;
}
/* first send my current contents */
nv = rank ^ distance;
peer.vpid = coll->dmns[nv];
}
send_buf = OBJ_NEW(opal_buffer_t);
/* pack the signature */
@ -145,14 +127,8 @@ static int rcd_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t distan
OBJ_RELEASE(send_buf);
return rc;
}
/* pack the current distance */
if (OPAL_SUCCESS != (rc = opal_dss.pack(send_buf, &distance, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(send_buf);
return rc;
}
/* pack the number of reported processes */
if (OPAL_SUCCESS != (rc = opal_dss.pack(send_buf, &coll->nreported, 1, OPAL_INT32))) {
/* pack the distance */
if (OPAL_SUCCESS != (rc = opal_dss.pack(send_buf, &distance, 1, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(send_buf);
return rc;
@ -165,40 +141,106 @@ static int rcd_allgather_send_dist(orte_grpcomm_coll_t *coll, orte_vpid_t distan
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub sending to %s",
"%s grpcomm:coll:recdub SENDING TO %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer)));
ORTE_NAME_PRINT(peer)));
if (0 > (rc = orte_rml.send_buffer_nb(&peer, send_buf,
if (0 > (rc = orte_rml.send_buffer_nb(peer, send_buf,
ORTE_RML_TAG_ALLGATHER_RCD,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(send_buf);
return rc;
};
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub receiving from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&peer)));
return ORTE_SUCCESS;
}
static void rcd_allgather_process_data(orte_grpcomm_coll_t *coll, uint32_t distance) {
/* Communication step:
At every step i, rank r:
- exchanges message containing all data collected so far with rank peer = (r ^ 2^i).
*/
orte_process_name_t peer;
orte_vpid_t nv, rank;
uint32_t distance_index;
int rc;
peer.jobid = ORTE_PROC_MY_NAME->jobid;
/* get my own rank */
rank = ORTE_VPID_INVALID;
for (orte_vpid_t nv = 0; nv < coll->ndmns; nv++) {
if (coll->dmns[nv] == ORTE_PROC_MY_NAME->vpid) {
rank = nv;
break;
}
}
/* check for bozo case */
if (ORTE_VPID_INVALID == rank) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"Peer not found"));
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
rcd_finalize_coll(coll, ORTE_ERR_NOT_FOUND);
return;
}
while(distance < coll->ndmns) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub process distance %u (mask recv: 0x%x)",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
/* first send my current contents */
nv = rank ^ distance;
peer.vpid = coll->dmns[nv];
rcd_allgather_send_dist(coll, &peer, distance);
/* check whether data for the next distance is available */
distance_index = log2(distance);
if ((NULL != coll->buffers) && (NULL != coll->buffers[distance_index])) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub %u distance data found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance));
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, coll->buffers[distance_index]))) {
ORTE_ERROR_LOG(rc);
rcd_finalize_coll(coll, rc);
return;
}
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
OBJ_RELEASE(coll->buffers[distance_index]);
coll->buffers[distance_index] = NULL;
distance = distance << 1;
continue;
}
break;
}
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub reported %lu process from %lu",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)coll->nreported,
(unsigned long)coll->ndmns));
/* if we are done, then complete things */
if (coll->nreported >= coll->ndmns){
rcd_finalize_coll(coll, ORTE_SUCCESS);
}
return;
}
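
Unlike the bruck side, the recursive-doubling code keeps one buffer slot per round, so distance d maps to slot log2(d): distance 1 goes to buffers[0], 2 to buffers[1], 4 to buffers[2], and so on, with log2(ndmns) slots in total. A small sketch of that mapping, using an integer helper instead of the floating-point log2() from <math.h> that the code above calls (purely to keep the sketch self-contained):

#include <stdint.h>
#include <stdio.h>

/* Integer log2 for power-of-two distances: 1 -> 0, 2 -> 1, 4 -> 2, ... */
static uint32_t distance_to_slot(uint32_t distance)
{
    uint32_t slot = 0;
    while (distance > 1) {
        distance >>= 1;
        slot++;
    }
    return slot;
}

int main(void)
{
    uint32_t ndmns = 16;   /* power-of-two participant count */
    /* buffers[] would need distance_to_slot(ndmns) == 4 slots here */
    for (uint32_t d = 1; d < ndmns; d <<= 1) {
        printf("distance %u -> buffers[%u]\n", d, distance_to_slot(d));
    }
    return 0;
}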
static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
int32_t cnt, num_remote;
int32_t cnt;
uint32_t distance, distance_index;
int rc;
orte_grpcomm_signature_t *sig;
orte_grpcomm_coll_t *coll;
orte_vpid_t distance, new_distance;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub received data",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
"%s grpcomm:coll:recdub RECEIVING FROM %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* unpack the signature */
cnt = 1;
@ -213,26 +255,22 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
OBJ_RELEASE(sig);
return;
}
/* unpack the distance */
distance = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &distance, &cnt, OPAL_INT32))) {
distance = 0;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &distance, &cnt, OPAL_UINT32))) {
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
rcd_finalize_coll(coll, rc);
return;
}
assert(0 == (coll->distance_mask_recv & (uint32_t)(1 << distance)));
/* unpack number of reported */
num_remote = 0;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &num_remote, &cnt, OPAL_INT32))) {
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
rcd_finalize_coll(coll, rc);
return;
}
coll->nreported += num_remote;
/* Check whether we can process next distance */
if (coll->distance_mask_recv & ((uint32_t)(1 << (distance >> 1)))) {
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub data from %d distance received, "
"Process the next distance (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
/* capture any provided content */
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(&coll->bucket, buffer))) {
OBJ_RELEASE(sig);
@ -240,13 +278,37 @@ static void rcd_allgather_recv_dist(int status, orte_process_name_t* sender,
rcd_finalize_coll(coll, rc);
return;
}
//update distance and send
new_distance = distance <<= 1;
if (new_distance < coll->ndmns) {
rcd_allgather_send_dist(coll, new_distance);
coll->nreported += distance;
coll->distance_mask_recv |= (uint32_t)(1 << distance);
rcd_allgather_process_data(coll, (uint32_t)(distance << 1));
} else {
rcd_finalize_coll(coll, ORTE_SUCCESS);
OPAL_OUTPUT_VERBOSE((80, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub data from %d distance received, "
"still waiting for data (mask recv: 0x%x).",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), distance, coll->distance_mask_recv));
if (NULL == coll->buffers) {
if (NULL == (coll->buffers = (opal_buffer_t **)calloc(sizeof(opal_buffer_t *), log2(coll->ndmns)))) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
rcd_finalize_coll(coll, rc);
return;
}
}
distance_index = log2(distance);
if (NULL == (coll->buffers[distance_index] = OBJ_NEW(opal_buffer_t))) {
rc = OPAL_ERR_OUT_OF_RESOURCE;
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
rcd_finalize_coll(coll, rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.copy_payload(coll->buffers[distance_index], buffer))) {
OBJ_RELEASE(sig);
ORTE_ERROR_LOG(rc);
rcd_finalize_coll(coll, rc);
return;
}
}
OBJ_RELEASE(sig);
@ -258,6 +320,10 @@ static int rcd_finalize_coll(orte_grpcomm_coll_t *coll, int ret) {
opal_buffer_t *reply;
int rc;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:coll:recdub declared collective complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
reply = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &coll->nreported, 1, OPAL_UINT64))) {

View file

@ -285,11 +285,16 @@ int orte_dt_compare_sig(orte_grpcomm_signature_t *value1, orte_grpcomm_signature
}
if (value2->sz > value1->sz) {
return OPAL_VALUE2_GREATER;
}
if (value1->seq_num > value2->seq_num) {
return OPAL_VALUE1_GREATER;
}
if (value2->seq_num > value1->seq_num) {
return OPAL_VALUE2_GREATER;
}
/* same size - check contents */
if (0 == memcmp(value1->signature, value2->signature, value1->sz*sizeof(orte_process_name_t))) {
return OPAL_EQUAL;
}
return OPAL_VALUE2_GREATER;
}

View file

@ -369,12 +369,12 @@ int orte_dt_copy_sig(orte_grpcomm_signature_t **dest, orte_grpcomm_signature_t *
}
(*dest)->sz = src->sz;
(*dest)->signature = (orte_process_name_t*)malloc(src->sz * sizeof(orte_process_name_t));
(*dest)->seq_num = src->seq_num;
if (NULL == (*dest)->signature) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
OBJ_RELEASE(*dest);
return ORTE_ERR_OUT_OF_RESOURCE;
}
memcpy(&(*dest)->signature, &src->signature, src->sz * sizeof(orte_process_name_t));
memcpy((*dest)->signature, src->signature, src->sz * sizeof(orte_process_name_t));
return ORTE_SUCCESS;
}
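
The memcpy change above is the functional fix in this routine: the old call copied from &src->signature, the address of the pointer field itself, into &(*dest)->signature, clobbering the freshly allocated pointer instead of filling the array it points to. A reduced standalone illustration of the difference (the sig struct is made up for the example):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct sig { int *names; size_t sz; };   /* stand-in for orte_grpcomm_signature_t */

int main(void)
{
    int data[3] = { 7, 8, 9 };
    struct sig src  = { data, 3 };
    struct sig dest = { malloc(3 * sizeof(int)), 3 };

    /* WRONG (old form): copies bytes starting at the pointer field itself,
     * overwriting dest.names rather than the allocated array:
     *   memcpy(&dest.names, &src.names, src.sz * sizeof(int));
     */

    /* RIGHT (new form): copy the array the pointers refer to */
    memcpy(dest.names, src.names, src.sz * sizeof(int));

    printf("%d %d %d\n", dest.names[0], dest.names[1], dest.names[2]);  /* 7 8 9 */
    free(dest.names);
    return 0;
}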

View file

@ -880,6 +880,11 @@ int orte_dt_pack_sig(opal_buffer_t *buffer, const void *src, int32_t num_vals,
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack the sequence number */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &ptr[i]->seq_num, 1, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (0 < ptr[i]->sz) {
/* pack the array */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ptr[i]->signature, ptr[i]->sz, ORTE_NAME))) {

View file

@ -917,7 +917,7 @@ int orte_dt_print_sig(char **output, char *prefix, orte_grpcomm_signature_t *src
}
/* there must be at least one */
asprintf(&tmp, "%sORTE_SIG\tValue: ", prefx);
asprintf(&tmp, "%sORTE_SIG\tSeqNumber:%d\tValue: ", prefx, src->seq_num);
for (i=0; i < src->sz; i++) {
asprintf(&tmp2, "%s%s", tmp, ORTE_NAME_PRINT(&src->signature[i]));

View file

@ -972,6 +972,10 @@ int orte_dt_unpack_sig(opal_buffer_t *buffer, void *dest, int32_t *num_vals,
ORTE_ERROR_LOG(rc);
return rc;
}
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ptr[i]->seq_num, &cnt, OPAL_UINT32))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (0 < ptr[i]->sz) {
/* allocate space for the array */
ptr[i]->signature = (orte_process_name_t*)malloc(ptr[i]->sz * sizeof(orte_process_name_t));